libt2n: (gerd) finish server callbacks
authorGerd v. Egidy <gerd.von.egidy@intra2net.com>
Fri, 27 Oct 2006 23:41:59 +0000 (23:41 +0000)
committerGerd v. Egidy <gerd.von.egidy@intra2net.com>
Fri, 27 Oct 2006 23:41:59 +0000 (23:41 +0000)
src/command_server.cpp
src/command_server.hxx
src/server.cpp
src/server.hxx
src/t2n_exception.cpp
src/t2n_exception.hxx
src/types.hxx
test/Makefile.am
test/callback.cpp [new file with mode: 0644]

index e7d6d4d..2a4a7b4 100644 (file)
@@ -46,9 +46,9 @@ command_server::command_server(server& _s)
     s.add_callback(new_connection,bind(&command_server::new_connection_callback, boost::ref(*this), _1));
 }
 
-void command_server::new_connection_callback(server_connection* conn)
+void command_server::new_connection_callback(unsigned int conn_id)
 {
-    cerr << "new connection callback: " << conn->get_id() << endl;
+    cerr << "new connection callback: " << conn_id << endl;
 }
 
 /// handle a command including deserialization and answering
index 2f3df5a..686be14 100644 (file)
@@ -37,7 +37,7 @@ class command_server
 
         void handle(long long usec_timeout=-1);
 
-        void new_connection_callback(server_connection* conn);
+        void new_connection_callback(unsigned int conn_id);
 };
 
 }
index b1f7962..6bc1cba 100644 (file)
 
 #include <sstream>
 
+#include <boost/bind.hpp>
+
 #include "server.hxx"
 #include "log.hxx"
+#include "t2n_exception.hxx"
 
 namespace libt2n
 {
 
 server_connection::server_connection(int _timeout)
-    : connection()
+    : connection(), callbacks(__events_end)
 {
     set_timeout(_timeout);
     reset_timeout();
@@ -34,6 +37,22 @@ server_connection::server_connection(int _timeout)
     my_server=0;
 }
 
+server_connection::~server_connection()
+{
+    // we want the connection_closed callbacks to be called before
+    close();
+
+    do_callbacks(connection_deleted);
+}
+
+void server_connection::close()
+{
+    if (!is_closed())
+    {
+        connection::close();
+        do_callbacks(connection_closed);
+    }
+}
 
 /// get pointer to logging stream, returns NULL if no logging needed
 std::ostream* server_connection::get_logstream(log_level_values level)
@@ -65,6 +84,30 @@ void server_connection::reset_timeout()
     last_action_time=time(NULL);
 }
 
