s.add_callback(new_connection,bind(&command_server::new_connection_callback, boost::ref(*this), _1));
}
-void command_server::new_connection_callback(server_connection* conn)
+void command_server::new_connection_callback(unsigned int conn_id)
{
- cerr << "new connection callback: " << conn->get_id() << endl;
+ cerr << "new connection callback: " << conn_id << endl;
}
/// handle a command including deserialization and answering
void handle(long long usec_timeout=-1);
- void new_connection_callback(server_connection* conn);
+ void new_connection_callback(unsigned int conn_id);
};
}
#include <sstream>
+#include <boost/bind.hpp>
+
#include "server.hxx"
#include "log.hxx"
+#include "t2n_exception.hxx"
namespace libt2n
{
server_connection::server_connection(int _timeout)
- : connection()
+ : connection(), callbacks(__events_end)
{
set_timeout(_timeout);
reset_timeout();
my_server=0;
}
+server_connection::~server_connection()
+{
+ // we want the connection_closed callbacks to be called before
+ close();
+
+ do_callbacks(connection_deleted);
+}
+
+void server_connection::close()
+{
+ if (!is_closed())
+ {
+ connection::close();
+ do_callbacks(connection_closed);
+ }
+}
/// get pointer to logging stream, returns NULL if no logging needed
std::ostream* server_connection::get_logstream(log_level_values level)
last_action_time=time(NULL);
}
+/** @brief add a callback
+
+ @param event event the function will be called at
+ @param func functor (see boost function) that will be called
+
+ @example use boost::bind to bind to member functions and parameters like this:
+ 17 is a fixed parameter that is always added to the call
+ c.add_callback(connection_closed,bind(&my_class::func_to_call_back, boost::ref(*this), 17));
+*/
+void server_connection::add_callback(callback_event_type event, const boost::function<void ()>& func)
+{
+ if (event == new_connection)
+ throw t2n_parameter_error("new_connection callback not allowed for server_connections");
+
+ callbacks[event].push_back(func);
+}
+
+void server_connection::do_callbacks(callback_event_type event)
+{
+ std::list<boost::function<void ()> >::iterator i,ie=callbacks[event].end();
+ for (i=callbacks[event].begin(); i != ie; i++)
+ (*i)();
+}
+
server::server()
: callbacks(__events_end)
{
@param event event the function will be called at
@param func functor (see boost function) that will be called
+
+ @example use boost::bind to bind to member functions like this:
+ s.add_callback(new_connection,bind(&my_class::func_to_call_back, boost::ref(*this), _1));
*/
-void server::add_callback(callback_event_type event, const boost::function<void (server_connection*)>& func)
+void server::add_callback(callback_event_type event, const boost::function<void (unsigned int)>& func)
{
callbacks[event].push_back(func);
+
+ // add callback to all existing connections
+ if (event != new_connection)
+ {
+ std::map<unsigned int, server_connection*>::iterator ie=connections.end();
+ for(std::map<unsigned int, server_connection*>::iterator i=connections.begin(); i != ie; i++)
+ i->second->add_callback(event,bind(func, i->first));
+ }
}
-void server::do_callbacks(callback_event_type event, server_connection* conn)
+void server::do_callbacks(callback_event_type event, unsigned int conn_id)
{
- std::list<boost::function<void (server_connection*)> >::iterator i,ie=callbacks[event].end();
+ std::list<boost::function<void (unsigned int)> >::iterator i,ie=callbacks[event].end();
for (i=callbacks[event].begin(); i != ie; i++)
- (*i)(conn);
+ (*i)(conn_id);
}
int server::add_connection(server_connection* newconn)
newconn->set_server(this);
connections[cid]=newconn;
+ // add all callbacks
+ for(int e=connection_closed; e != __events_end; e++)
+ {
+ std::list<boost::function<void (unsigned int)> >::iterator i,ie=callbacks[e].end();
+ for (i=callbacks[e].begin(); i != ie; i++)
+ newconn->add_callback(static_cast<callback_event_type>(e),bind(*i,cid));
+ }
+
LOGSTREAM(debug,"new connection accepted, id: " << cid);
- do_callbacks(new_connection,newconn);
+ do_callbacks(new_connection,cid);
return cid;
}
void set_id(unsigned int _connection_id)
{ connection_id=_connection_id; }
+ /// vector initialized for all callback-types, all elements in each list will be called
+ std::vector<std::list<boost::function<void ()> > > callbacks;
+
protected:
server *my_server;
std::ostream* get_logstream(log_level_values level);
+ void do_callbacks(callback_event_type event);
+
public:
+ virtual ~server_connection();
+
void check_timeout();
void reset_timeout();
void set_timeout(int _timeout)
/// get the id of this connection within the server object
unsigned int get_id()
{ return connection_id; }
+
+ void close();
+
+ void add_callback(callback_event_type event, const boost::function<void ()>& func);
};
/**
std::ostream *logstream;
/// vector initialized for all callback-types, all elements in each list will be called
- std::vector<std::list<boost::function<void (server_connection*)> > > callbacks;
+ std::vector<std::list<boost::function<void (unsigned int)> > > callbacks;
unsigned int next_id;
int add_connection(server_connection* newconn);
- void do_callbacks(callback_event_type event, server_connection* conn);
+ void do_callbacks(callback_event_type event, unsigned int conn_id);
public:
virtual ~server();
server_connection* get_connection(unsigned int conn_id);
- void add_callback(callback_event_type event, const boost::function<void (server_connection*)>& func);
+ void add_callback(callback_event_type event, const boost::function<void (unsigned int)>& func);
/** @brief look for new data on all open connections, accept new connections
@param usec_timeout wait until new data is found, max timeout usecs.
BOOST_CLASS_EXPORT(libt2n::t2n_transfer_error)
BOOST_CLASS_EXPORT(libt2n::t2n_version_mismatch)
BOOST_CLASS_EXPORT(libt2n::t2n_command_error)
+BOOST_CLASS_EXPORT(libt2n::t2n_parameter_error)
BOOST_CLASS_EXPORT(libt2n::t2n_serialization_error)
BOOST_CLASS_EXPORT(libt2n::t2n_runtime_error)
}
template<class Archive>
+void t2n_parameter_error::serialize(Archive & ar, const unsigned int version)
+{
+ ar & BOOST_SERIALIZATION_BASE_OBJECT_NVP(t2n_exception);
+}
+
+template<class Archive>
void t2n_serialization_error::serialize(Archive & ar, const unsigned int version)
{
ar & BOOST_SERIALIZATION_BASE_OBJECT_NVP(t2n_exception);
{ throw *this; }
};
+/// illegal libt2n option set
+class t2n_parameter_error : public t2n_exception
+{
+ private:
+ friend class boost::serialization::access;
+ template<class Archive>
+ void serialize(Archive & ar, const unsigned int version);
+
+ public:
+ t2n_parameter_error(const std::string& _message)
+ : t2n_exception(_message)
+ { }
+
+ t2n_parameter_error()
+ { }
+
+ t2n_exception* clone() const
+ { return new t2n_parameter_error(*this); }
+
+ void do_throw()
+ { throw *this; }
+};
+
/// error serializing or deserializing a libt2n command packet
class t2n_serialization_error : public t2n_exception
{
/// possible events for callback, __events_end must be the last event and must not be used
/// for anything else than marking the end
-enum callback_event_type { new_connection=0, connection_closed=1, connection_deleted=2, __events_end };
+enum callback_event_type { new_connection=0, connection_closed, connection_deleted, __events_end };
}
test_LDADD = $(top_builddir)/src/libt2n.la @BOOST_SERIALIZATION_LIB@ \
@BOOST_LDFLAGS@ @CPPUNIT_LIBS@
-test_SOURCES = test.cpp comm.cpp simplecmd.cpp
+test_SOURCES = test.cpp comm.cpp simplecmd.cpp callback.cpp
TESTS = test
--- /dev/null
+/***************************************************************************
+ * Copyright (C) 2004 by Intra2net AG *
+ * info@intra2net.com *
+ * *
+ ***************************************************************************/
+
+#include <sys/types.h>
+#include <unistd.h>
+#include <errno.h>
+#include <signal.h>
+#include <stdio.h>
+
+#include <iostream>
+#include <string>
+#include <sstream>
+#include <stdexcept>
+#include <vector>
+
+#include <boost/bind.hpp>
+
+#include <cppunit/extensions/TestFactoryRegistry.h>
+#include <cppunit/ui/text/TestRunner.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+#include <socket_client.hxx>
+#include <socket_server.hxx>
+
+using namespace std;
+using namespace libt2n;
+using namespace CppUnit;
+
+class test_callback : public TestFixture
+{
+ CPPUNIT_TEST_SUITE(test_callback);
+
+ CPPUNIT_TEST(NewConnCallback);
+ CPPUNIT_TEST(ConnClosedCallback);
+ CPPUNIT_TEST(ConnDeletedCallback);
+ CPPUNIT_TEST(CallbackOrder);
+
+ CPPUNIT_TEST_SUITE_END();
+
+ std::vector<bool> callback_done;
+
+ public:
+
+ void setUp()
+ {
+ callback_done.resize(__events_end);
+ }
+
+ void tearDown()
+ {
+ callback_done.clear();
+ }
+
+ void callback_func(callback_event_type ev, int conn_id)
+ {
+ // we don't care for the conn_id, we just mark the callback as done
+ if (callback_done[ev])
+ throw runtime_error("callback already done for this event");
+
+ callback_done[ev]=true;
+ }
+
+ void NewConnCallback()
+ {
+ pid_t pid;
+
+ switch(pid=fork())
+ {
+ case -1:
+ {
+ CPPUNIT_FAIL("fork error");
+ break;
+ }
+ case 0:
+ // child
+ {
+ string data;
+ // wait till server is up
+ sleep(1);
+
+ {
+ socket_client_connection sc("./socket");
+
+ sc.write("ABC");
+
+ // wait half a sec
+ sc.fill_buffer(500000);
+ sc.get_packet(data);
+
+ // close the connection
+ }
+
+ // don't call atexit and stuff
+ _exit(0);
+ }
+
+ default:
+ // parent
+ {
+ socket_server ss("./socket");
+
+ ss.add_callback(new_connection,bind(&test_callback::callback_func, boost::ref(*this), new_connection, _1));
+
+ // max 3 sec
+ for (int i=0; i < 3; i++)
+ {
+ ss.fill_buffer(1000000);
+
+ string data;
+ unsigned int cid;
+ if(ss.get_packet(data,cid))
+ {
+ server_connection* con=ss.get_connection(cid);
+ con->write("XYZ");
+ }
+ }
+ CPPUNIT_ASSERT_EQUAL(true,static_cast<bool>(callback_done[new_connection]));
+ CPPUNIT_ASSERT_EQUAL(false,static_cast<bool>(callback_done[connection_closed]));
+ CPPUNIT_ASSERT_EQUAL(false,static_cast<bool>(callback_done[connection_deleted]));
+ }
+ }
+ }
+
+ void ConnClosedCallback()
+ {
+ pid_t pid;
+
+ switch(pid=fork())
+ {
+ case -1:
+ {
+ CPPUNIT_FAIL("fork error");
+ break;
+ }
+ case 0:
+ // child
+ {
+ string data;
+ // wait till server is up
+ sleep(1);
+
+ {
+ socket_client_connection sc("./socket");
+
+ sc.write("ABC");
+
+ // wait half a sec
+ sc.fill_buffer(500000);
+ sc.get_packet(data);
+
+ // close the connection
+ }
+
+ // don't call atexit and stuff
+ _exit(0);
+ }
+
+ default:
+ // parent
+ {
+ socket_server ss("./socket");
+
+ ss.add_callback(connection_closed,bind(&test_callback::callback_func, boost::ref(*this), connection_closed, _1));
+
+ // max 3 sec
+ for (int i=0; i < 3; i++)
+ {
+ ss.fill_buffer(1000000);
+
+ string data;
+ unsigned int cid;
+ if(ss.get_packet(data,cid))
+ {
+ server_connection* con=ss.get_connection(cid);
+ con->write("XYZ");
+ }
+ }
+ CPPUNIT_ASSERT_EQUAL(false,static_cast<bool>(callback_done[new_connection]));
+ CPPUNIT_ASSERT_EQUAL(true,static_cast<bool>(callback_done[connection_closed]));
+ CPPUNIT_ASSERT_EQUAL(false,static_cast<bool>(callback_done[connection_deleted]));
+ }
+ }
+ }
+
+ void ConnDeletedCallback()
+ {
+ pid_t pid;
+
+ switch(pid=fork())
+ {
+ case -1:
+ {
+ CPPUNIT_FAIL("fork error");
+ break;
+ }
+ case 0:
+ // child
+ {
+ string data;
+ // wait till server is up
+ sleep(1);
+
+ {
+ socket_client_connection sc("./socket");
+
+ sc.write("ABC");
+
+ // wait half a sec
+ sc.fill_buffer(500000);
+ sc.get_packet(data);
+
+ // close the connection
+ }
+
+ // don't call atexit and stuff
+ _exit(0);
+ }
+
+ default:
+ // parent
+ {
+ socket_server ss("./socket");
+
+ ss.add_callback(connection_deleted,bind(&test_callback::callback_func, boost::ref(*this), connection_deleted, _1));
+
+ // max 3 sec
+ for (int i=0; i < 3; i++)
+ {
+ ss.fill_buffer(1000000);
+
+ string data;
+ unsigned int cid;
+ if(ss.get_packet(data,cid))
+ {
+ server_connection* con=ss.get_connection(cid);
+ con->write("XYZ");
+ }
+ ss.cleanup();
+ }
+ ss.cleanup();
+
+ CPPUNIT_ASSERT_EQUAL(false,static_cast<bool>(callback_done[new_connection]));
+ CPPUNIT_ASSERT_EQUAL(false,static_cast<bool>(callback_done[connection_closed]));
+ CPPUNIT_ASSERT_EQUAL(true,static_cast<bool>(callback_done[connection_deleted]));
+ }
+ }
+ }
+
+ void CallbackOrder()
+ {
+ pid_t pid;
+
+ switch(pid=fork())
+ {
+ case -1:
+ {
+ CPPUNIT_FAIL("fork error");
+ break;
+ }
+ case 0:
+ // child
+ {
+ string data;
+ // wait till server is up
+ sleep(1);
+
+ {
+ socket_client_connection sc("./socket");
+
+ sc.write("1");
+
+ // wait half a sec
+ sc.fill_buffer(500000);
+ sc.get_packet(data);
+
+ sc.write("2");
+
+ // close the connection
+ }
+
+ // don't call atexit and stuff
+ _exit(0);
+ }
+
+ default:
+ // parent
+ {
+ socket_server ss("./socket");
+
+ ss.add_callback(connection_closed,bind(&test_callback::callback_func, boost::ref(*this), connection_closed, _1));
+ ss.add_callback(connection_deleted,bind(&test_callback::callback_func, boost::ref(*this), connection_deleted, _1));
+
+ bool got_1=false;
+
+ for (int i=0; i < 4; i++)
+ {
+ ss.fill_buffer(500000);
+
+ string data;
+ unsigned int cid;
+ if(!got_1 && ss.get_packet(data,cid))
+ {
+ server_connection* con=ss.get_connection(cid);
+ con->write("XYZ");
+ got_1=true;
+ // don't call get_packet anymore
+ }
+ ss.cleanup();
+ }
+ ss.cleanup();
+
+ CPPUNIT_ASSERT_EQUAL(true,static_cast<bool>(callback_done[connection_closed]));
+ CPPUNIT_ASSERT_EQUAL(false,static_cast<bool>(callback_done[connection_deleted]));
+
+ for (int i=0; i < 4; i++)
+ {
+ ss.fill_buffer(500000);
+
+ string data;
+ unsigned int cid;
+ ss.get_packet(data,cid);
+ ss.cleanup();
+ }
+ ss.cleanup();
+
+ CPPUNIT_ASSERT_EQUAL(true,static_cast<bool>(callback_done[connection_closed]));
+ CPPUNIT_ASSERT_EQUAL(true,static_cast<bool>(callback_done[connection_deleted]));
+ }
+ }
+ }
+
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(test_callback);