libt2n: (gerd) add lots of error handling code, unit tests for this error handling...
authorGerd v. Egidy <gerd.von.egidy@intra2net.com>
Fri, 13 Jun 2008 14:48:51 +0000 (14:48 +0000)
committerGerd v. Egidy <gerd.von.egidy@intra2net.com>
Fri, 13 Jun 2008 14:48:51 +0000 (14:48 +0000)
examples-codegen/example1-client/client.cpp
examples-codegen/example1/server.cpp
examples-codegen/example2/server.cpp
examples/minimalistic-server.cpp
src/command_server.cpp
src/command_server.hxx
src/socket_handler.cpp
src/socket_handler.hxx
src/socket_server.cpp
test/timeout.cpp

index e35799e..d0a5ebe 100644 (file)
@@ -6,24 +6,26 @@
 
 int main(int argc, char** argv)
 {
-  // use a local socket (a.k.a "unix domain socket")
-  // if you want to connect to a tcp/ip server you pass the port and server name to the constructor
-  libt2n::socket_client_connection sc("./socket");
-  // this generated class has a method for each of our exported procedures
-  cmd_group_t2nexample_client cc(sc);
+    // use a local socket (a.k.a "unix domain socket")
+    // if you want to connect to a tcp/ip server you pass the port and server name to the constructor
+    libt2n::socket_client_connection sc("./socket");
+    // this generated class has a method for each of our exported procedures
+    cmd_group_t2nexample_client cc(sc);
 
-  bool throwok=false;
+    bool throwok=false;
 
-  // exceptions are passed back to the client transparently
-  try
-  {
-    // call the remote procedure (we pass "throw" to cause a exception to be thrown)
-    cc.testfunc("throw");
-  }catch(libt2n::t2n_runtime_error &e){
-    throwok=(std::string(e.what())=="throw me around");
-  }
+    // exceptions are passed back to the client transparently
+    try
+    {
+        // call the remote procedure (we pass "throw" to cause a exception to be thrown)
+        cc.testfunc("throw");
+    }
+    catch(libt2n::t2n_runtime_error &e)
+    {
+        throwok=(std::string(e.what())=="throw me around");
+    }
 
-  // call remote procedure and check the return value is correct
-  return ( throwok && ( cc.testfunc("hello") == "hello, testfunc() was here" ) )
-    ? EXIT_SUCCESS : EXIT_FAILURE;
+    // call remote procedure and check the return value is correct
+    return ( throwok && ( cc.testfunc("hello") == "hello, testfunc() was here" ) )
+        ? EXIT_SUCCESS : EXIT_FAILURE;
 }
index f392e03..528eabc 100644 (file)
@@ -4,15 +4,20 @@
 // the automatically generated server side code (cmd_group_t2nexample class)
 #include "t2nexample_server.hxx"
 