+/** @brief add a callback
+
+    @param event event the function will be called at
+    @param func functor (see boost function) that will be called
+
+    @example use boost::bind to bind to member functions and parameters like this:
+        17 is a fixed parameter that is always added to the call
+        c.add_callback(connection_closed,bind(&my_class::func_to_call_back, boost::ref(*this), 17));
+*/
+void server_connection::add_callback(callback_event_type event, const boost::function<void ()>& func)
+{
+    if (event == new_connection)
+        throw t2n_parameter_error("new_connection callback not allowed for server_connections");
+
+    callbacks[event].push_back(func);
+}
+
+void server_connection::do_callbacks(callback_event_type event)
+{
+    std::list<boost::function<void ()> >::iterator i,ie=callbacks[event].end();
+    for (i=callbacks[event].begin(); i != ie; i++)
+        (*i)();
+}
+
 server::server()
     : callbacks(__events_end)
 {
@@ -84,17 +127,28 @@ server::~server()
 
     @param event event the function will be called at
     @param func functor (see boost function) that will be called
+
+    @example use boost::bind to bind to member functions like this:
+        s.add_callback(new_connection,bind(&my_class::func_to_call_back, boost::ref(*this), _1));
 */
-void server::add_callback(callback_event_type event, const boost::function<void (server_connection*)>& func)
+void server::add_callback(callback_event_type event, const boost::function<void (unsigned int)>& func)
 {
     callbacks[event].push_back(func);
+
+    // add callback to all existing connections
+    if (event != new_connection)
+    {
+        std::map<unsigned int, server_connection*>::iterator ie=connections.end();
+        for(std::map<unsigned int, server_connection*>::iterator i=connections.begin(); i != ie; i++)
+            i->second->add_callback(event,bind(func, i->first));
+    }
 }
 
-void server::do_callbacks(callback_event_type event, server_connection* conn)
+void server::do_callbacks(callback_event_type event, unsigned int conn_id)
 {
-    std::list<boost::function<void (server_connection*)> >::iterator i,ie=callbacks[event].end();
+    std::list<boost::function<void (unsigned int)> >::iterator i,ie=callbacks[event].end();
     for (i=callbacks[event].begin(); i != ie; i++)
-        (*i)(conn);
+        (*i)(conn_id);
 }
 
 int server::add_connection(server_connection* newconn)
@@ -104,9 +158,17 @@ int server::add_connection(server_connection* newconn)
     newconn->set_server(this);
     connections[cid]=newconn;
 
+    // add all callbacks
+    for(int e=connection_closed; e != __events_end; e++)
+    {
+        std::list<boost::function<void (unsigned int)> >::iterator i,ie=callbacks[e].end();
+        for (i=callbacks[e].begin(); i != ie; i++)
+            newconn->add_callback(static_cast<callback_event_type>(e),bind(*i,cid));
+    }
+
     LOGSTREAM(debug,"new connection accepted, id: " << cid);
 
-    do_callbacks(new_connection,newconn);
+    do_callbacks(new_connection,cid);
 
     return cid;
 }
index aca705b..601329c 100644 (file)
@@ -56,6 +56,9 @@ class server_connection : public connection
         void set_id(unsigned int _connection_id)
             { connection_id=_connection_id; }
 
+        /// vector initialized for all callback-types, all elements in each list will be called
+        std::vector<std::list<boost::function<void ()> > > callbacks;
+
     protected:
         server *my_server;
 
@@ -63,7 +66,11 @@ class server_connection : public connection
 
         std::ostream* get_logstream(log_level_values level);
 
+        void do_callbacks(callback_event_type event);
+
     public:
+        virtual ~server_connection();
+
         void check_timeout();
         void reset_timeout();
         void set_timeout(int _timeout)
@@ -72,6 +79,10 @@ class server_connection : public connection
         /// get the id of this connection within the server object
         unsigned int get_id()
             { return connection_id; }
