From: Gerd v. Egidy Date: Fri, 27 Oct 2006 23:41:59 +0000 (+0000) Subject: libt2n: (gerd) finish server callbacks X-Git-Tag: v0.2~137 X-Git-Url: http://developer.intra2net.com/git/?p=libt2n;a=commitdiff_plain;h=6cda58a6dad87ff6efe2277db2155be60edb8d48;hp=28cb45a5725e9c6054d7048a9bf969b9f2c94d64 libt2n: (gerd) finish server callbacks --- diff --git a/src/command_server.cpp b/src/command_server.cpp index e7d6d4d..2a4a7b4 100644 --- a/src/command_server.cpp +++ b/src/command_server.cpp @@ -46,9 +46,9 @@ command_server::command_server(server& _s) s.add_callback(new_connection,bind(&command_server::new_connection_callback, boost::ref(*this), _1)); } -void command_server::new_connection_callback(server_connection* conn) +void command_server::new_connection_callback(unsigned int conn_id) { - cerr << "new connection callback: " << conn->get_id() << endl; + cerr << "new connection callback: " << conn_id << endl; } /// handle a command including deserialization and answering diff --git a/src/command_server.hxx b/src/command_server.hxx index 2f3df5a..686be14 100644 --- a/src/command_server.hxx +++ b/src/command_server.hxx @@ -37,7 +37,7 @@ class command_server void handle(long long usec_timeout=-1); - void new_connection_callback(server_connection* conn); + void new_connection_callback(unsigned int conn_id); }; } diff --git a/src/server.cpp b/src/server.cpp index b1f7962..6bc1cba 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -19,14 +19,17 @@ #include +#include + #include "server.hxx" #include "log.hxx" +#include "t2n_exception.hxx" namespace libt2n { server_connection::server_connection(int _timeout) - : connection() + : connection(), callbacks(__events_end) { set_timeout(_timeout); reset_timeout(); @@ -34,6 +37,22 @@ server_connection::server_connection(int _timeout) my_server=0; } +server_connection::~server_connection() +{ + // we want the connection_closed callbacks to be called before + close(); + + do_callbacks(connection_deleted); +} + +void server_connection::close() +{ + if (!is_closed()) + { + connection::close(); + do_callbacks(connection_closed); + } +} /// get pointer to logging stream, returns NULL if no logging needed std::ostream* server_connection::get_logstream(log_level_values level) @@ -65,6 +84,30 @@ void server_connection::reset_timeout() last_action_time=time(NULL); } +/** @brief add a callback + + @param event event the function will be called at + @param func functor (see boost function) that will be called + + @example use boost::bind to bind to member functions and parameters like this: + 17 is a fixed parameter that is always added to the call + c.add_callback(connection_closed,bind(&my_class::func_to_call_back, boost::ref(*this), 17)); +*/ +void server_connection::add_callback(callback_event_type event, const boost::function& func) +{ + if (event == new_connection) + throw t2n_parameter_error("new_connection callback not allowed for server_connections"); + + callbacks[event].push_back(func); +} + +void server_connection::do_callbacks(callback_event_type event) +{ + std::list >::iterator i,ie=callbacks[event].end(); + for (i=callbacks[event].begin(); i != ie; i++) + (*i)(); +} + server::server() : callbacks(__events_end) { @@ -84,17 +127,28 @@ server::~server() @param event event the function will be called at @param func functor (see boost function) that will be called + + @example use boost::bind to bind to member functions like this: + s.add_callback(new_connection,bind(&my_class::func_to_call_back, boost::ref(*this), _1)); */ -void server::add_callback(callback_event_type event, const boost::function& func) +void server::add_callback(callback_event_type event, const boost::function& func) { callbacks[event].push_back(func); + + // add callback to all existing connections + if (event != new_connection) + { + std::map::iterator ie=connections.end(); + for(std::map::iterator i=connections.begin(); i != ie; i++) + i->second->add_callback(event,bind(func, i->first)); + } } -void server::do_callbacks(callback_event_type event, server_connection* conn) +void server::do_callbacks(callback_event_type event, unsigned int conn_id) { - std::list >::iterator i,ie=callbacks[event].end(); + std::list >::iterator i,ie=callbacks[event].end(); for (i=callbacks[event].begin(); i != ie; i++) - (*i)(conn); + (*i)(conn_id); } int server::add_connection(server_connection* newconn) @@ -104,9 +158,17 @@ int server::add_connection(server_connection* newconn) newconn->set_server(this); connections[cid]=newconn; + // add all callbacks + for(int e=connection_closed; e != __events_end; e++) + { + std::list >::iterator i,ie=callbacks[e].end(); + for (i=callbacks[e].begin(); i != ie; i++) + newconn->add_callback(static_cast(e),bind(*i,cid)); + } + LOGSTREAM(debug,"new connection accepted, id: " << cid); - do_callbacks(new_connection,newconn); + do_callbacks(new_connection,cid); return cid; } diff --git a/src/server.hxx b/src/server.hxx index aca705b..601329c 100644 --- a/src/server.hxx +++ b/src/server.hxx @@ -56,6 +56,9 @@ class server_connection : public connection void set_id(unsigned int _connection_id) { connection_id=_connection_id; } + /// vector initialized for all callback-types, all elements in each list will be called + std::vector > > callbacks; + protected: server *my_server; @@ -63,7 +66,11 @@ class server_connection : public connection std::ostream* get_logstream(log_level_values level); + void do_callbacks(callback_event_type event); + public: + virtual ~server_connection(); + void check_timeout(); void reset_timeout(); void set_timeout(int _timeout) @@ -72,6 +79,10 @@ class server_connection : public connection /// get the id of this connection within the server object unsigned int get_id() { return connection_id; } + + void close(); + + void add_callback(callback_event_type event, const boost::function& func); }; /** @@ -87,7 +98,7 @@ class server std::ostream *logstream; /// vector initialized for all callback-types, all elements in each list will be called - std::vector > > callbacks; + std::vector > > callbacks; unsigned int next_id; @@ -100,7 +111,7 @@ class server int add_connection(server_connection* newconn); - void do_callbacks(callback_event_type event, server_connection* conn); + void do_callbacks(callback_event_type event, unsigned int conn_id); public: virtual ~server(); @@ -117,7 +128,7 @@ class server server_connection* get_connection(unsigned int conn_id); - void add_callback(callback_event_type event, const boost::function& func); + void add_callback(callback_event_type event, const boost::function& func); /** @brief look for new data on all open connections, accept new connections @param usec_timeout wait until new data is found, max timeout usecs. diff --git a/src/t2n_exception.cpp b/src/t2n_exception.cpp index 1aae5c4..0b90f87 100644 --- a/src/t2n_exception.cpp +++ b/src/t2n_exception.cpp @@ -33,6 +33,7 @@ BOOST_CLASS_EXPORT(libt2n::t2n_server_error) BOOST_CLASS_EXPORT(libt2n::t2n_transfer_error) BOOST_CLASS_EXPORT(libt2n::t2n_version_mismatch) BOOST_CLASS_EXPORT(libt2n::t2n_command_error) +BOOST_CLASS_EXPORT(libt2n::t2n_parameter_error) BOOST_CLASS_EXPORT(libt2n::t2n_serialization_error) BOOST_CLASS_EXPORT(libt2n::t2n_runtime_error) @@ -83,6 +84,12 @@ void t2n_command_error::serialize(Archive & ar, const unsigned int version) } template +void t2n_parameter_error::serialize(Archive & ar, const unsigned int version) +{ + ar & BOOST_SERIALIZATION_BASE_OBJECT_NVP(t2n_exception); +} + +template void t2n_serialization_error::serialize(Archive & ar, const unsigned int version) { ar & BOOST_SERIALIZATION_BASE_OBJECT_NVP(t2n_exception); diff --git a/src/t2n_exception.hxx b/src/t2n_exception.hxx index 4ce11aa..a494c86 100644 --- a/src/t2n_exception.hxx +++ b/src/t2n_exception.hxx @@ -206,6 +206,29 @@ class t2n_command_error : public t2n_exception { throw *this; } }; +/// illegal libt2n option set +class t2n_parameter_error : public t2n_exception +{ + private: + friend class boost::serialization::access; + template + void serialize(Archive & ar, const unsigned int version); + + public: + t2n_parameter_error(const std::string& _message) + : t2n_exception(_message) + { } + + t2n_parameter_error() + { } + + t2n_exception* clone() const + { return new t2n_parameter_error(*this); } + + void do_throw() + { throw *this; } +}; + /// error serializing or deserializing a libt2n command packet class t2n_serialization_error : public t2n_exception { diff --git a/src/types.hxx b/src/types.hxx index 2641485..fc2811e 100644 --- a/src/types.hxx +++ b/src/types.hxx @@ -30,7 +30,7 @@ enum socket_type_value { tcp_s, unix_s }; /// possible events for callback, __events_end must be the last event and must not be used /// for anything else than marking the end -enum callback_event_type { new_connection=0, connection_closed=1, connection_deleted=2, __events_end }; +enum callback_event_type { new_connection=0, connection_closed, connection_deleted, __events_end }; } diff --git a/test/Makefile.am b/test/Makefile.am index 989448a..6b3755c 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -4,6 +4,6 @@ check_PROGRAMS = test test_LDADD = $(top_builddir)/src/libt2n.la @BOOST_SERIALIZATION_LIB@ \ @BOOST_LDFLAGS@ @CPPUNIT_LIBS@ -test_SOURCES = test.cpp comm.cpp simplecmd.cpp +test_SOURCES = test.cpp comm.cpp simplecmd.cpp callback.cpp TESTS = test diff --git a/test/callback.cpp b/test/callback.cpp new file mode 100644 index 0000000..beb0c77 --- /dev/null +++ b/test/callback.cpp @@ -0,0 +1,337 @@ +/*************************************************************************** + * Copyright (C) 2004 by Intra2net AG * + * info@intra2net.com * + * * + ***************************************************************************/ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include + +#include +#include +#include + +#include +#include + +using namespace std; +using namespace libt2n; +using namespace CppUnit; + +class test_callback : public TestFixture +{ + CPPUNIT_TEST_SUITE(test_callback); + + CPPUNIT_TEST(NewConnCallback); + CPPUNIT_TEST(ConnClosedCallback); + CPPUNIT_TEST(ConnDeletedCallback); + CPPUNIT_TEST(CallbackOrder); + + CPPUNIT_TEST_SUITE_END(); + + std::vector callback_done; + + public: + + void setUp() + { + callback_done.resize(__events_end); + } + + void tearDown() + { + callback_done.clear(); + } + + void callback_func(callback_event_type ev, int conn_id) + { + // we don't care for the conn_id, we just mark the callback as done + if (callback_done[ev]) + throw runtime_error("callback already done for this event"); + + callback_done[ev]=true; + } + + void NewConnCallback() + { + pid_t pid; + + switch(pid=fork()) + { + case -1: + { + CPPUNIT_FAIL("fork error"); + break; + } + case 0: + // child + { + string data; + // wait till server is up + sleep(1); + + { + socket_client_connection sc("./socket"); + + sc.write("ABC"); + + // wait half a sec + sc.fill_buffer(500000); + sc.get_packet(data); + + // close the connection + } + + // don't call atexit and stuff + _exit(0); + } + + default: + // parent + { + socket_server ss("./socket"); + + ss.add_callback(new_connection,bind(&test_callback::callback_func, boost::ref(*this), new_connection, _1)); + + // max 3 sec + for (int i=0; i < 3; i++) + { + ss.fill_buffer(1000000); + + string data; + unsigned int cid; + if(ss.get_packet(data,cid)) + { + server_connection* con=ss.get_connection(cid); + con->write("XYZ"); + } + } + CPPUNIT_ASSERT_EQUAL(true,static_cast(callback_done[new_connection])); + CPPUNIT_ASSERT_EQUAL(false,static_cast(callback_done[connection_closed])); + CPPUNIT_ASSERT_EQUAL(false,static_cast(callback_done[connection_deleted])); + } + } + } + + void ConnClosedCallback() + { + pid_t pid; + + switch(pid=fork()) + { + case -1: + { + CPPUNIT_FAIL("fork error"); + break; + } + case 0: + // child + { + string data; + // wait till server is up + sleep(1); + + { + socket_client_connection sc("./socket"); + + sc.write("ABC"); + + // wait half a sec + sc.fill_buffer(500000); + sc.get_packet(data); + + // close the connection + } + + // don't call atexit and stuff + _exit(0); + } + + default: + // parent + { + socket_server ss("./socket"); + + ss.add_callback(connection_closed,bind(&test_callback::callback_func, boost::ref(*this), connection_closed, _1)); + + // max 3 sec + for (int i=0; i < 3; i++) + { + ss.fill_buffer(1000000); + + string data; + unsigned int cid; + if(ss.get_packet(data,cid)) + { + server_connection* con=ss.get_connection(cid); + con->write("XYZ"); + } + } + CPPUNIT_ASSERT_EQUAL(false,static_cast(callback_done[new_connection])); + CPPUNIT_ASSERT_EQUAL(true,static_cast(callback_done[connection_closed])); + CPPUNIT_ASSERT_EQUAL(false,static_cast(callback_done[connection_deleted])); + } + } + } + + void ConnDeletedCallback() + { + pid_t pid; + + switch(pid=fork()) + { + case -1: + { + CPPUNIT_FAIL("fork error"); + break; + } + case 0: + // child + { + string data; + // wait till server is up + sleep(1); + + { + socket_client_connection sc("./socket"); + + sc.write("ABC"); + + // wait half a sec + sc.fill_buffer(500000); + sc.get_packet(data); + + // close the connection + } + + // don't call atexit and stuff + _exit(0); + } + + default: + // parent + { + socket_server ss("./socket"); + + ss.add_callback(connection_deleted,bind(&test_callback::callback_func, boost::ref(*this), connection_deleted, _1)); + + // max 3 sec + for (int i=0; i < 3; i++) + { + ss.fill_buffer(1000000); + + string data; + unsigned int cid; + if(ss.get_packet(data,cid)) + { + server_connection* con=ss.get_connection(cid); + con->write("XYZ"); + } + ss.cleanup(); + } + ss.cleanup(); + + CPPUNIT_ASSERT_EQUAL(false,static_cast(callback_done[new_connection])); + CPPUNIT_ASSERT_EQUAL(false,static_cast(callback_done[connection_closed])); + CPPUNIT_ASSERT_EQUAL(true,static_cast(callback_done[connection_deleted])); + } + } + } + + void CallbackOrder() + { + pid_t pid; + + switch(pid=fork()) + { + case -1: + { + CPPUNIT_FAIL("fork error"); + break; + } + case 0: + // child + { + string data; + // wait till server is up + sleep(1); + + { + socket_client_connection sc("./socket"); + + sc.write("1"); + + // wait half a sec + sc.fill_buffer(500000); + sc.get_packet(data); + + sc.write("2"); + + // close the connection + } + + // don't call atexit and stuff + _exit(0); + } + + default: + // parent + { + socket_server ss("./socket"); + + ss.add_callback(connection_closed,bind(&test_callback::callback_func, boost::ref(*this), connection_closed, _1)); + ss.add_callback(connection_deleted,bind(&test_callback::callback_func, boost::ref(*this), connection_deleted, _1)); + + bool got_1=false; + + for (int i=0; i < 4; i++) + { + ss.fill_buffer(500000); + + string data; + unsigned int cid; + if(!got_1 && ss.get_packet(data,cid)) + { + server_connection* con=ss.get_connection(cid); + con->write("XYZ"); + got_1=true; + // don't call get_packet anymore + } + ss.cleanup(); + } + ss.cleanup(); + + CPPUNIT_ASSERT_EQUAL(true,static_cast(callback_done[connection_closed])); + CPPUNIT_ASSERT_EQUAL(false,static_cast(callback_done[connection_deleted])); + + for (int i=0; i < 4; i++) + { + ss.fill_buffer(500000); + + string data; + unsigned int cid; + ss.get_packet(data,cid); + ss.cleanup(); + } + ss.cleanup(); + + CPPUNIT_ASSERT_EQUAL(true,static_cast(callback_done[connection_closed])); + CPPUNIT_ASSERT_EQUAL(true,static_cast(callback_done[connection_deleted])); + } + } + } + +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(test_callback);