-int main(int argc, char** argv) {
-  // create local socket server (a.k.a "unix domain socket")
-  // if you want to to create a tcp/ip server you pass the port to the constructor (for details take a look at the socket_server class documentation)
-  libt2n::socket_server ss("./socket");
-  libt2n::group_command_server<cmd_group_t2nexample> cs(ss);
+int main(int argc, char** argv)
+{
+    // don't kill the server on broken pipe
+    signal(SIGPIPE, SIG_IGN);
 
-  // handle requests
-  while(true)
-    cs.handle();
+    // create local socket server (a.k.a "unix domain socket")
+    // if you want to to create a tcp/ip server you pass the port to the constructor
+    // (for details take a look at the socket_server class documentation)
+    libt2n::socket_server ss("./socket");
+    libt2n::group_command_server<cmd_group_t2nexample> cs(ss);
 
-  return 0;
+    // handle requests
+    while(true)
+        cs.handle();
+
+    return 0;
 }
index 2049f5a..f7ce0f7 100644 (file)
 using namespace libt2n;
 
 
-int main(int argc, char** argv) {
-  socket_server ss("./socket");
-  socket_server ss_other("./socket_other");
-  group_command_server<cmd_group_default> cs(ss);
-  group_command_server<cmd_group_other> cs_other(ss_other);
-
-  // handle requests
-  while(true) {
-    cs.handle(1000);
-    cs_other.handle(1000);
-  }
-
-  return 0;
+int main(int argc, char** argv)
+{
+    // don't kill the server on broken pipe
+    signal(SIGPIPE, SIG_IGN);
+
+    socket_server ss("./socket");
+    socket_server ss_other("./socket_other");
+    group_command_server<cmd_group_default> cs(ss);
+    group_command_server<cmd_group_other> cs_other(ss_other);
+
+    // handle requests
+    while(true)
+    {
+        cs.handle(1000);
+        cs_other.handle(1000);
+    }
+
+    return 0;
 }
index 39826f2..19114b4 100644 (file)
@@ -17,6 +17,8 @@
     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
 
+#include <signal.h>
+
 #include <string>
 #include "minimalistic-stub.hxx"
 
@@ -41,15 +43,19 @@ LIBT2N_EXPORT std::string testfunc(const string& str)
 
 using namespace libt2n;
 
-int main(int argc, char** argv) {
-  extended_type_info_test();
+int main(int argc, char** argv)
+{
+    extended_type_info_test();
+
+    // don't kill us on broken pipe
+    signal(SIGPIPE, SIG_IGN);
 
-  socket_server ss("./socket");
-  group_command_server<cmd_group_example> cs(ss);
+    socket_server ss("./socket");
+    group_command_server<cmd_group_example> cs(ss);
 
-  // handle requests (without timeout)
-  while(true)
-    cs.handle();
+    // handle requests (without timeout)
+    while(true)
+        cs.handle();
 
-  return 0;
+    return 0;
 }
index cc66de4..f596860 100644 (file)
@@ -166,7 +166,21 @@ void command_server::handle(long long usec_timeout, long long* usec_timeout_rema
         unsigned int conn_id;
 
         while (s.get_packet(packet,conn_id))
-            handle_packet(packet,s.get_connection(conn_id)); 
+        {
+            server_connection* conn=s.get_connection(conn_id);
+            if (!conn)
+                EXCEPTIONSTREAM(error,logic_error,"illegal connection id " << conn_id << " received");
+
+            try
+                { handle_packet(packet,conn); }
+            catch (t2n_transfer_error &e)
+            {
+                // shut down a connection with transfer errors (usually write errors)
+                conn->close();
+            }
+            catch(...)
+                { throw; }
+        }
     }
     s.cleanup();
 }
index be3a6ab..378ef08 100644 (file)
@@ -43,6 +43,9 @@ class command_server
         void handle(long long usec_timeout=-1, long long* usec_timeout_remaining=NULL);
 
         void send_hello(unsigned int conn_id);
+
+        std::ostream* get_logstream(log_level_values level)
+            { return s.get_logstream(level); }
 };
 
 template<class T, class B> struct Derived_from {
index 0082886..532c7f8 100644 (file)
@@ -50,6 +50,7 @@ socket_handler::socket_handler(int _sock, socket_type_value _socket_type)
 : sock(_sock)
 , recv_buffer_size( default_recv_buffer_size )
 , write_block_size( default_write_block_size )
+, write_timeout( default_write_timeout )
 , socket_type(_socket_type)
 {
 }
@@ -126,9 +127,9 @@ void socket_handler::set_recv_buffer_size(unsigned int new_recv_buffer_size)
  * @brief set new size for the data chunks when writeing.
  * @param new_write_block_size the new chunk size.
  *
- * The write block size determines the amound of data which is tried to write
+ * The write block size determines the amount of data which is tried to write
  * to the socket when data needs to be sended.
- * Since writeing data is done in a loop, this does not limit the amunt of data which can
+ * Since writing data is done in a loop, this does not limit the amunt of data which can
  * be written.
  *
  * The value is normalized to be at least 512 bytes and at max 32K bytes.
@@ -139,6 +140,20 @@ void socket_handler::set_write_block_size(unsigned int new_write_block_size)
 } //
 
 
+/**
+ * @brief set new timeout for writing a block
+ * @param new_write_timeout the new timeout in usecs, -1: wait endless
+ *
+ * The write timeout determines the maximum amount of time that is waited
+ * between writing each block. If the timeout is exceeded, write will
+ * throw t2n_transfer_error
+ */
+void socket_handler::set_write_timeout(long long new_write_timeout)
+{
+    write_block_size=new_write_timeout;
+} //
+
+
 /** @brief check if new data is waiting on the raw socket
     @param[in,out] usec_timeout wait until new data is found, max timeout usecs.
             -1: wait endless
@@ -216,11 +231,7 @@ bool socket_handler::fill_buffer(std::string& buffer)
             try_again=true;
         }
         else
-        {
-            LOGSTREAM(error,"error reading from socket : " << strerror(errno));
-            // TODO: exception?
-            return false;
-        }
+            EXCEPTIONSTREAM(error,t2n_transfer_error,"error reading from socket : " << strerror(errno));
     }
 
     // End-of-file
@@ -261,35 +272,15 @@ void socket_handler::socket_write(const std::string& data)
             write_size = data.size()-offset;
 
         int rtn;
-        while ((rtn=::write(sock, data.data()+offset, write_size)) &&
-               rtn == -1 && (errno == EAGAIN || errno == EINTR))
+        while ((rtn=::write(sock, data.data()+offset, write_size)) == -1 &&
+               (errno == EAGAIN || errno == EINTR))
         {
-            fd_set write_set[1];
-            fd_set except_set[1];
-            FD_ZERO(write_set);
-            FD_ZERO(except_set);
-            FD_SET(sock, write_set);
-            FD_SET(sock, except_set);
-            // let's wait for the socket to become writable again...
-            //TODO: use a timeout here?
-            int n= ::select(sock+1, NULL, write_set, except_set, NULL);
-            if (n==1 && ! FD_ISSET(sock,write_set) && FD_ISSET(sock, except_set))
-            {
-                // if we are selected but cannot write and have an exception
-                // we have serious trouble...
-                LOGSTREAM(error,"exception on socket; cannot write any more.");
-                //TODO: exception?
-                return;
-            }
+            wait_ready_to_write(sock,write_timeout);
             LOGSTREAM(debug,"resuming write() call after EAGAIN or EINTR");
         }
 
         if (rtn == -1)
-        {
-            LOGSTREAM(error,"write() returned " << strerror(errno));
-            // TODO: exception?
-            return;
-        }
+            EXCEPTIONSTREAM(error,t2n_transfer_error,"write() returned " << strerror(errno));
         else if (rtn != write_size)
         {
             LOGSTREAM(debug,"write() wrote " << rtn << " bytes, should have been "
@@ -306,4 +297,49 @@ void socket_handler::socket_write(const std::string& data)
     return;
 } // eo socket_handler::socket_write(const std::string&)
 
+/// wait until the socket is ready to write again
+void socket_handler::wait_ready_to_write(int socket, long long write_block_timeout)
+{
+    // prepare socket sets
+    fd_set write_set[1];
+    fd_set except_set[1];
+    FD_ZERO(write_set);
+    FD_ZERO(except_set);
+    FD_SET(socket, write_set);
+    FD_SET(socket, except_set);
+
+    // prepare timeout struct
+    struct timeval tval;
+    struct timeval *timeout_ptr;
+
+    if (write_block_timeout == -1)
+        timeout_ptr = NULL;
+    else
+    {
+        timeout_ptr = &tval;
+
+        // convert timeout from long long usec to int sec + int usec
+        tval.tv_sec = write_block_timeout / 1000000;
+        tval.tv_usec = write_block_timeout % 1000000;
+    }
+
+    // let's wait for the socket to become writable again...
+    int rtn;
+    while ((rtn=::select(socket+1, NULL, write_set, except_set, timeout_ptr)) ==-1 && errno == EINTR);
+
+    if (rtn > 0 && (!FD_ISSET(socket,write_set)) && FD_ISSET(socket, except_set))
+    {
+        // if we are selected but cannot write and have an exception
+        // we have serious trouble...
+        EXCEPTIONSTREAM(error,t2n_transfer_error,"exception on socket; cannot write any more.");
+    }
+
+    if (rtn==0)
+        EXCEPTIONSTREAM(error,t2n_transfer_error,"timeout on select() for write");
+
+    if (rtn==-1)
+        EXCEPTIONSTREAM(error,t2n_transfer_error,"cannot select() for write: " << strerror(errno));
+}
+
+
 }
index b9aa175..623f31c 100644 (file)
@@ -34,15 +34,18 @@ class socket_handler
     private:
         static const unsigned int default_recv_buffer_size=2048;
         static const unsigned int default_write_block_size=4096;
+        static const long long default_write_timeout=30000000;
 
         socket_type_value socket_type;
 
         bool data_waiting(long long usec_timeout,long long *timeout_remaining=NULL);
+        void wait_ready_to_write(int socket, long long write_block_timeout);
 
     protected:
         int sock;
         unsigned int recv_buffer_size;
         unsigned int write_block_size;
+        long long write_timeout;
 
         socket_handler(int _sock, socket_type_value _socket_type);
 
@@ -67,9 +70,11 @@ class socket_handler
 
         void set_recv_buffer_size(unsigned int new_recv_buffer_size);
         void set_write_block_size(unsigned int new_write_block_size);
+        void set_write_timeout(long long new_write_timeout);
 
         unsigned int get_recv_buffer_size() const { return recv_buffer_size; }
         unsigned int get_write_block_size() const { return write_block_size; }
+        long long get_write_timeout() const { return write_timeout; }
 };
 
 }
index 0eff676..1fc99d8 100644 (file)
@@ -152,14 +152,17 @@ void socket_server::new_connection()
     int newsock = accept (sock,(struct sockaddr *) &clientname,&size);
     if (newsock < 0)
     {
-        if (errno == EAGAIN)
+        // return on non-fatal errors (list taken from man-page)
+        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ECONNABORTED || errno == EINTR ||
+            errno == EMFILE || errno == ENFILE || errno == ENOBUFS || errno == ENOMEM ||
+            errno == EPROTO || errno ==  EPERM || errno == ETIMEDOUT)
         {
-            LOGSTREAM(error,"accept error (EAGAIN): no connection waiting");
+            LOGSTREAM(error,"non-fatal accept error: " << strerror(errno));
             return;
         }
 
-        /* default: break */
-        EXCEPTIONSTREAM(error,t2n_server_error,"error accepting connection: " << strerror(errno));
+        /* fatal error: will usually kill or restart the server */
+        EXCEPTIONSTREAM(error,t2n_server_error,"fatal error accepting connection: " << strerror(errno));
     }
 
     FD_SET (newsock, &connection_set);