+
+        void close();
+
+        void add_callback(callback_event_type event, const boost::function<void ()>& func);
 };
 
 /**
@@ -87,7 +98,7 @@ class server
         std::ostream *logstream;
 
         /// vector initialized for all callback-types, all elements in each list will be called
-        std::vector<std::list<boost::function<void (server_connection*)> > > callbacks;
+        std::vector<std::list<boost::function<void (unsigned int)> > > callbacks;
 
         unsigned int next_id;
 
@@ -100,7 +111,7 @@ class server
 
         int add_connection(server_connection* newconn);
 
-        void do_callbacks(callback_event_type event, server_connection* conn);
+        void do_callbacks(callback_event_type event, unsigned int conn_id);
 
     public:
         virtual ~server();
@@ -117,7 +128,7 @@ class server
 
         server_connection* get_connection(unsigned int conn_id);
 
-        void add_callback(callback_event_type event, const boost::function<void (server_connection*)>& func);
+        void add_callback(callback_event_type event, const boost::function<void (unsigned int)>& func);
 
         /** @brief look for new data on all open connections, accept new connections
             @param usec_timeout wait until new data is found, max timeout usecs.
index 1aae5c4..0b90f87 100644 (file)
@@ -33,6 +33,7 @@ BOOST_CLASS_EXPORT(libt2n::t2n_server_error)
 BOOST_CLASS_EXPORT(libt2n::t2n_transfer_error)
 BOOST_CLASS_EXPORT(libt2n::t2n_version_mismatch)
 BOOST_CLASS_EXPORT(libt2n::t2n_command_error)
+BOOST_CLASS_EXPORT(libt2n::t2n_parameter_error)
 BOOST_CLASS_EXPORT(libt2n::t2n_serialization_error)
 BOOST_CLASS_EXPORT(libt2n::t2n_runtime_error)
 
@@ -83,6 +84,12 @@ void t2n_command_error::serialize(Archive & ar, const unsigned int version)
 }
 
 template<class Archive>
+void t2n_parameter_error::serialize(Archive & ar, const unsigned int version)
+{
+    ar & BOOST_SERIALIZATION_BASE_OBJECT_NVP(t2n_exception);
+}
+
+template<class Archive>
 void t2n_serialization_error::serialize(Archive & ar, const unsigned int version)
 {
     ar & BOOST_SERIALIZATION_BASE_OBJECT_NVP(t2n_exception);
index 4ce11aa..a494c86 100644 (file)
@@ -206,6 +206,29 @@ class t2n_command_error : public t2n_exception
             { throw *this; }
 };
 
+/// illegal libt2n option set
+class t2n_parameter_error : public t2n_exception
+{
+    private:
+        friend class boost::serialization::access;
+        template<class Archive>
+        void serialize(Archive & ar, const unsigned int version);
+
+    public:
+        t2n_parameter_error(const std::string& _message)
+            : t2n_exception(_message)
+            { }
+
+        t2n_parameter_error()
+            { }
+
+        t2n_exception* clone() const
+            { return new t2n_parameter_error(*this); }
+
+        void do_throw()
+            { throw *this; }
+};
+
 /// error serializing or deserializing a libt2n command packet
 class t2n_serialization_error : public t2n_exception
 {
index 2641485..fc2811e 100644 (file)
@@ -30,7 +30,7 @@ enum socket_type_value { tcp_s, unix_s };
 
 /// possible events for callback, __events_end must be the last event and must not be used 
 /// for anything else than marking the end
-enum callback_event_type { new_connection=0, connection_closed=1, connection_deleted=2, __events_end };
+enum callback_event_type { new_connection=0, connection_closed, connection_deleted, __events_end };
 
 }
 
index 989448a..6b3755c 100644 (file)
@@ -4,6 +4,6 @@ check_PROGRAMS = test
 
 test_LDADD = $(top_builddir)/src/libt2n.la @BOOST_SERIALIZATION_LIB@ \
        @BOOST_LDFLAGS@ @CPPUNIT_LIBS@
-test_SOURCES = test.cpp comm.cpp simplecmd.cpp
+test_SOURCES = test.cpp comm.cpp simplecmd.cpp callback.cpp
 
 TESTS = test
diff --git a/test/callback.cpp b/test/callback.cpp
new file mode 100644 (file)
index 0000000..beb0c77
--- /dev/null
@@ -0,0 +1,337 @@
+/***************************************************************************
+ *   Copyright (C) 2004 by Intra2net AG                                    *
+ *   info@intra2net.com                                                    *
+ *                                                                         *
+ ***************************************************************************/
+
+#include <sys/types.h>
+#include <unistd.h>
+#include <errno.h>
+#include <signal.h>
+#include <stdio.h>
+
+#include <iostream>
+#include <string>
+#include <sstream>
+#include <stdexcept>
+#include <vector>
+
+#include <boost/bind.hpp>
+
+#include <cppunit/extensions/TestFactoryRegistry.h>
+#include <cppunit/ui/text/TestRunner.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+#include <socket_client.hxx>
+#include <socket_server.hxx>
+
+using namespace std;
+using namespace libt2n;
+using namespace CppUnit;
+
+class test_callback : public TestFixture
+{
+    CPPUNIT_TEST_SUITE(test_callback);
+
+    CPPUNIT_TEST(NewConnCallback);
+    CPPUNIT_TEST(ConnClosedCallback);
+    CPPUNIT_TEST(ConnDeletedCallback);
+    CPPUNIT_TEST(CallbackOrder);
+
+    CPPUNIT_TEST_SUITE_END();
+
+    std::vector<bool> callback_done;
+
+    public:
+
+    void setUp()
+    {
+        callback_done.resize(__events_end);
+    }
+
+    void tearDown()
+    {
+        callback_done.clear();
+    }
+
+    void callback_func(callback_event_type ev, int conn_id)
+    {
+        // we don't care for the conn_id, we just mark the callback as done
+        if (callback_done[ev])
+            throw runtime_error("callback already done for this event");
+
+        callback_done[ev]=true;
+    }
+
+    void NewConnCallback()
+    {
+        pid_t pid;
+
+        switch(pid=fork())
+        {
+            case -1:
+            {
+                CPPUNIT_FAIL("fork error");
+                break;
+            }
+            case 0:
+            // child
+            {
+                string data;
+                // wait till server is up
+                sleep(1);
+
+                {
+                    socket_client_connection sc("./socket");
+
+                    sc.write("ABC");
+
+                    // wait half a sec
+                    sc.fill_buffer(500000);
+                    sc.get_packet(data);
+
+                    // close the connection
+                }
+
+                // don't call atexit and stuff
+                _exit(0);
+            }
+
+            default:
+            // parent
+            {
+                socket_server ss("./socket");
+
+                ss.add_callback(new_connection,bind(&test_callback::callback_func, boost::ref(*this), new_connection, _1));
+
+                // max 3 sec
+                for (int i=0; i < 3; i++)
+                {
+                    ss.fill_buffer(1000000);
+
+                    string data;
+                    unsigned int cid;
+                    if(ss.get_packet(data,cid))
+                    {
+                        server_connection* con=ss.get_connection(cid);
+                        con->write("XYZ");
+                    }
+                }
+                CPPUNIT_ASSERT_EQUAL(true,static_cast<bool>(callback_done[new_connection]));
+                CPPUNIT_ASSERT_EQUAL(false,static_cast<bool>(callback_done[connection_closed]));
+                CPPUNIT_ASSERT_EQUAL(false,static_cast<bool>(callback_done[connection_deleted]));
+            }
+        }
+    }
+
+    void ConnClosedCallback()
+    {
+        pid_t pid;
+
+        switch(pid=fork())
+        {
+            case -1:
+            {
+                CPPUNIT_FAIL("fork error");
+                break;
+            }
+            case 0:
+            // child
+            {
+                string data;
+                // wait till server is up
+                sleep(1);
+
+                {
+                    socket_client_connection sc("./socket");
+
+                    sc.write("ABC");
+
+                    // wait half a sec
+                    sc.fill_buffer(500000);
+                    sc.get_packet(data);
+
+                    // close the connection
+                }
+
+                // don't call atexit and stuff
+                _exit(0);
+            }
+
+            default:
+            // parent
+            {
+                socket_server ss("./socket");
+
+                ss.add_callback(connection_closed,bind(&test_callback::callback_func, boost::ref(*this), connection_closed, _1));
+
+                // max 3 sec
+                for (int i=0; i < 3; i++)
+                {
+                    ss.fill_buffer(1000000);
+
+                    string data;
+                    unsigned int cid;
+                    if(ss.get_packet(data,cid))
+                    {
+                        server_connection* con=ss.get_connection(cid);
+                        con->write("XYZ");
+                    }
+                }
+                CPPUNIT_ASSERT_EQUAL(false,static_cast<bool>(callback_done[new_connection]));
+                CPPUNIT_ASSERT_EQUAL(true,static_cast<bool>(callback_done[connection_closed]));
+                CPPUNIT_ASSERT_EQUAL(false,static_cast<bool>(callback_done[connection_deleted]));
+            }
+        }
+    }
+
+    void ConnDeletedCallback()
+    {
+        pid_t pid;
+
+        switch(pid=fork())
+        {
+            case -1:
+            {
+                CPPUNIT_FAIL("fork error");
+                break;
+            }
+            case 0:
+            // child
+            {
+                string data;
+                // wait till server is up
+                sleep(1);
+
+                {
+                    socket_client_connection sc("./socket");
+
+                    sc.write("ABC");
+
+                    // wait half a sec
+                    sc.fill_buffer(500000);
+                    sc.get_packet(data);
+
+                    // close the connection
+                }
+
+                // don't call atexit and stuff
+                _exit(0);
+            }
+
+            default:
+            // parent
+            {
+                socket_server ss("./socket");
+
+                ss.add_callback(connection_deleted,bind(&test_callback::callback_func, boost::ref(*this), connection_deleted, _1));
+
+                // max 3 sec
+                for (int i=0; i < 3; i++)
+                {
+                    ss.fill_buffer(1000000);
+
+                    string data;
+                    unsigned int cid;
+                    if(ss.get_packet(data,cid))
+                    {
+                        server_connection* con=ss.get_connection(cid);
+                        con->write("XYZ");
+                    }
+                    ss.cleanup();
+                }
+                ss.cleanup();
+
+                CPPUNIT_ASSERT_EQUAL(false,static_cast<bool>(callback_done[new_connection]));
+                CPPUNIT_ASSERT_EQUAL(false,static_cast<bool>(callback_done[connection_closed]));
+                CPPUNIT_ASSERT_EQUAL(true,static_cast<bool>(callback_done[connection_deleted]));
+            }
+        }
+    }
+
+    void CallbackOrder()
+    {
+        pid_t pid;
+
+        switch(pid=fork())
+        {
+            case -1:
+            {
+                CPPUNIT_FAIL("fork error");
+                break;
+            }
+            case 0:
+            // child
+            {
+                string data;
+                // wait till server is up
+                sleep(1);
+
+                {
+                    socket_client_connection sc("./socket");
+
+                    sc.write("1");
+
+                    // wait half a sec
+                    sc.fill_buffer(500000);
+                    sc.get_packet(data);
+
+                    sc.write("2");
+
+                    // close the connection
+                }
+
+                // don't call atexit and stuff
+                _exit(0);
+            }
+
+            default:
+            // parent
+            {
+                socket_server ss("./socket");
+
+                ss.add_callback(connection_closed,bind(&test_callback::callback_func, boost::ref(*this), connection_closed, _1));
+                ss.add_callback(connection_deleted,bind(&test_callback::callback_func, boost::ref(*this), connection_deleted, _1));
+
+                bool got_1=false;
+
+                for (int i=0; i < 4; i++)
+                {
+                    ss.fill_buffer(500000);
+
+                    string data;
+                    unsigned int cid;
+                    if(!got_1 && ss.get_packet(data,cid))
+                    {
+                        server_connection* con=ss.get_connection(cid);
+                        con->write("XYZ");
+                        got_1=true;
+                        // don't call get_packet anymore
+                    }
+                    ss.cleanup();
+                }
+                ss.cleanup();
+
+                CPPUNIT_ASSERT_EQUAL(true,static_cast<bool>(callback_done[connection_closed]));
+                CPPUNIT_ASSERT_EQUAL(false,static_cast<bool>(callback_done[connection_deleted]));
+
+                for (int i=0; i < 4; i++)
+                {
+                    ss.fill_buffer(500000);
+
+                    string data;
+                    unsigned int cid;
+                    ss.get_packet(data,cid);
+                    ss.cleanup();
+                }
+                ss.cleanup();
+
+                CPPUNIT_ASSERT_EQUAL(true,static_cast<bool>(callback_done[connection_closed]));
+                CPPUNIT_ASSERT_EQUAL(true,static_cast<bool>(callback_done[connection_deleted]));
+            }
+        }
+    }
+
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(test_callback);