From: Gerd v. Egidy Date: Thu, 24 Jul 2008 17:37:26 +0000 (+0000) Subject: libt2n: (gerd) socket reconnect added (incl. unit tests) X-Git-Tag: v0.4~18 X-Git-Url: http://developer.intra2net.com/git/?p=libt2n;a=commitdiff_plain;h=af84dfb53a739a0c8c343d9172f1847fa908906d libt2n: (gerd) socket reconnect added (incl. unit tests) --- diff --git a/src/command_client.cpp b/src/command_client.cpp index 376d3f9..0f44203 100644 --- a/src/command_client.cpp +++ b/src/command_client.cpp @@ -51,6 +51,35 @@ command_client::command_client(client_connection& _c, long long _command_timeout read_hello(); } +/** @brief replace the connection currently in use with a new one + + @param _c reference of the new connection + + @note the old connection must still be valid when this method is called, + it can safely be deleted after this method returned + + @note all callbacks registered on the old connection will be copied over + to the new one +*/ +void command_client::replace_connection(client_connection& _c) +{ + // copy all callbacks registered on the old connection + for(callback_event_type e=static_cast(0); + e < __events_end; + e=static_cast(static_cast(e)+1)) + { + list > evcb=c.get_callback_list(e); + + for (list >::iterator i=evcb.begin(); i != evcb.end(); i++) + _c.add_callback(e,*i); + } + + // replace the connection + c=_c; + + read_hello(); +} + std::string command_client::read_packet(const long long &usec_timeout) { string resultpacket; diff --git a/src/command_client.hxx b/src/command_client.hxx index 257b0a8..1853e9c 100644 --- a/src/command_client.hxx +++ b/src/command_client.hxx @@ -48,6 +48,8 @@ class command_client long long _hello_timeout_usec=hello_timeout_usec_default); virtual ~command_client() {} + void replace_connection(client_connection& _c); + void send_command(command* cmd, result_container &res); void set_command_timeout_usec(long long _command_timeout_usec=command_timeout_usec_default) diff --git a/src/connection.cpp b/src/connection.cpp index b4cf198..30a0b19 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include @@ -103,6 +104,39 @@ unsigned int connection::peek_packet(std::string& data) return psize; } +/// remove all data from buffer that is not a complete packet +void connection::remove_incomplete_packets() +{ + std::string::size_type p=0; + std::string::size_type end=buffer.size(); + + while (p < end) + { + // not enough space for size information -> no packet + if (p+sizeof(packet_size_indicator) > end) + break; + + packet_size_indicator psize=ntohl(*((packet_size_indicator*)(buffer.data()+p))); + + if (p+sizeof(packet_size_indicator)+psize > end) + { + // incomplete packet + break; + } + else + { + // move p to where the next packet will start + p+=sizeof(packet_size_indicator)+psize; + } + } + + if (p < end) + { + // incomplete packets there, remove them + buffer.erase(p); + } +} + /// send a blob to the peer void connection::write(const std::string& data) { @@ -125,9 +159,6 @@ void connection::write(const std::string& data) */ void connection::add_callback(callback_event_type event, const boost::function& func) { - if (event == new_connection) - throw std::logic_error("new_connection callback not allowed for server_connections"); - callbacks[event].push_back(func); } @@ -154,5 +185,24 @@ std::list > connection::get_callback_list(callback_even return callbacks[event]; } +/** @brief reopen a already closed connection, removes incomplete packets from the buffer + + @note Only call when the connection is closed. + + @note Justs cares about the data of connection, reconnecting has to be + done in a derived class. +*/ +void connection::reopen() +{ + if (!is_closed()) + throw std::logic_error("connection::reopen() called with connection still open"); + + closed=false; + + // incomplete buffer data is worthless with a new connection + remove_incomplete_packets(); + + do_callbacks(new_connection); +} } diff --git a/src/connection.hxx b/src/connection.hxx index 94a4c02..8e72bf6 100644 --- a/src/connection.hxx +++ b/src/connection.hxx @@ -60,6 +60,9 @@ class connection void do_callbacks(callback_event_type event); + void reopen(void); + void remove_incomplete_packets(); + public: virtual ~connection(); diff --git a/src/server.cpp b/src/server.cpp index 32dda77..62cf91c 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -125,7 +125,7 @@ int server::add_connection(server_connection* newconn) newconn->set_server(this); connections[cid]=newconn; - // add all callbacks + // add all callbacks except new_connection for(int e=connection_closed; e != __events_end; e++) { std::list >::iterator i,ie=callbacks[e].end(); diff --git a/src/socket_client.cpp b/src/socket_client.cpp index 8b70511..4f4975f 100644 --- a/src/socket_client.cpp +++ b/src/socket_client.cpp @@ -185,4 +185,21 @@ void socket_client_connection::close() } } +/** @brief try to reconnect the current connection with the same connection credentials (host and port or path) +*/ +void socket_client_connection::reconnect() +{ + // close the current connection if still open + close(); + + socket_type_value type=get_type(); + + if (type == tcp_s) + tcp_connect(max_retries); + else if (type == unix_s) + unix_connect(max_retries); + + reopen(); +} + } diff --git a/src/socket_client.hxx b/src/socket_client.hxx index 7160e0d..a7fe64e 100644 --- a/src/socket_client.hxx +++ b/src/socket_client.hxx @@ -75,6 +75,8 @@ class socket_client_connection : public client_connection, public socket_handler { return socket_handler::fill_buffer(buffer,usec_timeout,usec_timeout_remaining); } void close(); + + void reconnect(); }; } diff --git a/test/Makefile.am b/test/Makefile.am index d41e6e4..ed42baf 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -4,7 +4,7 @@ check_PROGRAMS = test test_LDADD = $(top_builddir)/src/libt2n.la @BOOST_SERIALIZATION_LIB@ \ @BOOST_LDFLAGS@ @CPPUNIT_LIBS@ -test_SOURCES = test.cpp comm.cpp simplecmd.cpp callback.cpp hello.cpp \ - timeout.cpp serialize.cpp cmdgroup.cpp +test_SOURCES = callback.cpp cmdgroup.cpp comm.cpp hello.cpp reconnect.cpp \ + serialize.cpp simplecmd.cpp test.cpp timeout.cpp TESTS = test diff --git a/test/callback.cpp b/test/callback.cpp index 9d4c7d1..bc951b9 100644 --- a/test/callback.cpp +++ b/test/callback.cpp @@ -297,7 +297,7 @@ class test_callback : public TestFixture bool got_1=false; - for (int i=0; i < 4; i++) + for (int i=0; i < 5; i++) { ss.fill_buffer(500000); @@ -314,8 +314,8 @@ class test_callback : public TestFixture } ss.cleanup(); - CPPUNIT_ASSERT_EQUAL(true,static_cast(callback_done[connection_closed])); - CPPUNIT_ASSERT_EQUAL(false,static_cast(callback_done[connection_deleted])); + CPPUNIT_ASSERT_EQUAL_MESSAGE("closed done",true,static_cast(callback_done[connection_closed])); + CPPUNIT_ASSERT_EQUAL_MESSAGE("not deleted yet",false,static_cast(callback_done[connection_deleted])); for (int i=0; i < 4; i++) { @@ -328,8 +328,8 @@ class test_callback : public TestFixture } ss.cleanup(); - CPPUNIT_ASSERT_EQUAL(true,static_cast(callback_done[connection_closed])); - CPPUNIT_ASSERT_EQUAL(true,static_cast(callback_done[connection_deleted])); + CPPUNIT_ASSERT_EQUAL_MESSAGE("closed done (2)",true,static_cast(callback_done[connection_closed])); + CPPUNIT_ASSERT_EQUAL_MESSAGE("deleted done",true,static_cast(callback_done[connection_deleted])); } } } diff --git a/test/reconnect.cpp b/test/reconnect.cpp new file mode 100644 index 0000000..09c5f5f --- /dev/null +++ b/test/reconnect.cpp @@ -0,0 +1,549 @@ +/*************************************************************************** + * Copyright (C) 2004 by Intra2net AG * + * info@intra2net.com * + * * + ***************************************************************************/ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include + +#include +#include + +#ifdef HAVE_CONFIG_H +#include +#endif + +using namespace std; +using namespace libt2n; +using namespace CppUnit; + +// this is an evil hack to get access to real_write, don't ever do this in an app!!! +class real_write_connection: public socket_server_connection +{ + public: + void real_write(const std::string& data) + { socket_write(data); } +}; + +class test_reconnect : public TestFixture +{ + CPPUNIT_TEST_SUITE(test_reconnect); + + CPPUNIT_TEST(simple_reconnect); + CPPUNIT_TEST(reconnect_with_close); + CPPUNIT_TEST(reconnect_buffer_complete); + CPPUNIT_TEST(reconnect_buffer_several_complete); + CPPUNIT_TEST(reconnect_buffer_no_incomplete1); + CPPUNIT_TEST(reconnect_buffer_no_incomplete2); + + CPPUNIT_TEST_SUITE_END(); + + public: + + void setUp() + { } + + void tearDown() + { } + + void send_raw_socket(string hello_string, socket_server* ss, int conn_id) + { + socket_server_connection *ssc=dynamic_cast(ss->get_connection(conn_id)); + + // this is an evil hack to get access to real_write, don't ever do this in an app!!! + real_write_connection *rwc=(real_write_connection*)ssc; + rwc->real_write(hello_string); + } + + void simple_reconnect() + { + pid_t pid; + + switch(pid=fork()) + { + case -1: + { + CPPUNIT_FAIL("fork error"); + break; + } + case 0: + // child + { + socket_server ss("./socket"); + + time_t t0 = time(NULL); + + // max 10 sec + while (time(NULL) < t0 + 10 ) + { + ss.fill_buffer(1000000); + + string data; + unsigned int cid; + + if(ss.get_packet(data,cid)) + { + server_connection* con=ss.get_connection(cid); + + if (data=="QUIT") + break; + + if (data=="x") + con->write(string().insert(0,100,'X')); + else + con->write(string().insert(0,100,'Y')); + } + } + + // don't call atexit and stuff + _exit(0); + } + + default: + // parent + { + // don't kill us on broken pipe + signal(SIGPIPE, SIG_IGN); + + // wait till server is up + sleep(1); + socket_client_connection sc("./socket"); + + sc.write("abc"); + + string data; + + while (!sc.get_packet(data)) + sc.fill_buffer(1000000); + + sc.reconnect(); + + sc.write("x"); + + while (!sc.get_packet(data)) + sc.fill_buffer(1000000); + + CPPUNIT_ASSERT_EQUAL(string().insert(0,100,'X'),data); + } + } + } + + void reconnect_with_close() + { + pid_t pid; + + switch(pid=fork()) + { + case -1: + { + CPPUNIT_FAIL("fork error"); + break; + } + case 0: + // child + { + socket_server ss("./socket"); + + time_t t0 = time(NULL); + + // max 10 sec + while (time(NULL) < t0 + 10 ) + { + ss.fill_buffer(1000000); + + string data; + unsigned int cid; + + if(ss.get_packet(data,cid)) + { + server_connection* con=ss.get_connection(cid); + + if (data=="QUIT") + break; + + if (data=="x") + con->write(string().insert(0,100,'X')); + else + con->write(string().insert(0,100,'Y')); + } + } + + // don't call atexit and stuff + _exit(0); + } + + default: + // parent + { + // don't kill us on broken pipe + signal(SIGPIPE, SIG_IGN); + + // wait till server is up + sleep(1); + socket_client_connection sc("./socket"); + + sc.write("abc"); + + string data; + + while (!sc.get_packet(data)) + sc.fill_buffer(1000000); + + sc.close(); + + // empty buffer + sc.get_packet(data); + + sc.reconnect(); + + sc.write("x"); + + while (!sc.get_packet(data)) + sc.fill_buffer(1000000); + + CPPUNIT_ASSERT_EQUAL(string().insert(0,100,'X'),data); + } + } + } + + void reconnect_buffer_complete() + { + pid_t pid; + + switch(pid=fork()) + { + case -1: + { + CPPUNIT_FAIL("fork error"); + break; + } + case 0: + // child + { + socket_server ss("./socket"); + + time_t t0 = time(NULL); + + // max 10 sec + while (time(NULL) < t0 + 10 ) + { + ss.fill_buffer(1000000); + + string data; + unsigned int cid; + + if(ss.get_packet(data,cid)) + { + server_connection* con=ss.get_connection(cid); + + if (data=="QUIT") + break; + + if (data=="x") + con->write(string().insert(0,100,'X')); + } + } + + // don't call atexit and stuff + _exit(0); + } + + default: + // parent + { + // don't kill us on broken pipe + signal(SIGPIPE, SIG_IGN); + + // wait till server is up + sleep(1); + socket_client_connection sc("./socket"); + + sc.write("x"); + + string data; + + while (!sc.packet_available()) + sc.fill_buffer(1000000); + + sc.reconnect(); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("packet not there",true,sc.get_packet(data)); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("data incorrect",string().insert(0,100,'X'),data); + } + } + } + + void reconnect_buffer_several_complete() + { + pid_t pid; + + const int packets=3; + + switch(pid=fork()) + { + case -1: + { + CPPUNIT_FAIL("fork error"); + break; + } + case 0: + // child + { + socket_server ss("./socket"); + + time_t t0 = time(NULL); + + // max 10 sec + while (time(NULL) < t0 + 10 ) + { + ss.fill_buffer(1000000); + + string data; + unsigned int cid; + + if(ss.get_packet(data,cid)) + { + server_connection* con=ss.get_connection(cid); + + if (data=="QUIT") + break; + + if (data=="x") + { + for (int i=0; iwrite(string().insert(0,100,'X')); + } + } + } + + // don't call atexit and stuff + _exit(0); + } + + default: + // parent + { + // don't kill us on broken pipe + signal(SIGPIPE, SIG_IGN); + + // wait till server is up + sleep(1); + socket_client_connection sc("./socket"); + + sc.write("x"); + + // retrieve packets for 3 seconds + time_t t0 = time(NULL); + + // max 3 sec + while (time(NULL) < t0 + 3 ) + sc.fill_buffer(1000000); + + // we now should have packets complete packets in the buffer + + sc.reconnect(); + + // are these packets still there? + + for (int i=0; i < packets; i++) + { + string data; + + ostringstream os; + os << "packet " << i << " not there"; + + CPPUNIT_ASSERT_EQUAL_MESSAGE(os.str(),true,sc.get_packet(data)); + + os.str(""); + os << "packet " << i << " incorrect"; + + CPPUNIT_ASSERT_EQUAL_MESSAGE(os.str(),string().insert(0,100,'X'),data); + } + } + } + } + + void reconnect_buffer_no_incomplete1() + { + pid_t pid; + + switch(pid=fork()) + { + case -1: + { + CPPUNIT_FAIL("fork error"); + break; + } + case 0: + // child + { + socket_server ss("./socket"); + + time_t t0 = time(NULL); + + // max 10 sec + while (time(NULL) < t0 + 10 ) + { + ss.fill_buffer(1000000); + + string data; + unsigned int cid; + + if(ss.get_packet(data,cid)) + { + server_connection* con=ss.get_connection(cid); + + if (data=="QUIT") + break; + + if (data=="x") + { + con->write(string().insert(0,100,'X')); + send_raw_socket("aaaab",&ss,cid); + } + } + } + + // don't call atexit and stuff + _exit(0); + } + + default: + // parent + { + // don't kill us on broken pipe + signal(SIGPIPE, SIG_IGN); + + // wait till server is up + sleep(1); + socket_client_connection sc("./socket"); + + sc.write("x"); + + // retrieve packets for 3 seconds + time_t t0 = time(NULL); + + // max 3 sec + while (time(NULL) < t0 + 3 ) + sc.fill_buffer(1000000); + + // we now should have one complete packet and some stuff in the buffer + + string data; + sc.get_packet(data); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("no incomplete packet",true,(sc.peek_packet(data))>0); + + sc.reconnect(); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("incomplete packet not removed",0,(int)sc.peek_packet(data)); + } + } + } + + void reconnect_buffer_no_incomplete2() + { + pid_t pid; + + switch(pid=fork()) + { + case -1: + { + CPPUNIT_FAIL("fork error"); + break; + } + case 0: + // child + { + socket_server ss("./socket"); + + time_t t0 = time(NULL); + + // max 10 sec + while (time(NULL) < t0 + 10 ) + { + ss.fill_buffer(1000000); + + string data; + unsigned int cid; + + if(ss.get_packet(data,cid)) + { + server_connection* con=ss.get_connection(cid); + + if (data=="QUIT") + break; + + if (data=="x") + { + con->write(string().insert(0,100,'X')); + + string blob=string().insert(0,100,'Y'); + + // one byte will be missing... + int size=blob.size()+1; + char sizetransfer[sizeof(int)+1]; + memcpy(sizetransfer,(void*)&size,sizeof(int)); + sizetransfer[sizeof(int)+1]=0; + + string packet=string(sizetransfer)+blob; + + send_raw_socket(packet,&ss,cid); + } + } + } + + // don't call atexit and stuff + _exit(0); + } + + default: + // parent + { + // don't kill us on broken pipe + signal(SIGPIPE, SIG_IGN); + + // wait till server is up + sleep(1); + socket_client_connection sc("./socket"); + + sc.write("x"); + + // retrieve packets for 3 seconds + time_t t0 = time(NULL); + + // max 3 sec + while (time(NULL) < t0 + 3 ) + sc.fill_buffer(1000000); + + // we now should have one complete packet and some stuff in the buffer + + sc.reconnect(); + + string data; + + CPPUNIT_ASSERT_EQUAL_MESSAGE("packet not there",true,sc.get_packet(data)); + CPPUNIT_ASSERT_EQUAL_MESSAGE("data incorrect",string().insert(0,100,'X'),data); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("incomplete packet not removed",0,(int)sc.peek_packet(data)); + } + } + } + +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(test_reconnect);