@@ -242,8 +245,18 @@ bool socket_server::fill_connection_buffers()
     std::map<unsigned int, server_connection*>::iterator ie=connections.end();
     for(std::map<unsigned int, server_connection*>::iterator i=connections.begin(); i != ie; i++)
         if (!i->second->server_connection::is_closed())
-            if (i->second->fill_buffer(0))
-                data_found=true;
+        {
+            // shutdown all connections which throw exceptions to protect the server
+            try
+            {
+                if (i->second->fill_buffer(0))
+                    data_found=true;
+            }
+            catch (t2n_transfer_error &e)
+                { i->second->close(); }
+            catch(...)
+                { throw; }
+        }
 
     return data_found;
 }
index 0493d62..8bdfa31 100644 (file)
@@ -120,14 +120,26 @@ class real_write_connection: public socket_server_connection
             { socket_write(data); }
 };
 
+// this is an evil hack to get access to real_write, don't ever do this in an app!!!
+class real_write_client_connection: public socket_client_connection
+{
+    public:
+        void real_write(const std::string& data)
+            { socket_write(data); }
+};
+
 class test_timeout : public TestFixture
 {
     CPPUNIT_TEST_SUITE(test_timeout);
 
+    CPPUNIT_TEST(ConnectTimeout);
     CPPUNIT_TEST(HelloTimeoutNothing);
     CPPUNIT_TEST(HelloTimeoutSlowData);
     CPPUNIT_TEST(CommandTimeout);
     CPPUNIT_TEST(CommandSlowResponse);
+    CPPUNIT_TEST(DisconnectOnWrite);
+    CPPUNIT_TEST(DisconnectOnRead);
+    CPPUNIT_TEST(BreakAccept);
 
     CPPUNIT_TEST_SUITE_END();
 
@@ -164,6 +176,51 @@ class test_timeout : public TestFixture
         }
     }
 
