From 45a2ebc9695c4d7be6548b7e0f800d117ae56a0b Mon Sep 17 00:00:00 2001 From: Gerd v. Egidy Date: Sun, 29 Oct 2006 02:44:39 +0000 Subject: [PATCH 1/1] libt2n: (gerd) add client timeouts & tests, hello peek missing --- src/command_client.cpp | 59 +++++-- src/command_client.hxx | 6 +- src/command_server.cpp | 8 +- src/command_server.hxx | 2 +- src/connection.hxx | 7 +- src/server.hxx | 8 +- src/socket_client.hxx | 10 +- src/socket_handler.cpp | 24 ++- src/socket_handler.hxx | 4 +- src/socket_server.cpp | 15 ++- src/socket_server.hxx | 6 +- test/Makefile.am | 3 +- test/timeout.cpp | 411 ++++++++++++++++++++++++++++++++++++++++++++++++ 13 files changed, 516 insertions(+), 47 deletions(-) create mode 100644 test/timeout.cpp diff --git a/src/command_client.cpp b/src/command_client.cpp index 41c74de..77daf0d 100644 --- a/src/command_client.cpp +++ b/src/command_client.cpp @@ -39,23 +39,35 @@ using namespace std; namespace libt2n { -command_client::command_client(client_connection& _c) +command_client::command_client(client_connection& _c, long long _command_timeout_usec, long long _hello_timeout_usec) : c(_c) { + command_timeout_usec=_command_timeout_usec; + hello_timeout_usec=_hello_timeout_usec; + // for reconnects c.add_callback(new_connection,bind(&command_client::read_hello, boost::ref(*this))); read_hello(); } -void command_client::read_hello() +std::string command_client::read_packet(const long long &usec_timeout) { - // TODO: fix timeout string resultpacket; - while(!c.get_packet(resultpacket) && !c.is_closed()) - c.fill_buffer(); + bool got_packet=false; + long long my_timeout=usec_timeout; + while(!(got_packet=c.get_packet(resultpacket)) && my_timeout > 0 && !c.is_closed()) + c.fill_buffer(my_timeout,&my_timeout); - istringstream hello(resultpacket); + if (!got_packet) + throw t2n_transfer_error("timeout exceeded"); + + return resultpacket; +} + +void command_client::read_hello() +{ + istringstream hello(read_packet(hello_timeout_usec)); char chk[5]; hello.read(chk,4); @@ -87,8 +99,18 @@ void command_client::send_command(command* cmd, result_container &res) command_container cc(cmd); boost::archive::binary_oarchive oa(ofs); - // TODO: exceptions - oa << cc; + try + { + oa << cc; + } + catch(boost::archive::archive_exception &e) + { + ostringstream msg; + msg << "archive_exception while serializing on client-side, code " << e.code << " (" << e.what() << ")"; + throw t2n_serialization_error(msg.str()); + } + catch(...) + { throw; } std::ostream* ostr; if ((ostr=c.get_logstream(fulldebug))!=NULL) @@ -100,16 +122,21 @@ void command_client::send_command(command* cmd, result_container &res) c.write(ofs.str()); - // TODO: fix timeout - string resultpacket; - while(!c.get_packet(resultpacket)) - c.fill_buffer(); - - istringstream ifs(resultpacket); + istringstream ifs(read_packet(command_timeout_usec)); boost::archive::binary_iarchive ia(ifs); - // TODO: exceptions - ia >> res; + try + { + ia >> res; + } + catch(boost::archive::archive_exception &e) + { + ostringstream msg; + msg << "archive_exception while deserializing on client-side, code " << e.code << " (" << e.what() << ")"; + throw t2n_serialization_error(msg.str()); + } + catch(...) + { throw; } if ((ostr=c.get_logstream(fulldebug))!=NULL) { diff --git a/src/command_client.hxx b/src/command_client.hxx index 495192a..8c6b2bf 100644 --- a/src/command_client.hxx +++ b/src/command_client.hxx @@ -31,10 +31,14 @@ class command_client private: client_connection &c; + long long hello_timeout_usec; + long long command_timeout_usec; + void read_hello(); + std::string read_packet(const long long &usec_timeout); public: - command_client(client_connection& _c); + command_client(client_connection& _c, long long _command_timeout_usec=90000000, long long _hello_timeout_usec=30000000); void send_command(command* cmd, result_container &res); }; diff --git a/src/command_server.cpp b/src/command_server.cpp index 2aa4fc0..4f386ec 100644 --- a/src/command_server.cpp +++ b/src/command_server.cpp @@ -123,12 +123,12 @@ void command_server::handle_packet(const std::string& packet, server_connection* } /** @brief handle incoming commands - @param usec_timeout wait until new data is found, max timeout usecs. - -1: wait endless, 0: no timeout + @param[in,out] usec_timeout wait until new data is found, max timeout usecs. + -1: wait endless, 0: instant return */ -void command_server::handle(long long usec_timeout) +void command_server::handle(long long usec_timeout, long long* usec_timeout_remaining) { - if (s.fill_buffer(usec_timeout)) + if (s.fill_buffer(usec_timeout,usec_timeout_remaining)) { string packet; unsigned int conn_id; diff --git a/src/command_server.hxx b/src/command_server.hxx index cb41a76..1926733 100644 --- a/src/command_server.hxx +++ b/src/command_server.hxx @@ -35,7 +35,7 @@ class command_server public: command_server(server& _s); - void handle(long long usec_timeout=-1); + void handle(long long usec_timeout=-1, long long* usec_timeout_remaining=NULL); void send_hello(unsigned int conn_id); }; diff --git a/src/connection.hxx b/src/connection.hxx index 2343984..94c3123 100644 --- a/src/connection.hxx +++ b/src/connection.hxx @@ -64,11 +64,14 @@ class connection /** @brief look for new data and store it in the local buffer @param usec_timeout wait until new data is found, max timeout usecs. -1: wait endless - NULL: no timeout + 0: return instantly + @param usec_timeout_remaining if non-NULL the function will write the + not used time to the given target @retval true if new data was found (does not mean that the received data is a complete packet though) */ - virtual bool fill_buffer(long long usec_timeout=-1)=0; + virtual bool fill_buffer(long long usec_timeout=-1,long long* usec_timeout_remaining=NULL)=0; + bool get_packet(std::string& data); /// returns true if a complete data packet is in the buffer. retrieve it with get_packet(). diff --git a/src/server.hxx b/src/server.hxx index 601329c..f3ace10 100644 --- a/src/server.hxx +++ b/src/server.hxx @@ -130,14 +130,16 @@ class server void add_callback(callback_event_type event, const boost::function& func); - /** @brief look for new data on all open connections, accept new connections + /** @brief look for new data and store it in the local buffer @param usec_timeout wait until new data is found, max timeout usecs. -1: wait endless - NULL: no timeout + 0: return instantly + @param usec_timeout_remaining if non-NULL the function will write the + not used time to the given target @retval true if new data was found (does not mean that the received data is a complete packet though) */ - virtual bool fill_buffer(long long usec_timeout=-1)=0; + virtual bool fill_buffer(long long usec_timeout=-1, long long* timeout_remaining=NULL)=0; void cleanup(); diff --git a/src/socket_client.hxx b/src/socket_client.hxx index a35bbfd..1d82400 100644 --- a/src/socket_client.hxx +++ b/src/socket_client.hxx @@ -54,13 +54,15 @@ class socket_client_connection : public client_connection, public socket_handler /** @brief read data from the socket and copy it into buffer @param usec_timeout wait until new data is found, max timeout usecs. - -1: wait endless - NULL: no timeout + -1: wait endless + 0: return instantly + @param usec_timeout_remaining if non-NULL the function will write the + not used time to the given target @retval true if new data was found (does not mean that the received data is a complete packet though) */ - bool fill_buffer(long long usec_timeout=-1) - { return socket_handler::fill_buffer(buffer,usec_timeout); } + bool fill_buffer(long long usec_timeout=-1, long long *usec_timeout_remaining=NULL) + { return socket_handler::fill_buffer(buffer,usec_timeout,usec_timeout_remaining); } void close(); }; diff --git a/src/socket_handler.cpp b/src/socket_handler.cpp index 11acc81..ed37e4d 100644 --- a/src/socket_handler.cpp +++ b/src/socket_handler.cpp @@ -97,11 +97,11 @@ bool socket_handler::is_closed() } /** @brief check if new data is waiting on the raw socket - @param usec_timeout wait until new data is found, max timeout usecs. + @param[in,out] usec_timeout wait until new data is found, max timeout usecs. -1: wait endless - NULL: no timeout + 0: return instantly */ -bool socket_handler::data_waiting(long long usec_timeout) +bool socket_handler::data_waiting(long long usec_timeout,long long* usec_timeout_remaining) { // just our socket fd_set active_fd_set; @@ -118,12 +118,18 @@ bool socket_handler::data_waiting(long long usec_timeout) { timeout_ptr = &tval; - // timeout von long long usec in int sec + int usec umrechnen + // convert timeout from long long usec to int sec + int usec tval.tv_sec = usec_timeout / 1000000; tval.tv_usec = usec_timeout % 1000000; } - if(select (FD_SETSIZE, &active_fd_set, NULL, NULL, timeout_ptr) > 0) + int ret=select (FD_SETSIZE, &active_fd_set, NULL, NULL, timeout_ptr); + + // return the timeout we did not use + if (usec_timeout > 0 && usec_timeout_remaining != NULL) + *usec_timeout_remaining=(tval.tv_sec*1000000)+tval.tv_usec; + + if (ret > 0) return true; else return false; @@ -131,14 +137,14 @@ bool socket_handler::data_waiting(long long usec_timeout) /** @brief read data from the raw socket and copy it into the provided buffer @param buffer the buffer where to append the new data - @param usec_timeout wait until new data is found, max timeout usecs. + @param[in,out] usec_timeout wait until new data is found, max timeout usecs. -1: wait endless - NULL: no timeout + 0: return instantly */ -bool socket_handler::fill_buffer(std::string& buffer, long long usec_timeout) +bool socket_handler::fill_buffer(std::string& buffer, long long usec_timeout, long long *timeout_remaining) { // fast path for timeout==0 - if (usec_timeout==0 || data_waiting(usec_timeout)) + if (usec_timeout==0 || data_waiting(usec_timeout,timeout_remaining)) return fill_buffer(buffer); else return false; diff --git a/src/socket_handler.hxx b/src/socket_handler.hxx index 32fbbf4..82ccdad 100644 --- a/src/socket_handler.hxx +++ b/src/socket_handler.hxx @@ -37,7 +37,7 @@ class socket_handler socket_type_value socket_type; - bool data_waiting(long long usec_timeout=-1); + bool data_waiting(long long usec_timeout,long long *timeout_remaining=NULL); protected: int sock; @@ -54,7 +54,7 @@ class socket_handler virtual void close(); - bool fill_buffer(std::string& buffer, long long usec_timeout); + bool fill_buffer(std::string& buffer, long long usec_timeout, long long*timeout_remaining=NULL); bool fill_buffer(std::string& buffer); public: diff --git a/src/socket_server.cpp b/src/socket_server.cpp index 38c2ea9..0eff676 100644 --- a/src/socket_server.cpp +++ b/src/socket_server.cpp @@ -172,7 +172,16 @@ void socket_server::new_connection() return; } -bool socket_server::fill_buffer(long long usec_timeout) +/** @brief look for new connections and new data in any of the existing connections + @param usec_timeout wait until new data is found, max timeout usecs. + -1: wait endless + 0: return instantly + @param usec_timeout_remaining if non-NULL the function will write the + not used time to the given target + @retval true if new data was found (does not mean that the received data + is a complete packet though) +*/ +bool socket_server::fill_buffer(long long usec_timeout,long long* usec_timeout_remaining) { fd_set used_fdset=connection_set; @@ -193,6 +202,10 @@ bool socket_server::fill_buffer(long long usec_timeout) int ret=select (FD_SETSIZE, &used_fdset, NULL, NULL, timeout_ptr); + // return the timeout we did not use + if (usec_timeout > 0 && usec_timeout_remaining != NULL) + *usec_timeout_remaining=(tval.tv_sec*1000000)+tval.tv_usec; + if (ret < 0) { if (errno == EINTR) diff --git a/src/socket_server.hxx b/src/socket_server.hxx index 6ebe983..4c928c5 100644 --- a/src/socket_server.hxx +++ b/src/socket_server.hxx @@ -63,7 +63,7 @@ class socket_server : public socket_handler, public server ~socket_server(); - bool fill_buffer(long long usec_timeout=-1); + bool fill_buffer(long long usec_timeout=-1,long long* usec_timeout_remaining=NULL); }; /** @brief Socket based connection @@ -86,8 +86,8 @@ class socket_server_connection : public socket_handler, public server_connection { socket_write(data); } public: - bool fill_buffer(long long usec_timeout=-1) - { return socket_handler::fill_buffer(buffer,usec_timeout); } + bool fill_buffer(long long usec_timeout=-1,long long* usec_timeout_remaining=NULL) + { return socket_handler::fill_buffer(buffer,usec_timeout,usec_timeout_remaining); } void close(); }; diff --git a/test/Makefile.am b/test/Makefile.am index 85b34b2..ff0e4a6 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -4,6 +4,7 @@ check_PROGRAMS = test test_LDADD = $(top_builddir)/src/libt2n.la @BOOST_SERIALIZATION_LIB@ \ @BOOST_LDFLAGS@ @CPPUNIT_LIBS@ -test_SOURCES = test.cpp comm.cpp simplecmd.cpp callback.cpp hello.cpp +test_SOURCES = test.cpp comm.cpp simplecmd.cpp callback.cpp hello.cpp \ + timeout.cpp TESTS = test diff --git a/test/timeout.cpp b/test/timeout.cpp new file mode 100644 index 0000000..0493d62 --- /dev/null +++ b/test/timeout.cpp @@ -0,0 +1,411 @@ +/*************************************************************************** + * Copyright (C) 2004 by Intra2net AG * + * info@intra2net.com * + * * + ***************************************************************************/ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include + +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#ifdef HAVE_CONFIG_H +#include +#endif + +using namespace std; +using namespace libt2n; +using namespace CppUnit; + +string testfunc2(const string& str) +{ + if (str=="throw") + throw libt2n::t2n_runtime_error("throw me around"); + string ret(str); + ret+=", testfunc() was here"; + return ret; +} + +class testfunc2_res : public libt2n::result +{ + private: + string res; + + friend class boost::serialization::access; + template + void serialize(Archive & ar, const unsigned int version) + { + ar & BOOST_SERIALIZATION_BASE_OBJECT_NVP(libt2n::result); + ar & BOOST_SERIALIZATION_NVP(res); + } + + public: + testfunc2_res() + { } + + testfunc2_res(const string& str) + { + res=str; + } + + string get_data() + { + return res; + } +}; + + +class testfunc2_cmd : public libt2n::command +{ + private: + string param; + + friend class boost::serialization::access; + template + void serialize(Archive & ar, const unsigned int version) + { + ar & BOOST_SERIALIZATION_BASE_OBJECT_NVP(libt2n::command); + ar & BOOST_SERIALIZATION_NVP(param); + } + + public: + testfunc2_cmd() + { } + + testfunc2_cmd(const string& str) + { + param=str; + } + + libt2n::result* operator()() + { + return new testfunc2_res(testfunc2(param)); + } +}; + +#include + +BOOST_CLASS_EXPORT(testfunc2_cmd) +BOOST_CLASS_EXPORT(testfunc2_res) + +// this is an evil hack to get access to real_write, don't ever do this in an app!!! +class real_write_connection: public socket_server_connection +{ + public: + void real_write(const std::string& data) + { socket_write(data); } +}; + +class test_timeout : public TestFixture +{ + CPPUNIT_TEST_SUITE(test_timeout); + + CPPUNIT_TEST(HelloTimeoutNothing); + CPPUNIT_TEST(HelloTimeoutSlowData); + CPPUNIT_TEST(CommandTimeout); + CPPUNIT_TEST(CommandSlowResponse); + + CPPUNIT_TEST_SUITE_END(); + + public: + + typedef uint32_t packet_size_indicator; + + void setUp() + { } + + void tearDown() + { } + + void send_hello(string hello_string, socket_server* ss, unsigned int conn_id) + { + server_connection *sc=ss->get_connection(conn_id); + sc->write(hello_string); + } + + void send_slow_raw_socket(string data, socket_server* ss, unsigned int conn_id) + { + socket_server_connection *ssc=dynamic_cast(ss->get_connection(conn_id)); + + // this is an evil hack to get access to real_write, don't ever do this in an app!!! + real_write_connection *rwc=(real_write_connection*)ssc; + + // we write one char each 0.2 sec + for (int pos=0; pos < data.size(); pos++) + { + string onebyte; + onebyte+=data[pos]; + rwc->real_write(onebyte); + usleep(200000); + } + } + + void HelloTimeoutNothing() + { + pid_t pid; + + switch(pid=fork()) + { + case -1: + { + CPPUNIT_FAIL("fork error"); + break; + } + case 0: + // child + { + socket_server ss("./socket"); + + // max 10 sec + for (int i=0; i < 10; i++) + ss.fill_buffer(1000000); + // don't call atexit and stuff + _exit(0); + } + + default: + // parent + { + string data; + + // wait till server is up + sleep(1); + socket_client_connection sc("./socket"); + + string errormsg; + + try + { + command_client cc(sc,1000000,1000000); + } + catch(t2n_transfer_error &e) + { errormsg=e.what(); } + catch(...) + { throw; } + + CPPUNIT_ASSERT_EQUAL(string("timeout exceeded"),errormsg); + } + } + } + + void HelloTimeoutSlowData() + { + pid_t pid; + + switch(pid=fork()) + { + case -1: + { + CPPUNIT_FAIL("fork error"); + break; + } + case 0: + // child + { + socket_server ss("./socket"); + + // create a valid packet + ostringstream hello; + hello << "T2Nv" << PROTOCOL_VERSION << ';'; + int byteordercheck=1; + hello.write((char*)&byteordercheck,sizeof(byteordercheck)); + hello << ';'; + + packet_size_indicator psize=htonl(hello.str().size()); + std::string send_data(hello.str()); + send_data.insert(0,(char*)&psize,sizeof(packet_size_indicator)); + + ss.add_callback(new_connection,bind(&test_timeout::send_slow_raw_socket, boost::ref(*this), send_data,&ss, _1)); + + // max 10 sec + for (int i=0; i < 10; i++) + ss.fill_buffer(1000000); + // don't call atexit and stuff + _exit(0); + } + + default: + // parent + { + string data; + + // wait till server is up + sleep(1); + socket_client_connection sc("./socket"); + + string errormsg; + + try + { + command_client cc(sc,1000000,1000000); + } + catch(t2n_transfer_error &e) + { errormsg=e.what(); } + catch(...) + { throw; } + + CPPUNIT_ASSERT_EQUAL(string("timeout exceeded"),errormsg); + } + } + } + + void CommandTimeout() + { + pid_t pid; + + switch(pid=fork()) + { + case -1: + { + CPPUNIT_FAIL("fork error"); + break; + } + case 0: + // child + { + socket_server ss("./socket"); + + ostringstream hello; + hello << "T2Nv" << PROTOCOL_VERSION << ';'; + int byteordercheck=1; + hello.write((char*)&byteordercheck,sizeof(byteordercheck)); + hello << ';'; + + ss.add_callback(new_connection,bind(&test_timeout::send_hello, boost::ref(*this), hello.str(),&ss, _1)); + + // max 10 sec + for (int i=0; i < 10; i++) + ss.fill_buffer(1000000); + // don't call atexit and stuff + _exit(0); + } + + default: + // parent + { + string data; + + // wait till server is up + sleep(1); + socket_client_connection sc("./socket"); + + command_client cc(sc,1000000,1000000); + result_container rc; + + string errormsg; + + try + { + cc.send_command(new testfunc2_cmd("hello"),rc); + } + catch(t2n_transfer_error &e) + { errormsg=e.what(); } + catch(...) + { throw; } + + CPPUNIT_ASSERT_EQUAL(string("timeout exceeded"),errormsg); + } + } + } + + void CommandSlowResponse() + { + pid_t pid; + + switch(pid=fork()) + { + case -1: + { + CPPUNIT_FAIL("fork error"); + break; + } + case 0: + // child + { + socket_server ss("./socket"); + + ostringstream hello; + hello << "T2Nv" << PROTOCOL_VERSION << ';'; + int byteordercheck=1; + hello.write((char*)&byteordercheck,sizeof(byteordercheck)); + hello << ';'; + + ss.add_callback(new_connection,bind(&test_timeout::send_hello, boost::ref(*this), hello.str(),&ss, _1)); + + // max 10 sec + for (int i=0; i < 10; i++) + { + ss.fill_buffer(1000000); + + string data; + unsigned int cid; + + if(ss.get_packet(data,cid)) + { + // create a valid packet & send + string response="abcdefghijklmnopqrstuvwxyz"; + packet_size_indicator psize=htonl(response.size()); + std::string send_data(response); + send_data.insert(0,(char*)&psize,sizeof(packet_size_indicator)); + send_slow_raw_socket(send_data,&ss,cid); + } + } + // don't call atexit and stuff + _exit(0); + } + + default: + // parent + { + string data; + + // wait till server is up + sleep(1); + socket_client_connection sc("./socket"); + + command_client cc(sc,1000000,1000000); + result_container rc; + + string errormsg; + + try + { + cc.send_command(new testfunc2_cmd("hello"),rc); + } + catch(t2n_transfer_error &e) + { errormsg=e.what(); } + catch(...) + { throw; } + + CPPUNIT_ASSERT_EQUAL(string("timeout exceeded"),errormsg); + } + } + } + +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(test_timeout); -- 1.7.1