libt2n: (reinhard) socket handler: buffer and blcok sizes are configurable now; incom...
authorReinhard Pfau <reinhard.pfau@intra2net.com>
Fri, 6 Jun 2008 08:22:45 +0000 (08:22 +0000)
committerReinhard Pfau <reinhard.pfau@intra2net.com>
Fri, 6 Jun 2008 08:22:45 +0000 (08:22 +0000)
src/socket_handler.cpp
src/socket_handler.hxx
test/comm.cpp

index ed14895..0082886 100644 (file)
@@ -35,6 +35,7 @@
 
 #include <sstream>
 #include <iostream>
+#include <algorithm>
 
 #include "socket_handler.hxx"
 #include "t2n_exception.hxx"
@@ -45,6 +46,15 @@ using namespace std;
 namespace libt2n
 {
 
+socket_handler::socket_handler(int _sock, socket_type_value _socket_type)
+: sock(_sock)
+, recv_buffer_size( default_recv_buffer_size )
+, write_block_size( default_write_block_size )
+, socket_type(_socket_type)
+{
+}
+
+
 /// set options like fast reuse and keepalive every socket should have
 void socket_handler::set_socket_options(int sock)
 {
@@ -96,6 +106,39 @@ bool socket_handler::is_closed()
     return !(r & O_ACCMODE);
 }
 
+
+/**
+ * @brief set a new size for the receive buffer.
+ * @param new_recv_buffer_size the new size for the receive buffer.
+ *
+ * The receive buffer determines the amount of data which is tried to read at once
+ * from the underlying socket.
+ *
+ * The value is normalized to be at least 512 bytes and at max 32K bytes.
+ */
+void socket_handler::set_recv_buffer_size(unsigned int new_recv_buffer_size)
+{
+    recv_buffer_size= std::max( 512u, std::min( 32u * 1024u, new_recv_buffer_size ));
+} //
+
+
+/**
+ * @brief set new size for the data chunks when writeing.
+ * @param new_write_block_size the new chunk size.
+ *
+ * The write block size determines the amound of data which is tried to write
+ * to the socket when data needs to be sended.
+ * Since writeing data is done in a loop, this does not limit the amunt of data which can
+ * be written.
+ *
+ * The value is normalized to be at least 512 bytes and at max 32K bytes.
+ */
+void socket_handler::set_write_block_size(unsigned int new_write_block_size)
+{
+    write_block_size= std::max( 512u, std::min( 32u * 1024u, new_write_block_size ));
+} //
+
+
 /** @brief check if new data is waiting on the raw socket
     @param[in,out] usec_timeout wait until new data is found, max timeout usecs.
             -1: wait endless
@@ -221,7 +264,23 @@ void socket_handler::socket_write(const std::string& data)
         while ((rtn=::write(sock, data.data()+offset, write_size)) &&
                rtn == -1 && (errno == EAGAIN || errno == EINTR))
         {
-            usleep (80000);
+            fd_set write_set[1];
+            fd_set except_set[1];
+            FD_ZERO(write_set);
+            FD_ZERO(except_set);
+            FD_SET(sock, write_set);
+            FD_SET(sock, except_set);
+            // let's wait for the socket to become writable again...
+            //TODO: use a timeout here?
+            int n= ::select(sock+1, NULL, write_set, except_set, NULL);
+            if (n==1 && ! FD_ISSET(sock,write_set) && FD_ISSET(sock, except_set))
+            {
+                // if we are selected but cannot write and have an exception
+                // we have serious trouble...
+                LOGSTREAM(error,"exception on socket; cannot write any more.");
+                //TODO: exception?
+                return;
+            }
             LOGSTREAM(debug,"resuming write() call after EAGAIN or EINTR");
         }
 
@@ -233,11 +292,10 @@ void socket_handler::socket_write(const std::string& data)
         }
         else if (rtn != write_size)
         {
-            LOGSTREAM(error,"write() wrote " << rtn << " bytes, should have been " 
+            LOGSTREAM(debug,"write() wrote " << rtn << " bytes, should have been "
                 << write_size << " (complete: " << data.size() << ")");
 
-            // TODO: exception?
-            return;
+            write_size = rtn;
         }
 
         offset += write_size;
@@ -246,6 +304,6 @@ void socket_handler::socket_write(const std::string& data)
     LOGSTREAM(debug,"wrote " << data.size() << " bytes");
 
     return;
-}
+} // eo socket_handler::socket_write(const std::string&)
 
 }
index 82ccdad..b9aa175 100644 (file)
@@ -32,8 +32,8 @@ namespace libt2n
 class socket_handler
 {
     private:
-        static const unsigned int recv_buffer_size=2048;
-        static const unsigned int write_block_size=4096;
+        static const unsigned int default_recv_buffer_size=2048;
+        static const unsigned int default_write_block_size=4096;
 
         socket_type_value socket_type;
 
@@ -41,9 +41,10 @@ class socket_handler
 
     protected:
         int sock;
+        unsigned int recv_buffer_size;
+        unsigned int write_block_size;
 
-        socket_handler(int _sock, socket_type_value _socket_type)
-            { sock=_sock; socket_type=_socket_type; }
+        socket_handler(int _sock, socket_type_value _socket_type);
 
         void set_socket_options(int sock);
 
@@ -58,11 +59,17 @@ class socket_handler
         bool fill_buffer(std::string& buffer);
 
     public:
-        /// is this a tcp or udp socket connection
+        /// is this a tcp or unix socket connection
         socket_type_value get_type()
             { return socket_type; }
 
         bool is_closed();
+
+        void set_recv_buffer_size(unsigned int new_recv_buffer_size);
+        void set_write_block_size(unsigned int new_write_block_size);
+
+        unsigned int get_recv_buffer_size() const { return recv_buffer_size; }
+        unsigned int get_write_block_size() const { return write_block_size; }
 };
 
 }
index 169c183..5bba916 100644 (file)
@@ -26,6 +26,7 @@ using namespace std;
 using namespace libt2n;
 using namespace CppUnit;
 
+
 class test_comm : public TestFixture
 {
     CPPUNIT_TEST_SUITE(test_comm);
@@ -35,6 +36,7 @@ class test_comm : public TestFixture
     CPPUNIT_TEST(UnixCommToServerAndBackBig);
     CPPUNIT_TEST(IPCommToServer);
     CPPUNIT_TEST(IPCommToServerAndBack);
+    CPPUNIT_TEST(IPCommToServerAndBackBig);
 
     CPPUNIT_TEST_SUITE_END();
 
@@ -327,6 +329,72 @@ class test_comm : public TestFixture
         }
     }
 
+
+    void IPCommToServerAndBackBig()
+    {
+        pid_t pid;
+
+        switch(pid=fork())
+        {
+            case -1:
+            {
+                CPPUNIT_FAIL("fork error");
+                break;
+            }
+            case 0:
+            // child
+            {
+                socket_server ss(6666);
+                ss.set_logging(&cerr,debug);
+
+                // max 10 sec
+                for (int i=0; i < 10; i++)
+                {
+                    ss.fill_buffer(1000000);
+
+                    string data;
+                    unsigned int cid;
+
+                    if(ss.get_packet(data,cid))
+                    {
+                        server_connection* con=ss.get_connection(cid);
+
+                        socket_handler* alias= dynamic_cast< socket_handler* >(con);
+
+                        if (data=="QUIT")
+                            break;
+
+                        alias->set_write_block_size( 4093 );
+                        con->write(string().insert(0,2048*1024,'y'));
+                    }
+                }
+                // don't call atexit and stuff
+                _exit(0);
+            }
+
+            default:
+            // parent
+            {
+                string data;
+
+                // wait till server is up
+                sleep(1);
+                socket_client_connection sc(6666);
+
+                sc.write(string().insert(0,100*1024,'x'));
+
+                while (!sc.get_packet(data))
+                    sc.fill_buffer(1000000);
+
+                CPPUNIT_ASSERT_EQUAL(string().insert(0,2048*1024,'y'),data);
+
+                sc.write("QUIT");
+            }
+        }
+    } // eo IPCommToServerAndBackBig()
+
+
+
 };
 
 CPPUNIT_TEST_SUITE_REGISTRATION(test_comm);