namespace libt2n
{
-command_client::command_client(client_connection& _c)
+command_client::command_client(client_connection& _c, long long _command_timeout_usec, long long _hello_timeout_usec)
: c(_c)
{
+ command_timeout_usec=_command_timeout_usec;
+ hello_timeout_usec=_hello_timeout_usec;
+
// for reconnects
c.add_callback(new_connection,bind(&command_client::read_hello, boost::ref(*this)));
read_hello();
}
-void command_client::read_hello()
+std::string command_client::read_packet(const long long &usec_timeout)
{
- // TODO: fix timeout
string resultpacket;
- while(!c.get_packet(resultpacket) && !c.is_closed())
- c.fill_buffer();
+ bool got_packet=false;
+ long long my_timeout=usec_timeout;
+ while(!(got_packet=c.get_packet(resultpacket)) && my_timeout > 0 && !c.is_closed())
+ c.fill_buffer(my_timeout,&my_timeout);
- istringstream hello(resultpacket);
+ if (!got_packet)
+ throw t2n_transfer_error("timeout exceeded");
+
+ return resultpacket;
+}
+
+void command_client::read_hello()
+{
+ istringstream hello(read_packet(hello_timeout_usec));
char chk[5];
hello.read(chk,4);
command_container cc(cmd);
boost::archive::binary_oarchive oa(ofs);
- // TODO: exceptions
- oa << cc;
+ try
+ {
+ oa << cc;
+ }
+ catch(boost::archive::archive_exception &e)
+ {
+ ostringstream msg;
+ msg << "archive_exception while serializing on client-side, code " << e.code << " (" << e.what() << ")";
+ throw t2n_serialization_error(msg.str());
+ }
+ catch(...)
+ { throw; }
std::ostream* ostr;
if ((ostr=c.get_logstream(fulldebug))!=NULL)
c.write(ofs.str());
- // TODO: fix timeout
- string resultpacket;
- while(!c.get_packet(resultpacket))
- c.fill_buffer();
-
- istringstream ifs(resultpacket);
+ istringstream ifs(read_packet(command_timeout_usec));
boost::archive::binary_iarchive ia(ifs);
- // TODO: exceptions
- ia >> res;
+ try
+ {
+ ia >> res;
+ }
+ catch(boost::archive::archive_exception &e)
+ {
+ ostringstream msg;
+ msg << "archive_exception while deserializing on client-side, code " << e.code << " (" << e.what() << ")";
+ throw t2n_serialization_error(msg.str());
+ }
+ catch(...)
+ { throw; }
if ((ostr=c.get_logstream(fulldebug))!=NULL)
{
private:
client_connection &c;
+ long long hello_timeout_usec;
+ long long command_timeout_usec;
+
void read_hello();
+ std::string read_packet(const long long &usec_timeout);
public:
- command_client(client_connection& _c);
+ command_client(client_connection& _c, long long _command_timeout_usec=90000000, long long _hello_timeout_usec=30000000);
void send_command(command* cmd, result_container &res);
};
}
/** @brief handle incoming commands
- @param usec_timeout wait until new data is found, max timeout usecs.
- -1: wait endless, 0: no timeout
+ @param[in,out] usec_timeout wait until new data is found, max timeout usecs.
+ -1: wait endless, 0: instant return
*/
-void command_server::handle(long long usec_timeout)
+void command_server::handle(long long usec_timeout, long long* usec_timeout_remaining)
{
- if (s.fill_buffer(usec_timeout))
+ if (s.fill_buffer(usec_timeout,usec_timeout_remaining))
{
string packet;
unsigned int conn_id;
public:
command_server(server& _s);
- void handle(long long usec_timeout=-1);
+ void handle(long long usec_timeout=-1, long long* usec_timeout_remaining=NULL);
void send_hello(unsigned int conn_id);
};
/** @brief look for new data and store it in the local buffer
@param usec_timeout wait until new data is found, max timeout usecs.
-1: wait endless
- NULL: no timeout
+ 0: return instantly
+ @param usec_timeout_remaining if non-NULL the function will write the
+ not used time to the given target
@retval true if new data was found (does not mean that the received data
is a complete packet though)
*/
- virtual bool fill_buffer(long long usec_timeout=-1)=0;
+ virtual bool fill_buffer(long long usec_timeout=-1,long long* usec_timeout_remaining=NULL)=0;
+
bool get_packet(std::string& data);
/// returns true if a complete data packet is in the buffer. retrieve it with get_packet().
void add_callback(callback_event_type event, const boost::function<void (unsigned int)>& func);
- /** @brief look for new data on all open connections, accept new connections
+ /** @brief look for new data and store it in the local buffer
@param usec_timeout wait until new data is found, max timeout usecs.
-1: wait endless
- NULL: no timeout
+ 0: return instantly
+ @param usec_timeout_remaining if non-NULL the function will write the
+ not used time to the given target
@retval true if new data was found (does not mean that the received data
is a complete packet though)
*/
- virtual bool fill_buffer(long long usec_timeout=-1)=0;
+ virtual bool fill_buffer(long long usec_timeout=-1, long long* timeout_remaining=NULL)=0;
void cleanup();
/** @brief read data from the socket and copy it into buffer
@param usec_timeout wait until new data is found, max timeout usecs.
- -1: wait endless
- NULL: no timeout
+ -1: wait endless
+ 0: return instantly
+ @param usec_timeout_remaining if non-NULL the function will write the
+ not used time to the given target
@retval true if new data was found (does not mean that the received data
is a complete packet though)
*/
- bool fill_buffer(long long usec_timeout=-1)
- { return socket_handler::fill_buffer(buffer,usec_timeout); }
+ bool fill_buffer(long long usec_timeout=-1, long long *usec_timeout_remaining=NULL)
+ { return socket_handler::fill_buffer(buffer,usec_timeout,usec_timeout_remaining); }
void close();
};
}
/** @brief check if new data is waiting on the raw socket
- @param usec_timeout wait until new data is found, max timeout usecs.
+ @param[in,out] usec_timeout wait until new data is found, max timeout usecs.
-1: wait endless
- NULL: no timeout
+ 0: return instantly
*/
-bool socket_handler::data_waiting(long long usec_timeout)
+bool socket_handler::data_waiting(long long usec_timeout,long long* usec_timeout_remaining)
{
// just our socket
fd_set active_fd_set;
{
timeout_ptr = &tval;
- // timeout von long long usec in int sec + int usec umrechnen
+ // convert timeout from long long usec to int sec + int usec
tval.tv_sec = usec_timeout / 1000000;
tval.tv_usec = usec_timeout % 1000000;
}
- if(select (FD_SETSIZE, &active_fd_set, NULL, NULL, timeout_ptr) > 0)
+ int ret=select (FD_SETSIZE, &active_fd_set, NULL, NULL, timeout_ptr);
+
+ // return the timeout we did not use
+ if (usec_timeout > 0 && usec_timeout_remaining != NULL)
+ *usec_timeout_remaining=(tval.tv_sec*1000000)+tval.tv_usec;
+
+ if (ret > 0)
return true;
else
return false;
/** @brief read data from the raw socket and copy it into the provided buffer
@param buffer the buffer where to append the new data
- @param usec_timeout wait until new data is found, max timeout usecs.
+ @param[in,out] usec_timeout wait until new data is found, max timeout usecs.
-1: wait endless
- NULL: no timeout
+ 0: return instantly
*/
-bool socket_handler::fill_buffer(std::string& buffer, long long usec_timeout)
+bool socket_handler::fill_buffer(std::string& buffer, long long usec_timeout, long long *timeout_remaining)
{
// fast path for timeout==0
- if (usec_timeout==0 || data_waiting(usec_timeout))
+ if (usec_timeout==0 || data_waiting(usec_timeout,timeout_remaining))
return fill_buffer(buffer);
else
return false;
socket_type_value socket_type;
- bool data_waiting(long long usec_timeout=-1);
+ bool data_waiting(long long usec_timeout,long long *timeout_remaining=NULL);
protected:
int sock;
virtual void close();
- bool fill_buffer(std::string& buffer, long long usec_timeout);
+ bool fill_buffer(std::string& buffer, long long usec_timeout, long long*timeout_remaining=NULL);
bool fill_buffer(std::string& buffer);
public:
return;
}
-bool socket_server::fill_buffer(long long usec_timeout)
+/** @brief look for new connections and new data in any of the existing connections
+ @param usec_timeout wait until new data is found, max timeout usecs.
+ -1: wait endless
+ 0: return instantly
+ @param usec_timeout_remaining if non-NULL the function will write the
+ not used time to the given target
+ @retval true if new data was found (does not mean that the received data
+ is a complete packet though)
+*/
+bool socket_server::fill_buffer(long long usec_timeout,long long* usec_timeout_remaining)
{
fd_set used_fdset=connection_set;
int ret=select (FD_SETSIZE, &used_fdset, NULL, NULL, timeout_ptr);
+ // return the timeout we did not use
+ if (usec_timeout > 0 && usec_timeout_remaining != NULL)
+ *usec_timeout_remaining=(tval.tv_sec*1000000)+tval.tv_usec;
+
if (ret < 0)
{
if (errno == EINTR)
~socket_server();
- bool fill_buffer(long long usec_timeout=-1);
+ bool fill_buffer(long long usec_timeout=-1,long long* usec_timeout_remaining=NULL);
};
/** @brief Socket based connection
{ socket_write(data); }
public:
- bool fill_buffer(long long usec_timeout=-1)
- { return socket_handler::fill_buffer(buffer,usec_timeout); }
+ bool fill_buffer(long long usec_timeout=-1,long long* usec_timeout_remaining=NULL)
+ { return socket_handler::fill_buffer(buffer,usec_timeout,usec_timeout_remaining); }
void close();
};
test_LDADD = $(top_builddir)/src/libt2n.la @BOOST_SERIALIZATION_LIB@ \
@BOOST_LDFLAGS@ @CPPUNIT_LIBS@
-test_SOURCES = test.cpp comm.cpp simplecmd.cpp callback.cpp hello.cpp
+test_SOURCES = test.cpp comm.cpp simplecmd.cpp callback.cpp hello.cpp \
+ timeout.cpp
TESTS = test
--- /dev/null
+/***************************************************************************
+ * Copyright (C) 2004 by Intra2net AG *
+ * info@intra2net.com *
+ * *
+ ***************************************************************************/
+
+#include <sys/types.h>
+#include <unistd.h>
+#include <errno.h>
+#include <signal.h>
+#include <stdio.h>
+
+#include <iostream>
+#include <string>
+#include <sstream>
+#include <stdexcept>
+
+#include <boost/bind.hpp>
+
+#include <cppunit/extensions/TestFactoryRegistry.h>
+#include <cppunit/ui/text/TestRunner.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+#include <boost/archive/binary_oarchive.hpp>
+#include <boost/archive/binary_iarchive.hpp>
+#include <boost/archive/xml_oarchive.hpp>
+#include <boost/archive/xml_iarchive.hpp>
+#include <boost/serialization/serialization.hpp>
+
+#include <container.hxx>
+#include <socket_client.hxx>
+#include <socket_server.hxx>
+#include <command_client.hxx>
+#include <command_server.hxx>
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+using namespace std;
+using namespace libt2n;
+using namespace CppUnit;
+
+string testfunc2(const string& str)
+{
+ if (str=="throw")
+ throw libt2n::t2n_runtime_error("throw me around");
+ string ret(str);
+ ret+=", testfunc() was here";
+ return ret;
+}
+
+class testfunc2_res : public libt2n::result
+{
+ private:
+ string res;
+
+ friend class boost::serialization::access;
+ template<class Archive>
+ void serialize(Archive & ar, const unsigned int version)
+ {
+ ar & BOOST_SERIALIZATION_BASE_OBJECT_NVP(libt2n::result);
+ ar & BOOST_SERIALIZATION_NVP(res);
+ }
+
+ public:
+ testfunc2_res()
+ { }
+
+ testfunc2_res(const string& str)
+ {
+ res=str;
+ }
+
+ string get_data()
+ {
+ return res;
+ }
+};
+
+
+class testfunc2_cmd : public libt2n::command
+{
+ private:
+ string param;
+
+ friend class boost::serialization::access;
+ template<class Archive>
+ void serialize(Archive & ar, const unsigned int version)
+ {
+ ar & BOOST_SERIALIZATION_BASE_OBJECT_NVP(libt2n::command);
+ ar & BOOST_SERIALIZATION_NVP(param);
+ }
+
+ public:
+ testfunc2_cmd()
+ { }
+
+ testfunc2_cmd(const string& str)
+ {
+ param=str;
+ }
+
+ libt2n::result* operator()()
+ {
+ return new testfunc2_res(testfunc2(param));
+ }
+};
+
+#include <boost/serialization/export.hpp>
+
+BOOST_CLASS_EXPORT(testfunc2_cmd)
+BOOST_CLASS_EXPORT(testfunc2_res)
+
+// this is an evil hack to get access to real_write, don't ever do this in an app!!!
+class real_write_connection: public socket_server_connection
+{
+ public:
+ void real_write(const std::string& data)
+ { socket_write(data); }
+};
+
+class test_timeout : public TestFixture
+{
+ CPPUNIT_TEST_SUITE(test_timeout);
+
+ CPPUNIT_TEST(HelloTimeoutNothing);
+ CPPUNIT_TEST(HelloTimeoutSlowData);
+ CPPUNIT_TEST(CommandTimeout);
+ CPPUNIT_TEST(CommandSlowResponse);
+
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ typedef uint32_t packet_size_indicator;
+
+ void setUp()
+ { }
+
+ void tearDown()
+ { }
+
+ void send_hello(string hello_string, socket_server* ss, unsigned int conn_id)
+ {
+ server_connection *sc=ss->get_connection(conn_id);
+ sc->write(hello_string);
+ }
+
+ void send_slow_raw_socket(string data, socket_server* ss, unsigned int conn_id)
+ {
+ socket_server_connection *ssc=dynamic_cast<socket_server_connection*>(ss->get_connection(conn_id));
+
+ // this is an evil hack to get access to real_write, don't ever do this in an app!!!
+ real_write_connection *rwc=(real_write_connection*)ssc;
+
+ // we write one char each 0.2 sec
+ for (int pos=0; pos < data.size(); pos++)
+ {
+ string onebyte;
+ onebyte+=data[pos];
+ rwc->real_write(onebyte);
+ usleep(200000);
+ }
+ }
+
+ void HelloTimeoutNothing()
+ {
+ pid_t pid;
+
+ switch(pid=fork())
+ {
+ case -1:
+ {
+ CPPUNIT_FAIL("fork error");
+ break;
+ }
+ case 0:
+ // child
+ {
+ socket_server ss("./socket");
+
+ // max 10 sec
+ for (int i=0; i < 10; i++)
+ ss.fill_buffer(1000000);
+ // don't call atexit and stuff
+ _exit(0);
+ }
+
+ default:
+ // parent
+ {
+ string data;
+
+ // wait till server is up
+ sleep(1);
+ socket_client_connection sc("./socket");
+
+ string errormsg;
+
+ try
+ {
+ command_client cc(sc,1000000,1000000);
+ }
+ catch(t2n_transfer_error &e)
+ { errormsg=e.what(); }
+ catch(...)
+ { throw; }
+
+ CPPUNIT_ASSERT_EQUAL(string("timeout exceeded"),errormsg);
+ }
+ }
+ }
+
+ void HelloTimeoutSlowData()
+ {
+ pid_t pid;
+
+ switch(pid=fork())
+ {
+ case -1:
+ {
+ CPPUNIT_FAIL("fork error");
+ break;
+ }
+ case 0:
+ // child
+ {
+ socket_server ss("./socket");
+
+ // create a valid packet
+ ostringstream hello;
+ hello << "T2Nv" << PROTOCOL_VERSION << ';';
+ int byteordercheck=1;
+ hello.write((char*)&byteordercheck,sizeof(byteordercheck));
+ hello << ';';
+
+ packet_size_indicator psize=htonl(hello.str().size());
+ std::string send_data(hello.str());
+ send_data.insert(0,(char*)&psize,sizeof(packet_size_indicator));
+
+ ss.add_callback(new_connection,bind(&test_timeout::send_slow_raw_socket, boost::ref(*this), send_data,&ss, _1));
+
+ // max 10 sec
+ for (int i=0; i < 10; i++)
+ ss.fill_buffer(1000000);
+ // don't call atexit and stuff
+ _exit(0);
+ }
+
+ default:
+ // parent
+ {
+ string data;
+
+ // wait till server is up
+ sleep(1);
+ socket_client_connection sc("./socket");
+
+ string errormsg;
+
+ try
+ {
+ command_client cc(sc,1000000,1000000);
+ }
+ catch(t2n_transfer_error &e)
+ { errormsg=e.what(); }
+ catch(...)
+ { throw; }
+
+ CPPUNIT_ASSERT_EQUAL(string("timeout exceeded"),errormsg);
+ }
+ }
+ }
+
+ void CommandTimeout()
+ {
+ pid_t pid;
+
+ switch(pid=fork())
+ {
+ case -1:
+ {
+ CPPUNIT_FAIL("fork error");
+ break;
+ }
+ case 0:
+ // child
+ {
+ socket_server ss("./socket");
+
+ ostringstream hello;
+ hello << "T2Nv" << PROTOCOL_VERSION << ';';
+ int byteordercheck=1;
+ hello.write((char*)&byteordercheck,sizeof(byteordercheck));
+ hello << ';';
+
+ ss.add_callback(new_connection,bind(&test_timeout::send_hello, boost::ref(*this), hello.str(),&ss, _1));
+
+ // max 10 sec
+ for (int i=0; i < 10; i++)
+ ss.fill_buffer(1000000);
+ // don't call atexit and stuff
+ _exit(0);
+ }
+
+ default:
+ // parent
+ {
+ string data;
+
+ // wait till server is up
+ sleep(1);
+ socket_client_connection sc("./socket");
+
+ command_client cc(sc,1000000,1000000);
+ result_container rc;
+
+ string errormsg;
+
+ try
+ {
+ cc.send_command(new testfunc2_cmd("hello"),rc);
+ }
+ catch(t2n_transfer_error &e)
+ { errormsg=e.what(); }
+ catch(...)
+ { throw; }
+
+ CPPUNIT_ASSERT_EQUAL(string("timeout exceeded"),errormsg);
+ }
+ }
+ }
+
+ void CommandSlowResponse()
+ {
+ pid_t pid;
+
+ switch(pid=fork())
+ {
+ case -1:
+ {
+ CPPUNIT_FAIL("fork error");
+ break;
+ }
+ case 0:
+ // child
+ {
+ socket_server ss("./socket");
+
+ ostringstream hello;
+ hello << "T2Nv" << PROTOCOL_VERSION << ';';
+ int byteordercheck=1;
+ hello.write((char*)&byteordercheck,sizeof(byteordercheck));
+ hello << ';';
+
+ ss.add_callback(new_connection,bind(&test_timeout::send_hello, boost::ref(*this), hello.str(),&ss, _1));
+
+ // max 10 sec
+ for (int i=0; i < 10; i++)
+ {
+ ss.fill_buffer(1000000);
+
+ string data;
+ unsigned int cid;
+
+ if(ss.get_packet(data,cid))
+ {
+ // create a valid packet & send
+ string response="abcdefghijklmnopqrstuvwxyz";
+ packet_size_indicator psize=htonl(response.size());
+ std::string send_data(response);
+ send_data.insert(0,(char*)&psize,sizeof(packet_size_indicator));
+ send_slow_raw_socket(send_data,&ss,cid);
+ }
+ }
+ // don't call atexit and stuff
+ _exit(0);
+ }
+
+ default:
+ // parent
+ {
+ string data;
+
+ // wait till server is up
+ sleep(1);
+ socket_client_connection sc("./socket");
+
+ command_client cc(sc,1000000,1000000);
+ result_container rc;
+
+ string errormsg;
+
+ try
+ {
+ cc.send_command(new testfunc2_cmd("hello"),rc);
+ }
+ catch(t2n_transfer_error &e)
+ { errormsg=e.what(); }
+ catch(...)
+ { throw; }
+
+ CPPUNIT_ASSERT_EQUAL(string("timeout exceeded"),errormsg);
+ }
+ }
+ }
+
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(test_timeout);