+    void ConnectTimeout()
+    {
+        pid_t pid;
+
+        switch(pid=fork())
+        {
+            case -1:
+            {
+                CPPUNIT_FAIL("fork error");
+                break;
+            }
+            case 0:
+            // child
+            {
+                socket_server ss("./socket");
+
+                // don't call atexit and stuff
+                _exit(0);
+            }
+
+            default:
+            // parent
+            {
+                string data;
+
+                // wait till server is up
+                sleep(1);
+
+                string errormsg;
+
+                try
+                {
+                    socket_client_connection sc("./socket");
+                    command_client cc(sc,1000000,1000000);
+                }
+                catch(t2n_transfer_error &e)
+                { errormsg=e.what(); }
+                catch(...)
+                { throw; }
+
+                CPPUNIT_ASSERT_EQUAL(string("error reading from socket : Invalid argument"),errormsg);
+            }
+        }
+    }
+
     void HelloTimeoutNothing()
     {
         pid_t pid;
@@ -406,6 +463,237 @@ class test_timeout : public TestFixture
         }
     }
 
+    void DisconnectOnWrite()
+    {
+        pid_t pid;
+
+        switch(pid=fork())
+        {
+            case -1:
+            {
+                CPPUNIT_FAIL("fork error");
+                break;
+            }
+            case 0:
+            // child
+            {
+                socket_server ss("./socket");
+
+                // bail out as soon as we get something
+                ss.fill_buffer(-1);
+                // don't call atexit and stuff
+                _exit(0);
+            }
+
+            default:
+            // parent
+            {
+                string data;
+
+                // don't kill us on broken pipe
+                signal(SIGPIPE, SIG_IGN);
+
+                // wait till server is up
+                sleep(1);
+                socket_client_connection sc("./socket");
+
+                string errormsg;
+
+                string huge(1000000,'x');
+
+                try
+                {
+                    sc.write(huge);
+                }
+                catch(t2n_transfer_error &e)
+                { errormsg=e.what(); }
+                catch(...)
+                { throw; }
+
+                CPPUNIT_ASSERT_EQUAL(string("write() returned Broken pipe"),errormsg);
+            }
+        }
+    }
+
+    void DisconnectOnRead()
+    {
+        pid_t pid;
+
+        switch(pid=fork())
+        {
+            case -1:
+            {
+                CPPUNIT_FAIL("fork error");
+                break;
+            }
+            case 0:
+            // child
+            {
+                // wait till server is up
+                sleep(1);
+
+                socket_client_connection sc("./socket");
+
+                // this is an evil hack to get access to real_write, don't ever do this in an app!!!
+                real_write_client_connection *rwc=(real_write_client_connection*)&sc;
+                rwc->real_write(string(10000,'x'));
+
+                // don't call atexit and stuff
+                _exit(0);
+            }
+
+            default:
+            // parent
+            {
+                // don't kill us on broken pipe
+                signal(SIGPIPE, SIG_IGN);
+
+                socket_server ss("./socket");
+
+                time_t t0 = time(NULL);
+
+                // max 5 sec
+                while (time(NULL) < t0 + 5 )
+                {
+                    ss.fill_buffer(1000000);
+
+                    string data;
+                    ss.get_packet(data);
+                }
+
+                // are we still alive and able to process data?
+
+                switch(pid=fork())
+                {
+                    case -1:
+                    {
+                        CPPUNIT_FAIL("fork error");
+                        break;
+                    }
+                    case 0:
+                    // child
+                    {
+                        socket_client_connection *sc=new socket_client_connection("./socket");
+                        sc->write(string(10000,'x'));
+                        delete sc;
+                        // socket is closed regularly
+
+                        // don't run regular cleanup, otherwise cppunit stuff gets called
+                        _exit(0);
+                    }
+
+                    default:
+                    // parent
+                    {
+                        string received;
+
+                        t0 = time(NULL);
+
+                        // max 10 sec
+                        while (time(NULL) < t0 + 10 )
+                        {
+                            ss.fill_buffer(1000000);
+
+                            if (ss.get_packet(received))
+                                break;
+                        }
+
+                        CPPUNIT_ASSERT_EQUAL(string(10000,'x'),received);
+                    }
+                }
+            }
+        }
+    }
+
+    void BreakAccept()
+    {
+        pid_t pid;
+
+        switch(pid=fork())
+        {
+            case -1:
+            {
+                CPPUNIT_FAIL("fork error");
+                break;
+            }
+            case 0:
+            // child
+            {
+                // wait till server is really up and waiting
+                sleep(2);
+
+                // connect with very tight timeout and only 1 retry
+                socket_client_connection sc("./socket",50,1);
+
+                // don't call atexit and stuff
+                _exit(0);
+            }
+
+            default:
+            // parent
+            {
+                // don't kill us on broken pipe
+                signal(SIGPIPE, SIG_IGN);
+
+                socket_server ss("./socket");
+
+                // server is "working" while client wants to connect
+                sleep(5);
+
+                time_t t0 = time(NULL);
+
+                // max 5 sec
+                while (time(NULL) < t0 + 5 )
+                {
+                    ss.fill_buffer(1000000);
+
+                    string data;
+                    ss.get_packet(data);
+                }
+
+                // are we still alive and able to process data?
+
+                switch(pid=fork())
+                {
+                    case -1:
+                    {
+                        CPPUNIT_FAIL("fork error");
+                        break;
+                    }
+                    case 0:
+                    // child
+                    {
+                        socket_client_connection *sc=new socket_client_connection("./socket");
+                        sc->write(string(10000,'x'));
+                        delete sc;
+                        // socket is closed regularly
+
+                        // don't run regular cleanup, otherwise cppunit stuff gets called
+                        _exit(0);
+                    }
+
+                    default:
+                    // parent
+                    {
+                        string received;
+
+                        t0 = time(NULL);
+
+                        // max 10 sec
+                        while (time(NULL) < t0 + 10 )
+                        {
+                            ss.fill_buffer(1000000);
+
+                            if (ss.get_packet(received))
+                                break;
+                        }
+
+                        CPPUNIT_ASSERT_EQUAL(string(10000,'x'),received);
+                    }
+                }
+            }
+        }
+    }
 };
 
 CPPUNIT_TEST_SUITE_REGISTRATION(test_timeout);