libasnyncio: (reinhard) more moves. UNUSABLE YET.
authorReinhard Pfau <reinhard.pfau@intra2net.com>
Mon, 20 Oct 2008 15:31:06 +0000 (15:31 +0000)
committerReinhard Pfau <reinhard.pfau@intra2net.com>
Mon, 20 Oct 2008 15:31:06 +0000 (15:31 +0000)
14 files changed:
asnycio/Makefile.am [new file with mode: 0644]
asnycio/libsimpleio.pc.in [new file with mode: 0644]
asnycio/simplecallout.cpp [new file with mode: 0644]
asnycio/simplecallout.hpp [new file with mode: 0644]
asnycio/simpleio.cpp [new file with mode: 0644]
asnycio/simpleio.hpp [new file with mode: 0644]
asnycio/simplepipe.cpp [new file with mode: 0644]
asnycio/simplepipe.hpp [new file with mode: 0644]
asnycio/simpleprocess.cpp [new file with mode: 0644]
asnycio/simpleprocess.hpp [new file with mode: 0644]
asnycio/simplesocket.cpp [new file with mode: 0644]
asnycio/simplesocket.hpp [new file with mode: 0644]
asnycio/simpletimer.cpp [new file with mode: 0644]
asnycio/simpletimer.hpp [new file with mode: 0644]

diff --git a/asnycio/Makefile.am b/asnycio/Makefile.am
new file mode 100644 (file)
index 0000000..2694ef2
--- /dev/null
@@ -0,0 +1,13 @@
+INCLUDES = @LIBI2NCOMMON_CFLAGS@ @BOOST_CPPFLAGS@
+METASOURCES = AUTO
+lib_LTLIBRARIES = libsimpleio.la
+libsimpleio_la_SOURCES = simplecallout.cpp simpleio.cpp simplepipe.cpp \
+       simpleprocess.cpp simplesocket.cpp simpletimer.cpp
+include_HEADERS = simplecallout.hpp simpleio.hpp simplepipe.hpp \
+       simpleprocess.hpp simplesocket.hpp simpletimer.hpp
+libsimpleio_la_LIBADD = @LIBI2NCOMMON_LIBS@ @BOOST_LDFLAGS@ @BOOST_SIGNALS_LIB@
+
+libsimpleio_la_LDFLAGS = -version-info @LIBSIMPLEIO_LIB_VERSION@
+
+pkgconfigdir=$(libdir)/pkgconfig
+pkgconfig_DATA= libsimpleio.pc
diff --git a/asnycio/libsimpleio.pc.in b/asnycio/libsimpleio.pc.in
new file mode 100644 (file)
index 0000000..a671d47
--- /dev/null
@@ -0,0 +1,11 @@
+prefix=@prefix@
+exec_prefix=@exec_prefix@
+libdir=@libdir@
+includedir=@includedir@
+
+Name: libsimpleio
+Description: asynchrounous io lib
+Version: @VERSION@
+Requires: libi2ncommon
+Libs: -L${libdir} -lsimpleio
+Cflags: -I${includedir}
diff --git a/asnycio/simplecallout.cpp b/asnycio/simplecallout.cpp
new file mode 100644 (file)
index 0000000..13f5c63
--- /dev/null
@@ -0,0 +1,336 @@
+/**
+ * @file
+ *
+ * @copyright &copy; Copyright 2008 by Intra2net AG
+ * @license commercial
+ * 
+ * info@intra2net.com
+ */
+
+#include "simplecallout.hpp"
+
+#include <tracefunc.hpp>
+#include <logfunc.hpp>
+
+#include <map>
+
+
+namespace I2n
+{
+namespace SimpleIo
+{
+
+namespace
+{
+
+typedef boost::shared_ptr< Detail::Caller > CallerPtr;
+
+typedef std::map< unsigned long, CallerPtr > CallMap;
+
+
+unsigned long l_last_id=0;
+
+CallMap l_call_map;
+
+
+/**
+ * @brief creates a new id value for a call out.
+ * @return the new id.
+ *
+ * The id value is basically just a counter.
+ * It is implemented in a way that it can not be 0 and can deal with wrap arounds.
+ */
+unsigned long create_call_out_id_value()
+{
+   while ( l_call_map.find(++l_last_id) != l_call_map.end() and l_last_id != 0);
+   return l_last_id;
+} // eo create_call_out_id_value
+
+
+void add_call( CallerPtr caller )
+{
+   if (caller->joinId())
+   {
+      l_call_map[ caller->getCallOutId().getValue() ] = caller;
+   }
+} // eo add_call
+
+
+bool remove_call( unsigned long id_value )
+{
+   CallMap::iterator it= l_call_map.find(id_value);
+   if (it != l_call_map.end())
+   {
+      l_call_map.erase(it);
+      return true;
+   }
+   return false;
+} // eo remove_call(unsigned long)
+
+
+CallerPtr get_call(unsigned long id_value)
+{
+   CallMap::iterator it= l_call_map.find(id_value);
+   if (it != l_call_map.end())
+   {
+      return it->second;
+   }
+   return CallerPtr();
+} // eo get_call(unsigned long)
+
+
+bool has_call( unsigned long id_value )
+{
+   CallMap::iterator it= l_call_map.find(id_value);
+   return (it != l_call_map.end() );
+} // eo has_call(unsigned long)
+
+
+
+} // eo namespace <anonymous>
+
+
+
+/*
+** implementation of class CallOutId
+*/
+
+CallOutId::CallOutId()
+: m_value(0)
+{
+} // eo CallOutId::CallOutId()
+
+
+CallOutId::CallOutId(unsigned long value)
+: m_value(value)
+{
+} // eo CallOutId::CallOutId(unsigned long)
+
+
+bool CallOutId::thaw() const
+{
+   if (m_caller_weak_ptr.expired())
+   {
+      return false;
+   }
+   CallerPtr call_ptr= get_call(m_value);
+   if (call_ptr)
+   {
+      return call_ptr->thaw();
+   }
+   return false;
+} // eo CallOutId::thaw() const
+
+
+bool CallOutId::remove()
+{
+   if (m_caller_weak_ptr.expired())
+   {
+      return false;
+   }
+   unsigned long value= m_value;
+   m_value= 0;
+   return remove_call(value);
+} // eo CallOutId::remove()
+
+
+bool CallOutId::active() const
+{
+   return m_value!=0 and not m_caller_weak_ptr.expired() and has_call(m_value);
+} // eo CallOutId::active() const
+
+
+/**
+ * @brief retruns if the call is frozen.
+ * @return @a true iff the call is frozen.
+ */
+bool CallOutId::frozen() const
+{
+    CallerPtr caller= m_caller_weak_ptr.lock();
+    if (not caller)
+    {
+        return false;
+    }
+    return caller->frozen();
+} // eo CallOutId::frozen() const
+
+
+/**
+ * @brief returns the remaining time until the call is done or thrown away (on frozen calls).
+ * @return the remaining time.
+ *
+ * The result only makes sense if the call is still active.
+ */
+MilliTime CallOutId::remaining_time()
+{
+    CallerPtr caller= m_caller_weak_ptr.lock();
+    if ( not active() or not caller )
+    {
+        return MilliTime();
+    }
+    MilliTime t;
+    get_current_monotonic_time(t);
+    MilliTime result= caller->getWhenTime();
+    result-= t;
+    MilliTime t_null;
+    return (result < t_null ? t_null : result);
+} // eo CallOutId::remaining_time()
+
+
+namespace Detail
+{
+
+/*
+** implementation of class Caller
+*/
+
+Caller::Caller( boost::function< void() > f, long delta_sec, long delta_msec, bool frozen)
+: TimerBase()
+, m_call_out_id( create_call_out_id_value() )
+, m_func(f)
+, m_waiting(frozen)
+{
+   SCOPETRACKER();
+   setDeltaWhenTime( delta_sec, delta_msec);
+} // eo Caller::Caller(boost::function< void() >,long)
+
+
+Caller::~Caller()
+{
+   SCOPETRACKER();
+} // eo Caller::~Caller()
+
+
+void Caller::execute()
+{
+   SCOPETRACKER();
+   // NOTE: since the func may throw an exception, we first get a shared pointer
+   // (to prevent early deletion) and then we remove us from the call map.
+   CallerPtr ptr= shared_from_this();
+   m_call_out_id.remove();
+
+   if (m_func and not m_waiting)
+   {
+      m_func(); // may throw..., but at this point it doesn't harm.
+      //( it may harm at other places,...)
+   }
+} // eo Caller::execute()
+
+
+bool Caller::thaw()
+{
+   if (m_waiting)
+   {
+      m_waiting= false;
+      setDeltaWhenTime( 0, 0 );
+      return true;
+   }
+   return false;
+} // eo Caller::thaw()
+
+
+bool Caller::joinId()
+{
+   if (m_call_out_id.m_caller_weak_ptr.expired())
+   {
+      m_call_out_id.m_caller_weak_ptr= shared_from_this();
+      activate();
+      return true;
+   }
+   return false;
+} // eo Caller::joinId()
+
+
+bool Caller::frozen() const
+{
+    return m_waiting;
+} // eo Caller::frozen() const
+
+
+} // eo namespace Detail
+
+
+
+/**
+ * @brief remove a pending call by id.
+ *
+ * @param id the call id which should be removed.
+ * @return @a true iff the call was removed, @a false if no call with the given id was found.
+ */
+bool removeCallOut( CallOutId& id )
+{
+   return id.remove();
+} // eo removeCallOut(CallOutId&)
+
+
+
+template<>
+CallOutId callOut( boost::function< void() > f, long delta_sec)
+{
+   CallerPtr caller( new Detail::Caller(f,delta_sec) );
+   add_call(caller);
+   return caller->getCallOutId();
+} // eo callOut(boost::function< void() >,long)
+
+template<>
+CallOutId callOut( boost::function< void() > f, int delta_sec)
+{
+   return callOut<long>(f,delta_sec);
+} // eo callOut(boost::function< void() >,int)
+
+
+template<>
+CallOutId callOut( boost::function< void() > f, double delta_sec )
+{
+   long delta_sec_i = (long)delta_sec;
+   long delta_sec_m = (long)((delta_sec - (double)delta_sec_i)*1000.0);
+   CallerPtr caller( new Detail::Caller(f,delta_sec_i, delta_sec_m) );
+   add_call(caller);
+   return caller->getCallOutId();
+} // eo callOut(boost::function< void() >,double)
+
+
+template<>
+CallOutId callOut( boost::function< void() > f, float delta_sec )
+{
+   return callOut<double>(f,delta_sec);
+} // eo callOut(boost::function< void() >,float)
+
+
+
+
+template<>
+CallOutId frozenCall( boost::function< void() > f, long delta_sec)
+{
+   CallerPtr caller( new Detail::Caller(f,delta_sec, 0, true) );
+   add_call(caller);
+   return caller->getCallOutId();
+} // eo frozenCall(boost::function< void() >,long)
+
+template<>
+CallOutId frozenCall( boost::function< void() > f, int delta_sec)
+{
+   return frozenCall<long>(f,delta_sec);
+} // eo frozenCall(boost::function< void() >,int)
+
+
+template<>
+CallOutId frozenCall( boost::function< void() > f, double delta_sec )
+{
+   long delta_sec_i = (long)delta_sec;
+   long delta_sec_m = (long)((delta_sec - (double)delta_sec_i)*1000.0);
+   CallerPtr caller( new Detail::Caller(f,delta_sec_i, delta_sec_m, true) );
+   add_call(caller);
+   return caller->getCallOutId();
+} // eo frozenCall(boost::function< void() >,double)
+
+
+template<>
+CallOutId frozenCall( boost::function< void() > f, float delta_sec )
+{
+   return frozenCall<double>(f,delta_sec);
+} // eo frozenCall(boost::function< void() >,float)
+
+
+} // eo namespace SimpleIo
+} // eo namespace I2n
diff --git a/asnycio/simplecallout.hpp b/asnycio/simplecallout.hpp
new file mode 100644 (file)
index 0000000..c895bbc
--- /dev/null
@@ -0,0 +1,158 @@
+/**
+ * @file
+ * @brief provides a method for delayed execution of functions.
+ *
+ *
+ * @copyright &copy; Copyright 2008 by Intra2net AG
+ * @license commercial
+ *
+ * info@intra2net.com
+ */
+
+#ifndef __SIMPLEIO_SIMPLECALLOUT_HPP_
+#define __SIMPLEIO_SIMPLECALLOUT_HPP_
+
+#include "simpleio.hpp"
+
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/weak_ptr.hpp>
+#include <boost/shared_ptr.hpp>
+
+
+namespace I2n
+{
+namespace SimpleIo
+{
+
+// forward declarations:
+namespace Detail {
+
+class Caller;
+
+typedef boost::shared_ptr< Caller > CallerPtr;
+typedef boost::weak_ptr< Caller > CallerWeakPtr;
+
+} // eo namespace Detail
+
+
+/**
+ * @brief represents an id for a deferred call.
+ *
+ * Also provides methods for modifying the call
+ * (like thaw or delete it).
+ */
+class CallOutId
+{
+      friend class Detail::Caller;
+
+   public:
+      CallOutId();
+
+      unsigned long getValue() const {return m_value;}
+
+      bool thaw() const;
+      bool remove();
+
+      bool active() const;
+      bool frozen() const;
+
+      MilliTime remaining_time();
+
+   private:
+
+      CallOutId(unsigned long value);
+
+   private:
+
+      unsigned long m_value;
+
+      Detail::CallerWeakPtr m_caller_weak_ptr;
+
+}; // eo class CallOutId
+
+
+
+/*
+**
+*/
+
+namespace Detail {
+
+/**
+ * @brief tool class for holding and executing a deferred call.
+ *
+ */
+class Caller
+: public TimerBase
+, public boost::enable_shared_from_this< Caller >
+{
+   public:
+      Caller( boost::function< void() > f, long delta_sec, long delta_msec=0, bool frozen=false );
+      virtual ~Caller();
+
+      CallOutId getCallOutId() const { return m_call_out_id; }
+
+      bool thaw();
+
+      bool joinId();
+
+      bool frozen() const;
+
+   protected:
+
+      virtual void execute();
+
+   private:
+
+      CallOutId m_call_out_id;
+      boost::function< void() > m_func;
+      bool m_waiting;
+}; // eo class Caller
+
+
+} // eo namespace Detail
+
+/*
+**
+*/
+
+/**
+ * @brief initiates a deferred call of a function.
+ *
+ * @param f the function which should be called.
+ * @param delta_sec the delta time (in seconds) when the function should be called.
+ * @return an id to identify the call (may be used for preliminary removal of the call)
+ */
+template< typename F >
+CallOutId callOut( boost::function< void() > f, F delta_sec );
+
+template<> CallOutId callOut( boost::function< void() > f, long delta_sec );
+template<> CallOutId callOut( boost::function< void() > f, double delta_sec );
+template<> CallOutId callOut( boost::function< void() > f, float delta_sec );
+template<> CallOutId callOut( boost::function< void() > f, int delta_sec );
+
+
+/**
+ * @brief initiates a frozen call of a function.
+ *
+ * @param f the function which should be called.
+ * @param delta_sec the delta time (in seconds) when the call will be (silently) removed.
+ * @return an id to identify the call; neccessary for thaw the call.
+ */
+template< typename F >
+CallOutId frozenCall( boost::function< void() > f, F delta_sec);
+
+template<> CallOutId frozenCall( boost::function< void() > f, long delta_sec );
+template<> CallOutId frozenCall( boost::function< void() > f, double delta_sec );
+template<> CallOutId frozenCall( boost::function< void() > f, float delta_sec );
+template<> CallOutId frozenCall( boost::function< void() > f, int delta_sec );
+
+
+
+bool removeCallOut( CallOutId& id );
+
+
+} // eo namespace SimpleIo
+} // eo namespace I2n
+
+#endif
diff --git a/asnycio/simpleio.cpp b/asnycio/simpleio.cpp
new file mode 100644 (file)
index 0000000..9bf0cad
--- /dev/null
@@ -0,0 +1,1695 @@
+/** @file
+ *
+ *
+ * @copyright Copyright &copy; 2007-2008 by Intra2net AG
+ * @license commercial
+ * @contact info@intra2net.com
+ */
+
+//#define NOISEDEBUG
+
+#include "simpleio.hpp"
+
+#include <list>
+#include <vector>
+#include <map>
+#include <algorithm>
+#include <utility>
+
+#include <sys/poll.h>
+#include <sys/time.h>
+#include <sys/socket.h>
+#include <unistd.h>
+#include <errno.h>
+#include <fcntl.h>
+
+#include <boost/bind.hpp>
+
+#include <signalfunc.hpp>
+#include <timefunc.hxx>
+
+#include <iostream>
+
+#ifdef NOISEDEBUG
+#include <iostream>
+#include <iomanip>
+#define DOUT(msg) std::cout << msg << std::endl
+#define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl
+//#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl
+#define ODOUT(msg) std::cout << __PRETTY_FUNCTION__ << "[" << this << "]:" << msg << std::endl
+#else
+#define DOUT(msg) do {} while (0)
+#define FODOUT(obj,msg) do {} while (0)
+#define ODOUT(msg) do {} while (0)
+#endif
+
+namespace
+{
+
+
+/*
+ * configuration:
+ */
+
+
+const int c_max_poll_wait= 10*60*1000; // maximal poll wait (while in backend loop): 10 min
+
+
+/**
+ * contains internal helper structs and functions for io handling.
+ */
+namespace internal_io
+{
+
+/**
+ * extends struct pollfd with some convenience functions
+ */
+struct PollFd : public ::pollfd
+{
+    PollFd()
+    {
+        fd= 0;
+        events= revents= 0;
+    } // eo PollFd
+
+
+    /**
+     * initializes the struct with a given file descriptor and clears the event mask(s).
+     * @param _fd 
+     */
+    PollFd(int _fd)
+    {
+        fd= _fd;
+        events= revents= 0;
+    } // eo PollFd
+
+
+    /**
+     * set that we want to be notified about new incoming data
+     */
+    void setPOLLIN()  { events |= POLLIN; }
+
+    /**
+     * set that we want to be notified if we can send (more) data.
+     */
+    void setPOLLOUT() { events |= POLLOUT; }
+
+}; // eo struct PollFd
+
+
+typedef std::vector<PollFd> PollVector;
+typedef std::map<int,PollVector::size_type> FdPollMap;
+typedef std::map<int,I2n::SimpleIo::IOImplementation*> FdIOMap;
+
+
+/**
+ * struct for interfacing our local structures with poll()
+ */
+struct PollDataCluster
+{
+    PollVector m_poll_vector;
+    FdPollMap  m_fd_poll_map;
+    FdIOMap    m_read_fd_io_map;
+    FdIOMap    m_write_fd_io_map;
+
+    void add_read_fd( int fd, I2n::SimpleIo::IOImplementation* io);
+    void add_write_fd( int fd, I2n::SimpleIo::IOImplementation* io);
+
+    pollfd* get_pollfd_ptr();
+    unsigned int get_num_pollfds() const;
+
+}; // eo struct PollDataCluster
+
+
+template<class T>
+class PtrList : public std::list<T*>
+{
+        typedef std::list<T*> inherited;
+    public:
+        bool dirty;
+
+        static int Instances;
+
+    public:
+
+        PtrList()
+        : dirty(false)
+        {
+            ++Instances;
+        } // eo PtrList
+
+
+        ~PtrList()
+        {
+            ODOUT("");
+            --Instances;
+        }
+
+
+        /**
+         * add a new item pointer to the list.
+         *
+         * @param item item pointer which should be added
+         */
+        void add_item(T* item)
+        {
+            typename inherited::iterator it= std::find(inherited::begin(), inherited::end(), item);
+            if (it != inherited::end())
+            {
+                // nothing to do since item is already in the list
+                return;
+            }
+            push_back(item);
+        } // eo add
+
+
+        /**
+         * remove an item pointer from the list by setting NULL at the current position of the item and mark
+         * the list as dirty.
+         *
+         * @param item the io object which should be removed from the list.
+         */
+        void remove_item(T* item)
+        {
+            typename inherited::iterator it= std::find(inherited::begin(), inherited::end(), item);
+            if (it == inherited::end())
+            {
+                // nothing to do:
+                return;
+            }
+            *it = NULL; // forget the pointer
+            dirty= true; // ..and mark the list as dirty (i.e. has NULL elements)
+        } // eo remove
+
+
+        /**
+         * cleans the list of objects by removing the NULL elements (if any).
+         *
+         * @note this function should only be called when it is ensured that no other functions using iterators of this list.
+         */
+        void clean_list()
+        {
+            if (!dirty)
+            {
+                // nothing to do
+                return;
+            }
+            // remove the NULL elements now:
+            erase(
+                std::remove( inherited::begin(), inherited::end(), (T*)NULL),
+                inherited::end() );
+            dirty= false;
+        } // eo clean_list
+
+
+}; // eo class PtrList
+
+
+typedef PtrList<I2n::SimpleIo::IOImplementation> IOList;
+typedef PtrList<I2n::SimpleIo::TimerBase>        TimerList;
+
+template<> int IOList::Instances= 0;
+template<> int TimerList::Instances= 0;
+
+
+/**
+ * the (internal) global list of io objects (object pointers)
+ */
+IOList& g_io_list()
+{
+    static IOList _the_io_list;
+    return _the_io_list;
+};
+
+
+/**
+ * the (internal) global list of timer objects (object pointers)
+ */
+TimerList& g_timer_list()
+{
+    static TimerList _the_timer_list;
+    return _the_timer_list;
+}
+
+/*
+ * implementation of PollDataCluster
+ */
+
+
+/**
+ * add a new file descriptor to the read list.
+ *
+ * @param fd the file descriptor.
+ * @param io the io object which uses the fd for reading.
+ */
+void PollDataCluster::add_read_fd( int fd, I2n::SimpleIo::IOImplementation* io)
+{
+    FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
+    if (itPollMap != m_fd_poll_map.end())
+    {
+        m_poll_vector[ itPollMap->second ].setPOLLIN();
+    }
+    else
+    {
+        PollFd item(fd);
+        item.setPOLLIN();
+        m_fd_poll_map[fd] = m_poll_vector.size();
+        m_poll_vector.push_back( item );
+    }
+    m_read_fd_io_map[fd]= io;
+} // eo PollDataCluster::add_read_fd
+
+
+/**
+ * add a new file descriptor to the write list.
+ *
+ * @param fd the file descriptor.
+ * @param io the io object which uses the fd for writing.
+ */
+void PollDataCluster::add_write_fd( int fd, I2n::SimpleIo::IOImplementation* io)
+{
+    FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
+    if (itPollMap != m_fd_poll_map.end())
+    {
+        m_poll_vector[ itPollMap->second ].setPOLLOUT();
+    }
+    else
+    {
+        PollFd item(fd);
+        item.setPOLLOUT();
+        m_fd_poll_map[fd] = m_poll_vector.size();
+        m_poll_vector.push_back( item );
+    }
+    m_write_fd_io_map[fd]= io;
+} // eo PollDataCluster::add_write_fd
+
+
+/**
+ * returns a pointer to a pollfd array; suitable for passing to poll()
+ *
+ * @return pointer to pollfd array
+ */
+pollfd* PollDataCluster::get_pollfd_ptr()
+{
+    return m_poll_vector.empty() ? NULL : &m_poll_vector.front();
+} // eo get_pollfd_ptr
+
+
+/**
+ * returns the number of entries in the pollfd array; suitable for passing to poll()
+ *
+ * @return the number of entries in the pollfd array
+ */
+unsigned int PollDataCluster::get_num_pollfds() const
+{
+    return m_poll_vector.size();
+} // eo get_num_pollfds
+
+
+
+} // eo namespace internal_io
+
+
+/*
+ * some internal tool functions and structures
+ */
+
+struct FilterMatch {
+    I2n::SimpleIo::FilterBasePtr m_filter;
+
+    FilterMatch(I2n::SimpleIo::FilterBasePtr filter)
+    : m_filter(filter)
+    {}
+
+    bool operator () (const I2n::SimpleIo::FilterBasePtr& item)
+    {
+        return item && item == m_filter;
+    }
+
+}; // eo struct FilterMatch
+
+
+void get_current_real_time(long& current_sec, long& current_msec)
+{
+    struct timeval tv;
+    gettimeofday(&tv,NULL);
+    current_sec= tv.tv_sec;
+    current_msec= (tv.tv_usec / 1000);
+    if (current_msec >= 1000)
+    {
+        current_sec += (current_msec / 1000);
+        current_msec%= 1000;
+    }
+} // eo get_current_real_time
+
+
+void get_current_monotonic_time(long& current_sec, long& current_msec)
+{
+    long nsec;
+    if (monotonic_clock_gettime(current_sec,nsec))
+    {
+        current_msec= nsec / 1000000L;
+    }
+    else
+    {
+        //fallback...
+        get_current_real_time(current_sec,current_msec);
+    }
+} // eo get_current_monotonic_time
+
+
+
+} // eo anonymous namespace
+
+
+
+
+namespace I2n
+{
+namespace SimpleIo
+{
+
+
+/**
+ * @brief gets the current time as MilliTime structure.
+ * @param[out] mt reference to the MilliTime strcucture which is filled with the result.
+ */
+void get_current_real_time(MilliTime& mt)
+{
+    long sec, msec;
+    ::get_current_real_time(sec,msec);
+    mt.set(sec,msec);
+} // eo get_current_real_time
+
+
+/**
+ * @brief gets the current time as MilliTime structure.
+ * @param[out] mt reference to the MilliTime strcucture which is filled with the result.
+ */
+void get_current_monotonic_time(MilliTime& mt)
+{
+    long sec, msec;
+    ::get_current_monotonic_time(sec,msec);
+    mt.set(sec,msec);
+} // eo get_current_monotonic_time
+
+
+/*
+ * implementation of MilliTime
+ */
+
+MilliTime::MilliTime(long sec, long msec)
+: mt_sec(sec), mt_msec(msec)
+{
+    normalize();
+} // eo MilliTime::MilliTime
+
+
+void MilliTime::set(long sec, long msec)
+{
+    mt_sec= sec;
+    mt_msec= msec;
+    normalize();
+} // eo MilliTime::set
+
+
+/**
+ * normalizes the values, so that mt_msec has a value between 0 and 999.
+ */
+void MilliTime::normalize()
+{
+    if (mt_msec < 0)
+    {
+        mt_sec += (mt_msec / 1000) - 1;
+        mt_msec = (mt_msec % 1000) + 1000;
+    }
+    else if (mt_msec>=1000)
+    {
+        mt_sec+= (mt_msec / 1000);
+        mt_msec %= 1000;
+    }
+} // eo MilliTime::normalize
+
+
+/**
+ * determine if the represented point in time is before another one.
+ * @param other the other point in time.
+ * @return true if the own point in time is before the other one.
+ */
+bool MilliTime::operator < (MilliTime& other)
+{
+    normalize();
+    other.normalize();
+    return
+        (mt_sec < other.mt_sec)
+        || (( mt_sec == other.mt_sec) && (mt_msec < other.mt_msec));
+} // eo MilliTime::operator <
+
+
+/**
+ * determine if two point in times are equal.
+ * @param other the point in time to compare with.
+ * @return true if the represented times are equal.
+ */
+bool MilliTime::operator == (MilliTime& other)
+{
+    normalize();
+    other.normalize();
+    return (( mt_sec == other.mt_sec) && (mt_msec == other.mt_msec));
+} // eo MilliTime::operator <
+
+/**
+ * @brief subtracts a time delta from the object.
+ * @param lhs the time delta to subtract.
+ * @return reference to the object itself.
+ */
+MilliTime& MilliTime::operator -= (const MilliTime& lhs)
+{
+    mt_sec  -= lhs.mt_sec;
+    mt_msec -= lhs.mt_msec;
+} // eo operator -=
+
+
+/**
+ * @brief adds a time delta from the object.
+ * @param lhs the time delta to add.
+ * @return reference to the object itself.
+ */
+MilliTime& MilliTime::operator += (const MilliTime& lhs)
+{
+    mt_sec  += lhs.mt_sec;
+    mt_msec += lhs.mt_msec;
+}  // eo operator +=
+
+
+/*
+ * implementation of TimerBase
+ */
+
+/**
+ * constructor. Adds the object to the internal timer list.
+ */
+TimerBase::TimerBase()
+: m_active(false)
+, m_marked(false)
+{
+    internal_io::g_timer_list().add_item(this);
+} // eo TimerBase::TimerBase
+
+
+/**
+ * destructor. Removes the object from the internal timer list.
+ */
+TimerBase::~TimerBase()
+{
+    ODOUT("enter");
+    if (internal_io::TimerList::Instances)
+    {
+        ODOUT("remove from list");
+        internal_io::g_timer_list().remove_item(this);
+    }
+} // eo TimerBase::~TimerBase
+
+
+/**
+ * @brief returns the point in time when the time is executed in real time.
+ * @return the point in time when the timer is to be executed.
+ */
+MilliTime TimerBase::getRealWhenTime() const
+{
+    MilliTime mono_time;
+    MilliTime real_time;
+    get_current_monotonic_time(mono_time);
+    get_current_real_time(real_time);
+    MilliTime result= m_when - mono_time + real_time;
+    return result;
+} // eo TimerBase::getRealWhenTime() const
+
+
+/**
+ * sets the time when the event should be executed.
+ * @param sec the seconds part of the point in time.
+ * @param msec  the milliseconds part of the point in time.
+ */
+void TimerBase::setWhenTime(long sec, long msec)
+{
+    m_when.set(sec,msec);
+    m_marked= false;
+} // eo TimerBase::setWhenTime
+
+
+/**
+ * sets the time when the event should be executed.
+ * @param mt the point in time.
+ */
+void TimerBase::setWhenTime(const MilliTime& mt)
+{
+    m_when= mt;
+    m_marked= false;
+} // eo TimerBase::setWhenTime
+
+
+/**
+ * sets the time delta measured from current time when the event should be executed.
+ * @param sec the seconds of the time delta
+ * @param msec the milli seconds of the time delta
+ */
+void TimerBase::setDeltaWhenTime(long sec, long msec)
+{
+    setDeltaWhenTime( MilliTime(sec,msec) );
+} // eo TimerBase::setWhenTime
+
+
+
+/**
+ * sets the time delta measured from current time when the event should be executed.
+ * @param mt the time delta
+ */
+void TimerBase::setDeltaWhenTime(const MilliTime& mt)
+{
+    get_current_monotonic_time(m_when);
+    m_when+= mt;
+    m_marked= false;
+} // eo TimerBase::setWhenTime
+
+
+/**
+ * set the active state of the timer event.
+ * @param active determines if the object should be active (default: yes).
+ */
+void TimerBase::activate(bool active)
+{
+    m_active = active;
+    if (!active)
+    {
+        // clear the mark if we are not active.
+        m_marked= false;
+    }
+} // eo TimerBase::activate
+
+
+/** @fn void TimerBase::deactivate()
+ *  deactivates the event by clearing the active state.
+ */
+
+
+/**
+ * called when the timer event occured.
+ */
+void TimerBase::execute()
+{
+} // eo TimerBase::execute
+
+
+/*
+ * implementation of FilterBase class
+ */
+
+
+FilterBase::FilterBase()
+: m_io(NULL)
+{
+} // eo FilterBase::FilterBase()
+
+
+/**
+ * injects incoming data.
+ * @param data  the new data
+ */
+void FilterBase::injectIncomingData(const std::string& data)
+{
+    if (m_io)
+    {
+        FilterBasePtr ptr= get_ptr_as< FilterBase >();
+        if (ptr)
+        {
+            m_io->injectIncomingData(ptr,data);
+        }
+    }
+} // FilterBase::injectIncomingData(const std::string&)
+
+
+/**
+ * injects outgoing data.
+ * @param data the new data
+ */
+void FilterBase::injectOutgoingData(const std::string& data)
+{
+    if (m_io)
+    {
+        FilterBasePtr ptr= get_ptr_as< FilterBase >();
+        if (ptr)
+        {
+            m_io->injectOutgoingData(ptr,data);
+        }
+    }
+} // eo FilterBase::injectOutgoingData(const std::string&)
+
+
+
+/**
+ * called when EOF detected on incoming channel (or incoming channel closed)
+ */
+void FilterBase::endOfIncomingData()
+{
+} // eo FilterBase::endOfIncomingData()
+
+/**
+ * called when the filter should reset.
+ * This is used when a new channel is opened or when the filter is taken out of a filter chain.
+ */
+void FilterBase::reset()
+{
+} // eo FilterBase::reset()
+
+
+/*
+ * implementation of IOImplementation class
+ */
+
+
+/**
+ * constructor for the base io class.
+ *
+ * Also adds the object to internal list of io objects (which is used by the backend).
+ *
+ * @param read_fd the file descriptor which should be used for reading (default -1 for no value)
+ * @param write_fd  the file descriptor which should be used for writing (default -1 for no value)
+ */
+IOImplementation::IOImplementation(int read_fd, int write_fd)
+: m_read_fd(-1)
+, m_write_fd(-1)
+, m_eof(false)
+, m_not_writable(false)
+, m_input_buffer()
+, m_output_buffer()
+, m_marked_for_reading(false)
+, m_marked_for_writing(false)
+{
+    internal_io::g_io_list().add_item(this);
+    if (read_fd >= 0)
+    {
+        setReadFd( read_fd );
+    }
+    if (write_fd >= 0)
+    {
+        setWriteFd( write_fd );
+    }
+} // eo IOImplementation::IOImplementation
+
+
+/**
+ * destructor of the base io class.
+ *
+ * Removes the object from the interal list of io objects.
+ */
+IOImplementation::~IOImplementation()
+{
+    close();
+    if (internal_io::IOList::Instances)
+    {
+        internal_io::g_io_list().remove_item(this);
+    }
+    // now clear the filters:
+    while (! m_filter_chain.empty() )
+    {
+        FilterChain::iterator it = m_filter_chain.begin();
+        (*it)->reset();
+        (*it)->m_io= NULL;
+        //TODO: signal the filter that it is removed ?!
+        m_filter_chain.erase(it);
+    }
+} // eo IOImplementation::~IOImplementation
+
+
+/**
+ * adds another filter to the filter chain.
+ * @param filter pointer to the new filter.
+ */
+void IOImplementation::addFilter
+(
+    FilterBasePtr filter
+)
+{
+    if (!filter)
+    {
+        return; // nothing to do
+    }
+    if (filter->m_io)
+    {
+        filter->m_io->removeFilter(filter);
+    }
+    m_filter_chain.push_back( filter );
+} // eo IOImplementation::addFilter
+
+
+/**
+ * removes a filter from the filter chain.
+ * @param filter the pointer to the filter which is removed.
+ * @note if the filter is removed the class gives away the ownership; i.,e. the caller is responsible for
+ * deleting the filter if it was dynamically allocated.
+ */
+void IOImplementation::removeFilter
+(
+    FilterBasePtr filter
+)
+{
+    FilterChain::iterator it =
+        std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(filter) );
+    if (it != m_filter_chain.end())
+    {
+        filter->reset();
+        filter->m_io= NULL;
+        //TODO: signal the filter that it is removed ?!
+        m_filter_chain.erase(it);
+    }
+} // eo IOImplementation::removeFilter
+
+
+/**
+ * closes the file descriptors (/ the connection).
+ *
+ * @param direction the direction which should be closed (default: @a Direction::both for all).
+ */
+void IOImplementation::close(Direction direction)
+{
+    bool had_read_fd= (m_read_fd >= 0);
+    m_errno= 0;
+    if (direction == Direction::unspecified) direction= Direction::both;
+    if (direction != Direction::both && m_read_fd==m_write_fd && m_read_fd>=0)
+    { // special case: half closed (socket) connections...
+        // NOTE: for file descriptors m_errno will set to ENOTSOCK, but since we "forget" the desired part
+        // (read_fd or write_fd) this class works as desired.
+        switch(direction)
+        {
+            case Direction::in:
+                {
+                    int res= ::shutdown(m_read_fd, SHUT_RD);
+                    if (res<0) 
+                    {
+                        m_errno= errno;
+                    }
+                    m_read_fd= -1;
+                    if (!m_eof)
+                    {
+                        for(FilterChain::iterator it= m_filter_chain.begin();
+                            it != m_filter_chain.end();
+                            ++it)
+                        {
+                            (*it)->endOfIncomingData();
+                        }
+                    }
+                }
+                return;
+
+            case Direction::out:
+                {
+                    int res= ::shutdown(m_write_fd, SHUT_WR);
+                    if (res<0)
+                    {
+                        m_errno= errno;
+                    }
+                    m_write_fd= -1;
+                    m_output_buffer.clear();
+                }
+                return;
+        }
+    }
+    if (m_write_fd >= 0 && (direction & Direction::out) )
+    {
+        int res1 = ::close(m_write_fd);
+        if (m_write_fd == m_read_fd)
+        {
+            m_read_fd= -1;
+        }
+        m_write_fd= -1;
+        m_output_buffer.clear();
+        if (res1<0) m_errno= errno;
+    }
+    if (m_read_fd >=0 && (direction & Direction::in) )
+    {
+        int res1 = ::close(m_read_fd);
+        m_read_fd= -1;
+        if (res1<0) m_errno= errno;
+    }
+    if (had_read_fd && !m_eof && (m_read_fd<0))
+    {
+        for(FilterChain::iterator it= m_filter_chain.begin();
+            it != m_filter_chain.end();
+            ++it)
+        {
+            (*it)->endOfIncomingData();
+        }
+    }
+} // eo IOImplementation::close
+
+
+/**
+ * determines if the io class wants to read data.
+ * Default implementation checks only for a valid file descriptor value.
+ *
+ * @return @a true if the objects wants to read data
+ */
+bool IOImplementation::wantRead()
+{
+    return (m_read_fd >= 0) && ! m_eof;
+} // eo IOImplementation::wantRead
+
+
+/**
+ * determines if the io class wants to write data.
+ * Default implementation checks for a valid file descriptor value and if the object
+ * cannot write data immediately.
+ *
+ * @return @a true if the objects wants to write data
+ */
+bool IOImplementation::wantWrite()
+{
+    return (m_write_fd >= 0) && ! m_marked_for_writing && ! m_not_writable;
+} // eo IOImplementation::wantWrite
+
+
+/**
+ * delivers if opened.
+ * The default returns @a true if at least one file descriptor (read or write) is valid.
+ * @return @a true if opened.
+ */
+bool IOImplementation::opened() const
+{
+    return (m_read_fd>=0) || (m_write_fd>=0);
+} // eo IOImplementation::opened() const
+
+
+/**
+ * returns if the read side detected an end of file (EOF).
+ * @return @a true if end of file was detected on read file descriptor (or read file descriptor isn't valid).
+ */
+bool IOImplementation::eof() const
+{
+    return (m_read_fd < 0) || m_eof;
+} // eo IOImplementatio::eof() const
+
+
+/**
+ * @brief returns of the write side didn't detect that it cannot write.
+ * @return @a true if we can write.
+ */
+bool IOImplementation::writable() const
+{
+    return (m_write_fd >=0 ) and not m_not_writable;
+} // eo IOImplementation::writable() const
+
+
+/**
+ * returns if the output buffer is empty.
+ * @return 
+ */
+bool IOImplementation::empty() const
+{
+    return m_output_buffer.empty();
+} // eo IOImplementation::empty
+
+/**
+ * puts data into the output buffer and sends it immediately if possible,
+ *
+ * The data is passed through the filter chain before it's stored in the output buffer
+ * (i.e. the output buffer contains data as it should be send directly to the descriptor).
+ * @param _data the data which should be send.
+ */
+void IOImplementation::lowSend(const std::string& _data)
+{
+    std::string data(_data);
+
+    for(FilterChain::reverse_iterator it_filter= m_filter_chain.rbegin();
+        it_filter!= m_filter_chain.rend();
+        ++it_filter)
+    {
+        data= (*it_filter)->filterOutgoingData(data);
+    }
+    m_output_buffer+= data;
+
+    // if we can send immediately, do it:
+    if (! m_output_buffer.empty() && m_marked_for_writing)
+    {
+        doWrite();
+    }
+} // eo IOImplementation::lowSend
+
+
+/**
+ * called by the backend when there is data to read for this object.
+ *
+ * Reads the data from the connection (read file descriptor) and passes the data through the filter chain.
+ * The final data is appended to the input buffer and the signal @a m_signal_read() is called.
+ *
+ * If EOF is detected (i,e, no data was received) then the signal @a m_signal_eof() is called.
+ *
+ * @note overload this method only when You know what You are doing!
+ * (overloading is necessary when handling server sockets.)
+ */
+void IOImplementation::doRead()
+{
+    // static read buffer; should be ok as long as we don't use threads
+    static char buffer[8*1024]; // 8 KiB
+
+    m_errno = 0;
+    if (m_read_fd<0 || !m_marked_for_reading)
+    {
+        ODOUT("exit0; read_fd="<<m_read_fd << " mark=" << m_marked_for_reading);
+        return;
+    }
+
+    // reset the mark:
+    m_marked_for_reading = false;
+
+    // now read the data:
+    ssize_t count;
+    count = ::read(m_read_fd, buffer, sizeof(buffer));
+
+    ODOUT("::read -> " << count);
+
+    // interpret what we got:
+    if (count < 0) // error
+    {
+        m_errno = errno;
+        int fd= m_read_fd;
+
+        switch(m_errno)
+        {
+            case EINVAL:
+            case EBADF:
+            case ECONNRESET:
+            case ENETRESET:
+                if (fd == m_read_fd)
+                {
+                    close( (m_read_fd == m_write_fd) ? Direction::both : Direction::in );
+                }
+                break;
+        }
+    }
+    else if (count==0) // EOF
+    {
+        // remember the read fd:
+        int fd = m_read_fd;
+        // remember the EOF:
+        m_eof= true;
+        // signal EOF
+        m_signal_eof();
+        // if the fd is still the same: close it.
+        if (fd == m_read_fd)
+        {
+            close( Direction::in );
+        }
+    }
+    else // we have valid data
+    {
+        std::string data(buffer,count);
+        ODOUT(" got \"" << data << "\"");
+        for(FilterChain::iterator it_filter= m_filter_chain.begin();
+            it_filter != m_filter_chain.end();
+            ++it_filter)
+        {
+            data= (*it_filter)->filterIncomingData(data);
+        }
+        m_input_buffer+= data;
+        m_signal_read();
+    }
+} // eo IOImplementation::doRead
+
+
+/**
+ * interface for filter classes to inject data into the filter chain (emulating new incoming data).
+ * @param from_filter the filter which injects the new data.
+ * @param _data  the new data.
+ */
+void IOImplementation::injectIncomingData(FilterBasePtr from_filter, const std::string& _data)
+{
+    FilterChain::iterator it_filter =
+        std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(from_filter) );
+    if (it_filter == m_filter_chain.end())
+    {
+        // dont accept data inject from a unknown filter
+        return;
+    }
+    // well: pass the data through the remaining filters:
+    // NOTE: processing is (nearly) the same as in IOImplementation::doRead()
+    std::string data(_data);
+    for(++it_filter;
+        it_filter != m_filter_chain.end();
+        ++it_filter)
+    {
+        data= (*it_filter)->filterIncomingData(data);
+    }
+    m_input_buffer+= data;
+    m_signal_read();
+} // eo IOImplementation::injectIncomingData(FilterBase*,const std::string&)
+
+
+/**
+ * interface for filter classes to inject data into the filter chain (emulating new outgoing data).
+ * @param from_filter the filter which injects the new data.
+ * @param _data  the new data.
+ */
+void IOImplementation::injectOutgoingData(FilterBasePtr from_filter, const std::string& _data)
+{
+    FilterChain::reverse_iterator it_filter =
+        std::find_if( m_filter_chain.rbegin(), m_filter_chain.rend(), FilterMatch(from_filter) );
+    if (it_filter == m_filter_chain.rend())
+    {
+        // dont accept data inject from a unknown filter
+        return;
+    }
+    // well: pass the data through the remaining filters:
+    // NOTE: processing is (nearly) the same as in IOImplementation::lowSend()
+    std::string data(_data);
+    for(++it_filter;
+        it_filter!= m_filter_chain.rend();
+        ++it_filter)
+    {
+        data= (*it_filter)->filterOutgoingData(data);
+    }
+    m_output_buffer+= data;
+
+    // if we can send immediately, do it:
+    if (! m_output_buffer.empty() && m_marked_for_writing)
+    {
+        doWrite();
+    }
+} // eo IOImplementation::injectOutgoingData(FilterBase*,const std::string&)
+
+
+/**
+ * set the read file descriptor.
+ * Although a derived class can also set the read fd directly; this method should be used
+ * for this task since it updates some flags on the fd for async operation.
+ * @param fd the new read file descriptor.
+ */
+void IOImplementation::setReadFd(int fd)
+{
+    // test if we already have a valid descriptor (and may have to close it):
+    if (m_read_fd >=0 )
+    {
+        if (m_read_fd == fd)
+        {
+            // fd was already right; consider it to be ok.
+            return;
+        }
+        close(Direction::in);
+    }
+    // reset our errno:
+    m_errno= 0;
+    // if the new descriptor looks valid, set some flags:
+    if (fd >= 0)
+    {
+        long flags= ::fcntl(fd, F_GETFL);
+        if (flags != -1)
+        {
+            // set the flags for non blocking, async operation
+            flags |= O_NONBLOCK|O_ASYNC;
+            ::fcntl(fd,F_SETFL, flags);
+        }
+        else if ( errno == EBADF )
+        {
+            // well, we seemed to be fed with an invalid descriptor...:
+            m_errno = errno;
+            fd= -1;
+        }
+    }
+    if (fd >= 0) // if still valid:
+    {
+        // set the close-on-exec flag
+        ::fcntl(fd,F_SETFD, FD_CLOEXEC);
+    }
+    m_read_fd= fd;
+    m_marked_for_reading= false;
+    m_eof= false;
+} // eo IOImplementation::setReadFd(int)
+
+
+
+/**
+ * set the write file descriptor.
+ * Although a derived class can also set the write fd directly; this method should be used
+ * for this task since it updates some flags on the fd for async operation.
+ * @param fd the new write file descriptor.
+ */
+void IOImplementation::setWriteFd(int fd)
+{
+    if (m_write_fd >=0 )
+    {
+        if (m_write_fd == fd)
+        {
+            // fd was already right; consider it to be ok.
+            return;
+        }
+        close(Direction::out);
+    }
+    // reset our errno:
+    m_errno= 0;
+    // if the new descriptor looks valid, set some flags:
+    if (fd >= 0)
+    {
+        long flags= ::fcntl(fd, F_GETFL);
+        if (flags != -1)
+        {
+            // set the flags for non blocking, async operation
+            flags |= O_NONBLOCK|O_ASYNC;
+            ::fcntl(fd,F_SETFL, flags);
+        }
+        else if (errno == EBADF)
+        {
+            // well, we seemed to be fed with an invalid descriptor...:
+            m_errno = errno;
+            fd= -1;
+        }
+    }
+    if (fd >= 0) // if still valid:
+    {
+        // set the close-on-exec flag
+        ::fcntl(fd,F_SETFD, FD_CLOEXEC);
+    }
+    m_write_fd = fd;
+    m_marked_for_writing= false;
+    m_not_writable= false;
+} // eo IOImplementation::setWriteFd(int)
+
+
+
+/**
+ * called by the backend when this object can write data.
+ *
+ * If some data was sended, the signal @a m_signal_write is called.
+ *
+ * @internal tries to write all buffered data to output; if this succeeds,
+ * the connection is assumed to be still able to accept more data.
+ * (i.e. the internal write mark is kept!)
+ *
+  * @note overload this method only when You know what You are doing!
+*/
+void IOImplementation::doWrite()
+{
+    m_errno = 0;
+    if ( m_write_fd<0 || !m_marked_for_writing || m_output_buffer.empty())
+    {
+        return;
+    }
+
+    ODOUT("doWrite, d=\"" << m_output_buffer << "\"");
+
+    //reset mark:
+    m_marked_for_writing= false;
+
+    // now write the data
+    ssize_t count= ::write( m_write_fd, m_output_buffer.data(), m_output_buffer.size());
+
+    ODOUT("::write -> " << count);
+
+    if (count < 0) // error
+    {
+        m_errno= errno;
+        int fd= m_write_fd;
+
+        switch(m_errno)
+        {
+            case EPIPE:
+                m_not_writable= true;
+                // emit a signal
+                m_signal_not_writable();
+                // fall through
+            case EINVAL:
+            case EBADF:
+            case ECONNRESET:
+            case ENETRESET:
+                if (fd == m_write_fd)
+                {
+                    close( (m_write_fd == m_read_fd) ? Direction::both : Direction::out );
+                }
+                break;
+        }
+    }
+    else
+    {
+        m_output_buffer.erase(0, count);
+        if (m_output_buffer.empty())
+        {
+            // special case: if we were able to send all the data, we keep the write mark:
+            m_marked_for_writing= true;
+        }
+    }
+    if (count > 0)
+    {
+        m_signal_write();
+    }
+} // eo IOImplementation::doWrite
+
+
+/*
+ * implementation of SimpleIO
+ */
+
+
+SimpleIO::SimpleIO(int read_fd, int write_fd)
+: inherited(read_fd, write_fd)
+{
+    m_signal_read.connect(boost::bind(&SimpleIO::slotReceived,this));
+} // eo SimpleIO::SimpleIO()
+
+
+SimpleIO::~SimpleIO()
+{
+} // eo SimpleIO::~SimpleIO()
+
+
+/**
+ * sends a string.
+ * @param data the string.
+ */
+void SimpleIO::sendString(const std::string& data)
+{
+    lowSend(data);
+} // eo SimpleIO::sendString(const std::string&)
+
+
+/**
+ * emits the signal signalReceived with the received data.
+ * This slot is connected to IOImplementation::m_signal_read.
+ */
+void SimpleIO::slotReceived()
+{
+    std::string data;
+    data.swap(m_input_buffer);
+    signal_received_string(data);
+} // eo SimpleIO::slotReceived()
+
+
+
+/*
+ * implementation of SimpleIO2
+ */
+
+
+SimpleIO2::SimpleIO2(int read_fd, int write_fd)
+: inherited(read_fd, write_fd)
+{
+    m_signal_read.connect(boost::bind(&SimpleIO2::slotReceived,this));
+} // eo SimpleIO2::SimpleIO2()
+
+
+SimpleIO2::~SimpleIO2()
+{
+} // eo SimpleIO2::~SimpleIO2()
+
+
+/**
+ * sends a string.
+ * @param data the string.
+ */
+void SimpleIO2::sendString(const std::string& data)
+{
+    lowSend(data);
+} // eo SimpleIO2::sendString(const std::string&)
+
+
+/**
+ * emits the signal signalReceived with the received data.
+ * This slot is connected to IOImplementation::m_signal_read.
+ */
+void SimpleIO2::slotReceived()
+{
+    std::string data;
+    data.swap(m_input_buffer);
+    signal_received_string(data);
+} // eo SimpleIO2::slotReceived()
+
+
+
+/*
+ * implementation of class Backend (singleton)
+ */
+
+Backend* Backend::g_backend= NULL;
+
+int Backend::m_count_active_steps=0;
+
+
+Backend::Backend()
+: m_count_active_loops(0)
+, m_count_stop_requests(0)
+{
+    SystemTools::ignore_signal( SystemTools::Signal::PIPE );
+} // eo Backend::Backend
+
+
+Backend::~Backend()
+{
+    SystemTools::restore_signal_handler( SystemTools::Signal::PIPE );
+} // eo Backend::~Backend()
+
+/**
+ * delivers pointer to the current backend, instantiating a new backend if there was no current one.
+ *
+ * This should be the only way to access the backend which should be a singleton.
+ *
+ * @return the pointer to the current backend.
+ */
+Backend* Backend::getBackend()
+{
+    if (!g_backend)
+    {
+        g_backend = new Backend();
+    }
+    return g_backend;
+} // eo Backend::getBackend
+
+
+
+
+/**
+ * performs one backend cycle.
+ *
+ * Collects all file descriptors from the active io objects which should be selected for reading and/or writing.
+ * Also determines the timer events which become due and adjusts the timeout.
+ * Constructs the necessary structures and calls poll().
+ * Finally interprets the results from poll() (i.e. performs the reading/writing/timer events)
+ *
+ * @param timeout maximal wait value in milliseconds; negative value waits until at least one event occured.
+ * @return @a true if there was at least one active object; otherwise @a false
+ *
+ * @note this method is a little beast.
+ *
+ * @internal 
+ * The cycle is divided into four steps: collecting; poll; mark and execute.
+ * The "mark" step is necessary to avoid some bad side effects when method calls in the execution stage
+ * are calling @a Backup::doOneStep or open their own local backend loop.
+ *
+ * @todo handle some more error cases.
+ * @todo provide a plugin interface for external handler.
+ * (currently inclusion of external handler is possible by (ab)using timer classes)
+ */
+bool Backend::doOneStep(int timeout)
+{
+    ODOUT( "timeout=" << timeout );
+    internal_io::PollDataCluster poll_data;
+    bool had_active_object = false;
+
+    ++m_count_active_steps;
+
+    try {
+
+        FdSetType local_read_fds;
+        FdSetType local_write_fds;
+
+        // step 1 ; collect
+
+        { // step 1.1: collect fds for read/write operations
+            for(internal_io::IOList::iterator itIOList = internal_io::g_io_list().begin();
+                itIOList != internal_io::g_io_list().end();
+                ++itIOList)
+            {
+                if (! *itIOList) continue; // skip NULL entries
+                int  read_fd  = (*itIOList)->m_read_fd;
+                int  write_fd = (*itIOList)->m_write_fd;
+                bool want_read  = (read_fd >= 0) and (*itIOList)->wantRead();
+                bool want_write = (write_fd >= 0) and (*itIOList)->wantWrite();
+                if (read_fd >= 0 )
+                {
+                    local_read_fds.insert( read_fd );
+                }
+                if (write_fd >= 0)
+                {
+                    local_write_fds.insert( write_fd );
+                }
+                if (!want_read && !want_write) continue;
+                if (want_read)
+                {
+                    FODOUT( (*itIOList), "wants to read  (fd=" << read_fd << ")");
+                    poll_data.add_read_fd(read_fd, *itIOList);
+                }
+                if (want_write)
+                {
+                    FODOUT( (*itIOList), "wants to write  (fd=" << write_fd << ")");
+                    poll_data.add_write_fd(write_fd, *itIOList);
+                }
+                had_active_object= true;
+            }
+        }
+
+        { // step 1.2: collect timer events
+            MilliTime current_time;
+            MilliTime min_event_time;
+
+            get_current_monotonic_time(current_time);
+            bool min_event_time_set;
+
+            if (timeout >= 0)
+            {
+                min_event_time = current_time + MilliTime(0,timeout);
+                min_event_time_set= true;
+            }
+            else
+            {
+                min_event_time = current_time + MilliTime(86400,0);
+                min_event_time_set= false;
+            }
+            // TODO
+
+            for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
+                it_timer != internal_io::g_timer_list().end() 
+                && (!had_active_object || !min_event_time_set || current_time < min_event_time);
+                ++ it_timer)
+            {
+                if (! *it_timer) continue; // skip NULL entries
+                if (! (*it_timer)->m_active) continue; // skip if not enabled
+                if ( !min_event_time_set || (*it_timer)->m_when < min_event_time)
+                {
+                    min_event_time = (*it_timer)->m_when;
+                    min_event_time_set= true;
+                }
+                had_active_object= true;
+            }
+
+            if (min_event_time_set)
+            { // we have at a minimal event time, so (re)compute the timeout value:
+                MilliTime delta= (min_event_time - current_time);
+                long long delta_ms = std::min( delta.get_milliseconds(), 21600000LL); // max 6h
+                if (delta_ms <= 0L)
+                {
+                    timeout= 0L;
+                }
+                else
+                {
+                    timeout= delta_ms + (delta_ms<5 ? 1 : 3);
+                }
+            }
+        }
+
+        // step 2 : poll
+        ODOUT(" poll timeout is " << timeout);
+        {
+            MilliTime current_time;
+            get_current_monotonic_time(current_time);
+            ODOUT("  current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
+        }
+        int poll_result= ::poll(poll_data.get_pollfd_ptr(), poll_data.get_num_pollfds(), timeout);
+
+        ODOUT("poll -> " << poll_result);
+        {
+            MilliTime current_time;
+            get_current_monotonic_time(current_time);
+            ODOUT("  current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
+        }
+
+        if (poll_result < 0)
+        {
+            //TODO poll error handling (signals ?!)
+        }
+
+        // step 3 : mark
+
+        // step 3.1: mark io objects (if necessary)
+        if (poll_result > 0)
+        {
+            for(internal_io::PollVector::iterator itPollItem = poll_data.m_poll_vector.begin();
+                itPollItem != poll_data.m_poll_vector.end();
+                ++itPollItem)
+            {
+                ODOUT("  fd=" << itPollItem->fd << ", events=" << itPollItem->events << ", revents=" << itPollItem->revents);
+                if ( 0 == (itPollItem->revents))
+                { // preliminary continuation if nothing is to handle for this item(/fd)...
+                    continue;
+                }
+                if ( 0!= (itPollItem->revents & (POLLIN|POLLHUP)))
+                {
+                    IOImplementation *io= poll_data.m_read_fd_io_map[ itPollItem->fd ];
+                    if (io && io->m_read_fd==itPollItem->fd)
+                    {
+                        FODOUT(io,"marked for reading");
+                        io->m_marked_for_reading= true;
+                    }
+                }
+                if ( 0!= (itPollItem->revents & POLLOUT))
+                {
+                    IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
+                    if (io && io->m_write_fd==itPollItem->fd)
+                    {
+                        io->m_marked_for_writing= true;
+                    }
+                }
+                if ( 0!= (itPollItem->revents & POLLERR))
+                {
+                    IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
+                    if (0!= (itPollItem->events & POLLOUT))
+                    {
+                        if (io && io->m_write_fd==itPollItem->fd)
+                        {
+                            io->m_marked_for_writing= false;
+                            //io->close( Direction::out );
+                        }
+                    }
+                }
+                // TODO error handling (POLLERR, POLLHUP, POLLNVAL)
+            }
+        }
+
+        //Step 3.2: mark timer objects
+        {
+            MilliTime current_time;
+
+            get_current_monotonic_time(current_time);
+            ODOUT("  current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
+
+            for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
+                it_timer != internal_io::g_timer_list().end();
+                ++ it_timer)
+            {
+                ODOUT("  check timer " << *it_timer);
+                if (! *it_timer) continue; // skip NULL entries
+                if (! (*it_timer)->m_active) continue; // skip if not enabled
+                if ( (*it_timer)->m_when <= current_time)
+                {
+                    ODOUT("   ==> MARK");
+                    (*it_timer)->m_marked = true;
+                }
+            }
+        }
+
+
+        // step 4 : execute
+
+        // step 4.1: execute io
+        ODOUT("execute stage");
+        for(internal_io::IOList::iterator it_io = internal_io::g_io_list().begin();
+            it_io != internal_io::g_io_list().end();
+            ++ it_io)
+        {
+            ODOUT("  check obj " << *it_io);
+            if (NULL == *it_io) continue;
+            if ((*it_io)->m_marked_for_writing)
+            {
+                FODOUT((*it_io),"exec doWrite");
+                (*it_io)->doWrite();
+                if ((*it_io) == NULL) continue; // skip remaining if we lost the object
+                if ((*it_io)->m_errno)
+                {
+                    continue;
+                }
+            }
+            if ((*it_io)->m_marked_for_reading)
+            {
+                FODOUT((*it_io),"exec doRead");
+                (*it_io)->doRead();
+                if ((*it_io) == NULL) continue; // skip remaining if we lost the object
+                if ((*it_io)->m_errno)
+                {
+                    continue;
+                }
+            }
+        }
+
+        // step 4.2: execute timer events
+        {
+            for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
+                it_timer != internal_io::g_timer_list().end();
+                ++ it_timer)
+            {
+                if (! *it_timer) continue; // skip NULL entries
+                if (! (*it_timer)->m_active) continue; // skip if not enabled
+                if (! (*it_timer)->m_marked) continue; // skip if not marked
+
+                // reset the mark and deactivate object now since the execute() method might activate it again
+                (*it_timer)->m_marked= false;
+                (*it_timer)->m_active= false;
+
+                // now execute the event:
+                (*it_timer)->execute();
+            }
+        }
+
+    } // eo try
+    catch(...)
+    {
+        // clean up our counter
+        --m_count_active_steps;
+        // and forward the exception
+        throw;
+    }
+
+    if ( 0 == --m_count_active_steps)
+    {
+        internal_io::g_io_list().clean_list();
+        internal_io::g_timer_list().clean_list();
+    }
+
+    return had_active_object;
+} // eo Backend::doOneStep
+
+
+/**
+ * enters a backend loop.
+ *
+ * Calls @a Backend::doOneStep within a loop until @a Backend::stop was called or there are no more
+ * active objects (io objects or timer objects).
+ */
+void Backend::run()
+{
+    ++m_count_active_loops;
+    do
+    {
+        try
+        {
+            if (!doOneStep(c_max_poll_wait))
+            {
+                // stop if there are no more active objects.
+                stop();
+            }
+        }
+        catch(...)
+        {
+            // clean up our counter
+            --m_count_active_loops;
+            // and forward the exception
+            throw;
+        }
+    } 
+    while (0 == m_count_stop_requests);
+    --m_count_active_loops;
+    --m_count_stop_requests;
+} // eo Backend::run
+
+
+/**
+ * @brief stops the latest loop currently run by Backend::run().
+ * @see Backend::run()
+ */
+void Backend::stop()
+{
+    if (m_count_active_loops)
+    {
+        ++m_count_stop_requests;
+    }
+} // eo Backend::stop()
+
+
+
+} // eo namespace SimpleIo
+} // eo namespace I2n
diff --git a/asnycio/simpleio.hpp b/asnycio/simpleio.hpp
new file mode 100644 (file)
index 0000000..d3727be
--- /dev/null
@@ -0,0 +1,463 @@
+/** 
+ * @file
+ * @brief simple basic IO handling.
+ *
+ * @copyright &copy; Copyright 2007-2008 by Intra2net AG
+ * @license commercial
+ * @contact info@intra2net.com
+ *
+ * Deals with POSIX file descriptors; provides an additional abstraction
+ * level above select() or poll() interface.
+ * Also provides basic functionality for dealing with timer events.
+ */
+
+#ifndef __I2N_SIMPLEIO_HPP__
+#define __I2N_SIMPLEIO_HPP__
+
+#include <string>
+#include <list>
+#include <set>
+
+#include <pointer_func.hpp>
+
+#include <boost/signal.hpp>
+#include <boost/shared_ptr.hpp>
+
+
+namespace I2n
+{
+namespace SimpleIo
+{
+
+
+/*
+ * forward declarations
+ */
+class Backend;
+class IOImplementation;
+
+/*
+ * end of forward declarations
+ */
+
+
+/**
+ * direction for io operations.
+ */
+struct Direction
+{
+    enum _Direction
+    {
+        unspecified= 0,
+        in = 1,
+        out = 2,
+        both= 3
+    } m_direction;
+
+    Direction( _Direction direction = unspecified) : m_direction(direction) {}
+
+    operator _Direction () const { return m_direction; }
+}; // eo struct IODirection;
+
+
+/**
+ * structure for storing (a point in time as) seconds and milliseconds.
+ */
+struct MilliTime
+{
+    long mt_sec;
+    long mt_msec;
+
+    MilliTime(long sec=0, long msec=0);
+
+    void set(long sec, long msec=0);
+
+    inline long long get_milliseconds() const
+    {
+        return ((long long)mt_sec * 1000L + mt_msec);
+    } // eo get_milliseconds
+
+    void normalize();
+
+    bool operator < (MilliTime& other);
+    bool operator == (MilliTime& other);
+
+    MilliTime& operator -= (const MilliTime& lhs);
+    MilliTime& operator += (const MilliTime& lhs);
+
+}; // eo struct MilliTime
+
+
+inline MilliTime operator + (const MilliTime& rhs, const MilliTime& lhs)
+{
+    MilliTime t(rhs);
+    return t+= lhs;
+} // eo operator + (const MilliTime& rhs, const MilliTime lhs)
+
+inline MilliTime operator - (const MilliTime& rhs, const MilliTime& lhs)
+{
+    MilliTime t(rhs);
+    return t-= lhs;
+} // eo operator - (const MilliTime& rhs, const MilliTime lhs)
+
+
+inline bool operator <= (MilliTime& rhs, MilliTime& lhs)
+{
+    return (rhs<lhs) || (rhs==lhs);
+} // eo operator <= (MilliTime& rhs, MilliTime& lhs)
+
+
+
+/**
+ * base class for time based events (timer events).
+ *
+ * consists basically of a point in time (when the event should be executed) and a method
+ * which will be called when the given time is reached (or passed).
+ */
+class TimerBase
+{
+        friend class Backend;
+    public:
+        TimerBase();
+        virtual ~TimerBase();
+
+        bool active() const { return m_active; } 
+
+        MilliTime getWhenTime() const {return m_when;}
+        MilliTime getRealWhenTime() const;
+
+    protected:
+
+        void setWhenTime(long sec, long msec=0);
+        void setWhenTime(const MilliTime& mt);
+
+        void setDeltaWhenTime(long sec, long msec=0);
+        void setDeltaWhenTime(const MilliTime& mt);
+
+        void activate(bool _active= true);
+        void deactivate() { activate(false); }
+
+        virtual void execute();
+
+    private:
+        /// @a true when the event is active.
+        bool m_active;
+        /// point in time when the event should be executed
+        MilliTime m_when;
+        /// mark from backend cycle that the event has to be executed.
+        bool m_marked;
+}; // eo class TimerBase
+
+
+
+/**
+ * base class for filter classes.
+ *
+ * filter objects can be "plugged" into IO objects for manipulating the data streams.
+ * (for example: one could make a filter which handles the telnet protocol and plug it into a
+ * socket io object.)
+ *
+ * @note filter object can be used only by one io object.
+ */
+class FilterBase
+: virtual public SharedBase
+{
+        friend class IOImplementation;
+    public:
+        typedef boost::shared_ptr< FilterBase > PtrType;
+    public:
+        FilterBase();
+        virtual ~FilterBase() {};
+
+    protected:
+
+        virtual std::string filterIncomingData(const std::string& data)= 0;
+        virtual std::string filterOutgoingData(const std::string& data)= 0;
+
+        virtual void endOfIncomingData();
+        virtual void reset();
+
+    protected:
+
+        void injectIncomingData(const std::string& data);
+        void injectOutgoingData(const std::string& data);
+
+    private:
+        /// pointer to the io object which uses this filter:
+        IOImplementation *m_io;
+}; // eo class FilterBase
+
+typedef FilterBase::PtrType FilterBasePtr;
+
+
+/**
+ * identity filter; does nothing with the data (in other words: it's useless ;-) ).
+ */
+class FilterIdentity : public FilterBase
+{
+    protected:
+        virtual std::string filterIncomingData(const std::string& data) { return data; }
+        virtual std::string filterOutgoingData(const std::string& data) { return data; }
+}; // eo class FilterIdentity
+
+
+/**
+ * @brief null filter; deletes everything it receives.
+ *
+ * usefull when output from a subprocess (like stderr) should be ignored...
+ */
+class FilterNull : public FilterBase
+{
+    protected:
+        virtual std::string filterIncomingData(const std::string& data) { return std::string(); }
+        virtual std::string filterOutgoingData(const std::string& data) { return std::string(); }
+}; // eo FilterNull
+
+
+/**
+ * the base class of the IO classes.
+ *
+ * provides the functionality to read from a file desriptor and write to a file descriptor (which can be
+ * identical (like for socket io), but also can be different (like pipes from/to a process)).
+ * The data stream can be filtered through plugged filter objects which are building a filter chain.
+ * Incoming data is filtered forward through the chain; outgoing data is filtered backward through the chain.
+ * (So the first filter is the one which modifies the data closest to the connections).
+ *
+ * @note the functionality is provided in conjunction with the @a Backend class which handles parts of
+ * the low level IO.
+ *
+ * @note this is a base class; it provides most "interesting" functionality in the protected section only.
+ * This way, derived classes can decide which of that functionality they want to export in their own public
+ * interfaces and which to keep hidden.
+ */
+class IOImplementation
+: public boost::signals::trackable
+, virtual public SharedBase
+{
+        friend class Backend;
+        friend class FilterBase;
+
+    public:
+
+        typedef std::list< FilterBasePtr > FilterChain;
+
+        typedef boost::signal< void() > SignalType;
+
+        typedef boost::shared_ptr< IOImplementation > PtrType;
+
+    public:
+        IOImplementation(int read_fd=-1, int write_fd=-1);
+        virtual ~IOImplementation();
+
+        virtual void close(Direction direction = Direction::both);
+
+        virtual bool wantRead();
+        virtual bool wantWrite();
+
+        virtual bool opened() const;
+        virtual bool eof() const;
+        virtual bool writable() const;
+        virtual bool empty() const;
+
+    protected:
+
+        void addFilter(FilterBasePtr filter);
+        void removeFilter(FilterBasePtr);
+
+
+        void lowSend(const std::string& data);
+
+        std::string::size_type getOutputBufferSize() const { return m_output_buffer.size(); }
+
+        void setWriteFd(int fd);
+        void setReadFd(int fd);
+
+        inline int readFd() const
+        {
+            return m_read_fd;
+        }
+
+        inline int writeFd() const
+        {
+            return m_write_fd;
+        }
+
+        inline bool isMarkedForReading() const { return m_marked_for_reading; }
+        inline bool isMarkedForWriting() const { return m_marked_for_writing; }
+
+
+    protected:
+        virtual void doRead();
+        virtual void doWrite();
+
+        void resetReadMark()  { m_marked_for_reading= false; }
+        void resetWriteMark() { m_marked_for_writing= false; }
+
+        void injectIncomingData(FilterBasePtr from_filter, const std::string& _data);
+        void injectOutgoingData(FilterBasePtr from_filter, const std::string& _data);
+
+    protected:
+        /// last error number
+        int m_errno;
+        /// the input buffer (i.e. the data read from @a m_read_fd)
+        std::string m_input_buffer;
+
+        /// the chain of filter which are applied to the data received are the data which should be send.
+        FilterChain m_filter_chain;
+
+        /// signal which is fired when end of file is detected
+        SignalType m_signal_eof;
+        /// signal which is fired when write is no longer possible
+        SignalType m_signal_not_writable;
+        /// signal which is fired when new data was read
+        SignalType m_signal_read;
+        /// signal which is fired when data was written
+        SignalType m_signal_write;
+
+        /// end of file (on @a m_read_fd) detected (used additionally when m_read_fd is valid)
+        bool m_eof;
+
+        /// unable-to-write (on @a m_write_fd) detected (used additionally when m_write_fd is valid)
+        bool m_not_writable;
+
+    private:
+        /// the file descriptor to read from (-1 if none is given)
+        int m_read_fd;
+        /// the file descriptor to write to (-1 if none is given)
+        int m_write_fd;
+        /// output buffer; contains the data which needs to be written.
+        std::string m_output_buffer;
+
+    private:
+        /// @a true when data is available to be read
+        bool m_marked_for_reading;
+        /// @a true when data can be written
+        bool m_marked_for_writing;
+
+}; // eo class IOImplementation
+
+
+
+/**
+ * same as IOImplementation, but makes fd access functions public.
+ */
+class IOImplementation2 : public IOImplementation
+{
+        typedef IOImplementation inherited;
+    public:
+        IOImplementation2(int read_fd=-1, int write_fd=-1) : inherited(read_fd, write_fd) {}
+
+        void setWriteFd(int fd) { inherited::setWriteFd(fd); }
+        void setReadFd(int fd)  { inherited::setReadFd(fd); }
+
+        int readFd() const { return inherited::readFd(); }
+        int writeFd() const { return inherited::writeFd(); }
+
+}; // eo class IOImplementation2
+
+
+/**
+ * provides sending data and receiving data via a signal.
+ *
+ * @note the received data is passed as parameter to the signal and no longer stored in the received buffer.
+ */
+class SimpleIO : public IOImplementation
+{
+        typedef IOImplementation inherited;
+    public:
+        SimpleIO(int read_fd=-1, int write_fd=-1);
+        virtual ~SimpleIO();
+
+        void sendString(const std::string& data);
+
+        boost::signal<void(const std::string&)> signal_received_string;
+
+    private:
+
+        void slotReceived();
+}; // eo class SimpleIO
+
+
+/**
+ * provides sending data and receiving data via a signal.
+ *
+ * @note the received data is passed as parameter to the signal and no longer stored in the received buffer.
+ */
+class SimpleIO2 : public IOImplementation2
+{
+        typedef IOImplementation2 inherited;
+    public:
+        SimpleIO2(int read_fd=-1, int write_fd=-1);
+        virtual ~SimpleIO2();
+
+        void sendString(const std::string& data);
+
+        boost::signal<void(const std::string&)> signal_received_string;
+
+    private:
+
+        void slotReceived();
+}; // eo class SimpleIO2
+
+
+/**
+ * provides the backend for io handling.
+ *
+ * This (singleton) object provides the management of io events. It collects all wishes for reading/writing
+ * from the io objects and information from the timer events.
+ * It poll()s for the events and distributes them to the objects,
+ *
+ * The class provides the possibility for executing one io cycle (/step) or to run a backend loop.
+ *
+ * @note the Backend class needs to be a friend of IOImplementation since it accesses private members
+ * of IOImplementation while performing the io cycles.
+ */
+class Backend
+{
+    public:
+        typedef std::set< int > FdSetType;
+
+    public:
+
+        bool doOneStep(int ms_timeout= -1);
+        void run();
+        void stop();
+
+    protected:
+        Backend();
+        Backend(const Backend& other);
+        ~Backend();
+
+    protected:
+
+        /// the number of currently active backend loops
+        int m_count_active_loops;
+        /// the number of pending stop requests (where each one should exit one active backend loop)
+        int m_count_stop_requests;
+
+        /// the number of currently active backend cycles(/ steps)
+        static int m_count_active_steps;
+
+    public:
+        static Backend* getBackend();
+
+    protected:
+        /// pointer to the active backend (which is delivered by Backend::getBackend)
+        static Backend* g_backend;
+}; // eo class Backend
+
+
+
+/*
+** tool functions:
+*/
+
+
+void get_current_real_time(MilliTime& mt);
+void get_current_monotonic_time(MilliTime& mt);
+
+
+} // eo namespace SimpleIo
+} // eo namespace I2n
+
+#endif
diff --git a/asnycio/simplepipe.cpp b/asnycio/simplepipe.cpp
new file mode 100644 (file)
index 0000000..4eba4bc
--- /dev/null
@@ -0,0 +1,98 @@
+/** @file
+ *
+ * (c) Copyright 2007 by Intra2net AG
+ * 
+ * info@intra2net.com
+ */
+
+#include "simplepipe.hpp"
+
+#include <functional>
+#include <boost/bind.hpp>
+
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <errno.h>
+
+
+namespace I2n
+{
+namespace SimpleIo
+{
+
+
+/*
+ * Implementation of SimplePipe
+ */
+
+SimplePipe::SimplePipe()
+{
+   m_signal_read.connect(boost::bind(&SimplePipe::slotReceived,this));
+} // eo SimplePipe::SimplePipe()
+
+
+SimplePipe::~SimplePipe()
+{
+} // eo SimplePipe::~SimplePipe()
+
+
+/**
+ * makes a pipe.
+ * This method connects itself with a newly created peer object with a bidirectional pipe.
+ * @return the peer pipe object.
+ */
+bool SimplePipe::makePipe(SimplePipe& peer)
+{
+   close(); // just in case...
+
+   int fds[2];
+
+   int res= ::socketpair( AF_UNIX, SOCK_STREAM, 0, fds);
+
+   if (res)
+   {
+      m_errno= errno;
+      return false;
+   }
+   else
+   {
+      m_errno= 0;
+   }
+
+   peer.close(); // just in case
+
+   setWriteFd(fds[0]);
+   setReadFd(fds[0]);
+
+   peer.setWriteFd(fds[1]);
+   peer.setReadFd(fds[1]);
+
+   return true;
+} // eo SimplePipe.:makePipe()
+
+
+/**
+ * sends a string through the pipe.
+ * @param data the data which should be sent to the other side.
+ */
+void SimplePipe::sendString(const std::string& data)
+{
+   lowSend(data);
+} // eo SimplePipe::sendString(const std::string&)
+
+
+/**
+ * emits the signal signalReceived with the received data.
+ * This slot is connected to IOImplementation::m_signal_read.
+ */
+void SimplePipe::slotReceived()
+{
+   std::string data;
+   data.swap(m_input_buffer);
+   signal_received_string(data);
+} // eo SimplePipe::slotReceived()
+
+
+} // eo namespace SimpleIo
+} // eo namespace I2n
diff --git a/asnycio/simplepipe.hpp b/asnycio/simplepipe.hpp
new file mode 100644 (file)
index 0000000..80b7cc6
--- /dev/null
@@ -0,0 +1,44 @@
+/** @file
+ *
+ * (c) Copyright 2007 by Intra2net AG
+ * 
+ * info@intra2net.com
+ */
+
+#ifndef _SIMPLEIO_SIMPLEPIPE_HPP_
+#define _SIMPLEIO_SIMPLEPIPE_HPP_
+
+#include "simpleio.hpp"
+
+namespace I2n
+{
+namespace SimpleIo
+{
+
+
+class SimplePipe : public IOImplementation
+{
+   public:
+      SimplePipe();
+      virtual ~SimplePipe();
+
+      bool makePipe(SimplePipe& peer);
+
+      void sendString(const std::string& data);
+
+      boost::signal<void(const std::string&)> signal_received_string;
+
+   protected:
+
+   private:
+
+      void slotReceived();
+
+}; // eo SimplePipe
+
+
+} // eo namespace SimpleIo
+} // eo namespace I2n
+
+
+#endif
diff --git a/asnycio/simpleprocess.cpp b/asnycio/simpleprocess.cpp
new file mode 100644 (file)
index 0000000..8e3754d
--- /dev/null
@@ -0,0 +1,817 @@
+/** @file
+ *
+ *
+ * (c) Copyright 2007-2008 by Intra2net AG
+ *
+ * info@intra2net.com
+ */
+
+//#define NOISEDEBUG
+
+#include "simpleprocess.hpp"
+
+#include <iterator>
+#include <algorithm>
+
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <errno.h>
+#include <signal.h>
+#include <sys/wait.h>
+
+#include <filefunc.hxx>
+
+
+#ifdef NOISEDEBUG
+#include <iostream>
+#include <iomanip>
+#define DOUT(msg) std::cout << msg << std::endl
+#define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl
+#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl
+#else
+#define DOUT(msg) do {} while (0)
+#define FODOUT(obj,msg) do {} while (0)
+#define ODOUT(msg) do {} while (0)
+#endif
+
+
+namespace
+{
+
+using namespace I2n::SimpleIo;
+
+/**
+ * local configuration values
+ */
+namespace config
+{
+
+   /// the capacity of the child status list (/ vector)
+   const unsigned int pid_pool_capacity= 512;
+
+} // eo namespace config
+
+
+
+/// the previous handler for the child signal (SIGCHLD)
+void (*oldChildHandler)(int) = NULL;
+
+/// method pointer for activating process manager
+void (ProcessManager::*_activate_manager)();
+
+PidStateList pending_pid_states;
+
+
+/**
+ * signal handler for child signal (SIGCHLD)
+ * @param sig the signal number as provided by the OS
+ */
+void handleSigChild(int sig)
+{
+   int status;
+   pid_t pid;
+   while ( (pid = waitpid(-1,&status,WNOHANG)) > 0)
+   {
+      pending_pid_states.push_back( PidStatePair(pid,status) );
+   }
+   if (_activate_manager)
+   {
+      // tricky way to access a protected method without being a (official) friend:
+      ( ProcessManager::getInstance()->*_activate_manager)();
+   }
+   //TODO: ?
+   signal(sig,handleSigChild);
+} // eo handleSigChild
+
+
+namespace process
+{
+
+typedef std::pair<pid_t, ProcessImplementation*> PidProcPair;
+typedef std::list< PidProcPair > PidProcList;
+
+
+template< typename F, typename S >
+struct CmpFirst
+{
+   F _f;
+   CmpFirst ( F f ) : _f(f) {}
+   bool operator () ( const std::pair<F,S>& v ) const { return v.first == _f; }
+}; // eo struct CmpFirst
+
+
+std::list<ProcessImplementation*> g_process_list;
+PidProcList g_pid_list;
+
+
+void addProcessInstance( ProcessImplementation* obj )
+{
+   g_process_list.push_back(obj);
+} // eo addProcessInstance(ProcessImplementation*)
+
+
+void removeProcessInstance( ProcessImplementation* obj )
+{
+   // remove obj from list
+   g_process_list.remove(obj);
+   // clear pointers in pid list
+   for(PidProcList::iterator it= g_pid_list.begin();
+      it != g_pid_list.end();
+      ++it)
+   {
+      if (it->second == obj)
+      {
+         it->second= NULL;
+      }
+   }
+} // eo removeProcessInstance(ProcessImplementation*)
+
+
+void addChildProcess( pid_t pid, ProcessImplementation* obj)
+{
+   g_pid_list.push_back ( PidProcPair(pid,obj) );
+} // eo addChildProcess(pid_t,ProcessImplementation*)
+
+
+void removeChildProcess ( pid_t pid, ProcessImplementation* obj)
+{
+   PidProcList::iterator it= std::find(
+      g_pid_list.begin(), g_pid_list.end(),
+      PidProcPair(pid,obj));
+   if (it != g_pid_list.end())
+   {
+      g_pid_list.erase(it);
+   }
+} // eo removeChildProcess(pid_t,ProcessImplementation*)
+
+
+bool findChildProcess ( pid_t pid, ProcessImplementation* & obj )
+{
+   PidProcList::iterator it = std::find_if(
+      g_pid_list.begin(), g_pid_list.end(),
+      CmpFirst<pid_t,ProcessImplementation*>(pid) );
+   if (it == g_pid_list.end())
+   {
+      return false;
+   }
+   obj = it->second;
+   return true;
+} // eo findChildProcess(pid_t,ProcessImplementation*&)
+
+
+} // eo namespace process
+
+
+
+
+
+/*
+** misc tools
+*/
+
+
+/**
+ * convenience tool for closing file descriptors...
+ */
+struct FdCloser
+{
+   int m_fd;
+
+   FdCloser(int fd=-1) : m_fd(fd) {}
+
+   ~FdCloser()
+   {
+      if (m_fd >= 0) ::close(m_fd);
+   }
+
+   void release() { m_fd= -1; }
+
+}; // eo struct FdCloser
+
+
+
+} // eo namespace <anonymous>
+
+
+namespace I2n
+{
+namespace SimpleIo
+{
+
+
+/*
+ * global functions
+ */
+
+/**
+ * installs the handler for the child signal (SIGCHLD).
+ * Installing this handler is mandatory for the process subsystem to work correctly.
+ * @return @a true iff the child handler is successfully installed.
+ */
+bool installChildHandler()
+{
+   if (oldChildHandler)
+   {
+      // already installed
+      return true;
+   }
+   if (! ProcessManager::getInstance() )
+   {
+      // we need an instance of the process manager
+      return false;
+   }
+   pending_pid_states.reserve( config::pid_pool_capacity );
+   oldChildHandler = signal( Signal::CHLD, handleSigChild );
+   if (oldChildHandler == SIG_ERR)
+   {
+      oldChildHandler= NULL;
+      return false;
+   }
+   return true;
+} // eo installChildHandler
+
+
+/**
+ * uninstalls the child handler.
+ * @return @a true iff the old child handler is reestablished.
+ */
+bool restoreChildHandler()
+{
+   if (!oldChildHandler)
+   {
+      return false;
+   }
+   void(*res)(int) = signal( Signal::CHLD, oldChildHandler);
+
+   if (res == SIG_ERR)
+   {
+      return false;
+   }
+   oldChildHandler= NULL;
+   return true;
+} // eo restoreChildHandler
+
+
+
+
+/*
+ * Implementation of ProcessImplementation
+ */
+
+IOImplementation2* ProcessImplementation::_StderrOnStdout   = ((IOImplementation2*) 1);
+IOImplementation2* ProcessImplementation::_UseParentsStderr = ((IOImplementation2*) 0);
+
+
+/**
+ * @brief constructor for the process implementation.
+ *
+ * the constructor takes the path to the executable and (initial) cli arguments.
+ *
+ * @param path path to the executable.
+ * @param args initial command line arguments.
+ */
+ProcessImplementation::ProcessImplementation(
+   const std::string& path,
+   const std::vector<std::string>& args
+   )
+: IOImplementation(-1,-1)
+, m_path(path)
+, m_nice_inc(0)
+, m_create_new_session(false)
+, m_pid(0)
+, m_state(ProcessState::stopped)
+, m_exit_code(0)
+{
+   m_args.push_back(path);
+   std::copy( args.begin(), args.end(), std::back_inserter(m_args) );
+   process::addProcessInstance(this);
+} // eo ProcessImplementation::ProcessImplementation(const std::string&)
+
+
+ProcessImplementation::~ProcessImplementation()
+{
+   if (m_pid > 0 && m_state!=ProcessState::stopped)
+   {
+      stopProcess(true);
+   }
+   process::removeProcessInstance(this);
+} // eo ProcessImplementation::~ProcessImplementation()
+
+
+void ProcessImplementation::close(Direction direction)
+{
+   inherited::close(direction);
+   if (!inherited::opened() &&  (m_state != ProcessState::stopped) )
+   {
+      stopProcess(false);
+   }
+} // eo ProcessImplementation::close(Direction)
+
+
+/**
+ * returns an object for adding new arguments to the argument list.
+ * @return the adder object.
+ */
+PushBackFiller<std::string, std::vector > ProcessImplementation::getArgAdder()
+{
+   return PushBackFiller<std::string, std::vector >(m_args);
+} // eo ProcessImplementation::getArgAdder()
+
+
+/**
+ * @brief set if the process should create a new session when started.
+ * @param enable determine if the process should start a new session.
+ * @return @a true iff the value of enable was accepted.
+ *
+ * If the process is already running, a new value is not accepted.
+ */
+bool ProcessImplementation::setCreateNewSession( bool enable )
+{
+   if (m_state != ProcessState::stopped and enable != m_create_new_session)
+   {
+      return false;
+   }
+   m_create_new_session= enable;
+   return true;
+} // eo ProcessImplementation::setCreateNewSession(bool);
+
+
+/**
+ * @brief sets a new nice increment.
+ * @param nice the desired nice increment.
+ * @return @a true if the value was accepted and - in case the process was already started -
+ * the nice value was successfully changed.
+ */
+bool ProcessImplementation::setNice(int nice)
+{
+   errno= 0;
+   if (m_state != ProcessState::stopped)
+   {
+      int delta= m_nice_inc + nice;
+      m_nice_inc= nice;
+      int res= ::nice(delta);
+      if (res == -1 and  errno !=0 )
+      {
+         return false;
+      }
+   }
+   else
+   {
+      m_nice_inc = nice;
+   }
+   return true;
+} // eo ProcessImplementation::setNice(int)
+
+
+/**
+ * @brief sets the work dir the process should be started with.
+ * @param workdir the workdir
+ * @return @a true if the new workdir was accepted.
+ *
+ * The method will return @a false if the process is already started.
+ * The workdir can only be set before the process is started.
+ */
+bool ProcessImplementation::setWorkDir(const std::string& workdir)
+{
+   if ( m_state != ProcessState::stopped and workdir != m_workdir)
+   {
+      return false;
+   }
+   if (not workdir.empty())
+   {
+       I2n::Stat stat(workdir);
+       if (not stat or not stat.is_directory())
+       {
+           return false;
+       }
+   }
+   m_workdir= workdir;
+   return true;
+} // eo ProcessImplementation::setWorkDir(const std::string&)
+
+
+/**
+ * @brief sets new arguments for the process (the path to the binary is kept).
+ *
+ * @param args the new cli arguments for the subprocess (replacing the old ones).
+ */
+void ProcessImplementation::resetArgs( const std::vector< std::string >& args )
+{
+   if (m_args.size() > 1)
+   {
+      m_args.erase( ++m_args.begin(), m_args.end());
+   }
+   std::copy( args.begin(), args.end(), std::back_inserter(m_args) );
+} // eo ProcessImplementation::resetArgs(const std::vectors< std::string >&)
+
+
+/**
+ * starts the new process.
+ * provides pipes for sending data to/ receiving data from the new process.
+ * Basically forks and execs the new process.
+ *
+ * @param stderr if not NULL the given object will be connected to stderr of the new process.
+ *  The object can then be used for reading the data from the process' stderr; but cannot be written to.
+ * (The object will be closed if it was open).
+ * If the constant @a ProcessImplementation::StderrOnStdout is passed then stderr of the new process will
+ * be written to the same channel as stdout (i.e. can be read from the process class instance like the
+ * normal output).
+ * If NULL then the stderr channel from the parent process will also be used by the child.
+ * @return @a true iff the new subprocess started.
+ */
+bool ProcessImplementation::startProcess( IOImplementation2 *stderr )
+{
+   bool stderr2stdout= false;
+   m_errno = 0;
+   m_input_buffer.clear();
+   if (m_pid > 0 && m_state != ProcessState::stopped)
+   {
+      // process still/already running...
+      return false;
+   }
+   m_exit_code= 0;
+
+   if (stderr == _StderrOnStdout)
+   {
+      stderr2stdout= true;
+      stderr= NULL;
+   }
+
+   int to_process_pipe[2];
+   int from_process_pipe[2];
+   int from_process_stderr_pipe[2]= { -1, -1 };
+
+   if ( ::pipe(to_process_pipe) )
+   {
+      m_errno= errno;
+      return false;
+   }
+   FdCloser closeTo0( to_process_pipe[0] );
+   FdCloser closeTo1( to_process_pipe[1] );
+   if ( ::pipe (from_process_pipe) )
+   {
+      m_errno= errno;
+      return false;
+   }
+   FdCloser closeFrom0( from_process_pipe[0] );
+   FdCloser closeFrom1( from_process_pipe[1] );
+   if (stderr)
+   {
+      if (stderr->opened()) stderr->close();
+      if ( ::pipe (from_process_stderr_pipe) )
+      {
+         m_errno= errno;
+         return false;
+      }
+   }
+   FdCloser closeFromErr0( from_process_stderr_pipe[0] );
+   FdCloser closeFromErr1( from_process_stderr_pipe[1] );
+
+   m_pid = ::fork();
+
+   if ( m_pid == (pid_t)-1 )
+   {
+      m_errno= errno;
+      m_pid= 0;
+      // error; something went wrong
+      return false;
+   }
+   else if (m_pid > 0)
+   {
+      // we are in the parent part
+
+      // keep the fd's we need and (later) close the other ones:
+      closeTo1.release(); // don't close this fd!
+      setWriteFd(to_process_pipe[1]);
+      closeFrom0.release(); // don't close this fd!
+      setReadFd(from_process_pipe[0]);
+
+      if (stderr)
+      {
+         closeFromErr0.release(); // don't close this fd!
+         stderr->setReadFd(from_process_stderr_pipe[0]);
+      }
+
+      m_state= ProcessState::running;
+      process::addChildProcess(m_pid,this);
+      DOUT(" started child with pid " << m_pid);
+      return true;
+   }
+   else // pid > 0
+   {
+      // we are in the child part
+
+      // dup the fd's for stdin/-out/-err into place:
+      ::dup2(to_process_pipe[0],0);
+      ::dup2(from_process_pipe[1],1);
+      if (stderr)
+      {
+         ::dup2(from_process_stderr_pipe[1],2);
+         ::close(from_process_stderr_pipe[0]); ::close(from_process_stderr_pipe[1]);
+      }
+      else if (stderr2stdout)
+      {
+         ::dup2(from_process_pipe[1],2);
+      }
+      // close what we don't need:
+      ::close(to_process_pipe[0]); ::close(to_process_pipe[1]);
+      ::close(from_process_pipe[0]); ::close(from_process_pipe[1]);
+
+      // set workdir if requested:
+      if (not m_workdir.empty())
+      {
+         int r= ::chdir( m_workdir.c_str() );
+         if (r !=0 )
+         {
+            //TODO?
+            exit(255);
+         }
+      }
+
+      //
+      // collect args:
+      char **argv= new char*[m_args.size()+1];
+      int i=0;
+      for(std::vector<std::string>::iterator it= m_args.begin();
+         it != m_args.end();
+         ++it,++i)
+      {
+         argv[i]= strdup( it->c_str() );
+      }
+      argv[i]= NULL;
+      // update nice level:
+      if (m_nice_inc)
+      {
+         nice(m_nice_inc);
+      }
+      // create a new session id if requested:
+      if (m_create_new_session)
+      {
+         setsid();
+      }
+      // execute:
+      execv(m_path.c_str(), argv);
+      // exit if exec failed
+      exit(255);
+      //cleanup! ... just joking; we exec or we exit, in either case the system cleans
+      // everything which needs to be cleaned up.
+   }
+   return false; // keep the compiler happy...
+} // eo ProcessImplementation::startProcess()
+
+
+/**
+ * convenience method for starting the child process.
+ * This method uses predefined enum values for the stderr handling mode.
+ *
+ * @param stderr_mode the desired stderr mode.
+ * @return @a true iff the child process was created.
+ */
+bool ProcessImplementation::startProcess( ProcessImplementation::StderrMode stderr_mode )
+{
+   switch (stderr_mode)
+   {
+      case UseParentsStderr:
+         return startProcess( _UseParentsStderr );
+
+      case StderrOnStdout:
+         return startProcess( _StderrOnStdout );
+   }
+   return false;
+}; // eo ProcessImplementation::startProcess(ProcessImplementation::StderrMode)
+
+
+/**
+ * stops the process.
+ *
+ * @todo think about a more intelligent handling...
+ */
+void ProcessImplementation::stopProcess(bool force)
+{
+   // TODO: do it somewhat more intelligent?!
+   if (force)
+   {
+      kill(Signal::KILL);
+      //TODO: set running state?
+   }
+   else
+   {
+      kill(Signal::TERM);
+   }
+} // eo ProcessImplementation::stop(bool)
+
+
+
+/**
+ * sends a signal to the child process.
+ * @param signal the Signal which should be send.
+ * @return @a true if the signal was sent; @a false if an error occured.
+ */
+bool ProcessImplementation::kill(Signal signal)
+{
+   m_errno = 0;
+   if (m_pid == 0 || m_pid == (pid_t)-1)
+   {
+      m_errno= ESRCH;
+      return false;
+   }
+   int res = ::kill(m_pid, signal);
+   if (res < 0)
+   {
+      m_errno= errno;
+      return false;
+   }
+   if (signal == Signal::CONT && m_state == ProcessState::suspended)
+   {
+      m_state = ProcessState::running;
+   }
+   return true;
+} // eo ProcessImplementation::kill(Signal)
+
+
+
+/**
+ * set a new child state with information gobbled by the child signal handler.
+ *
+ * @note This method should only be called by the process manager!
+ *
+ * @param pid the pid of the child process.
+ * @param status the new status value (as delivered by waitpid())
+ */
+void ProcessImplementation::setChildState(pid_t pid, int status)
+{
+   DOUT("setChildState("<<pid<<","<<status<<")   pid="<<m_pid);
+   if (pid != m_pid)
+   {
+      // old child... ignore!
+      return;
+   }
+   if (WIFSTOPPED(status))
+   {
+      DOUT("stopped");
+      // stopped:
+      int stopsignal = WSTOPSIG(status);
+      // make stop signal available in exit_code:
+      m_exit_code= (stopsignal << 8);
+      m_state= ProcessState::suspended;
+      return;
+   }
+#ifdef WIFCONTINUED
+   if (WIFCONTINUED(status))
+   {
+      DOUT("continued");
+      // continued after a stop:
+      m_state= ProcessState::running;
+      return;
+   }
+#endif
+   if (WIFEXITED(status))
+   {
+      DOUT("normal exit");
+      //normal exit:
+      m_exit_code= (0xff & WEXITSTATUS(status));
+      m_pid= 0;
+      close(Direction::out);
+      m_state= ProcessState::stopped;
+      m_signal_terminated();
+      return;
+   }
+   if (WIFSIGNALED(status))
+   {
+      DOUT("signaled stop");
+      // exit by signal:
+      int termsignal = WTERMSIG(status);
+      // make term signal available in exit code (normal exit codes are only 8 bit)
+      m_exit_code = (termsignal << 8);
+      m_pid= 0;
+      close(Direction::out);
+      m_state= ProcessState::stopped;
+      m_signal_terminated();
+      return;
+   }
+   // this point should never be reached...!!
+} // eo ProcessImplementation::setChildState(pid_t,int)
+
+
+/*
+ * implementation of ProcessManager
+ */
+
+/// the instance of the process manager (highlander; there can be only one!)
+ProcessManager* ProcessManager::the_instance= NULL;
+
+
+ProcessManager::ProcessManager()
+{
+   setWhenTime(0);
+} // eo ProcessManager::ProcessManager
+
+
+/**
+ * delivers the process manager instance (generate if it doesn't exist)
+ * @return the process manager instance
+ */
+ProcessManager* ProcessManager::getInstance()
+{
+   if (! the_instance)
+   {
+      the_instance = new ProcessManager();
+      _activate_manager = &ProcessManager::activateMe;
+   }
+   return the_instance;
+} // eo ProcessManager::getInstance
+
+
+/**
+ * activate the timer so it's handled by the next backend cycle
+ */
+void ProcessManager::activateMe()
+{
+   setWhenTime(0);
+   activate();
+} // eo ProcessManager::activateMe
+
+
+/**
+ * real work is done here.
+ * Processes the information collected by the child signal handler.
+ */
+void ProcessManager::execute()
+{
+   PidStateList pid_state_list;
+   {
+      // block child signals (within this scope)
+      ScopedSignalBlocker blocker( Signal::CHLD );
+      // and now fetch the list of pending information
+      // (simply swap with our local empty list)
+      std::swap(pid_state_list, pending_pid_states);
+      // reserve the desired (minimum) capacity
+      pending_pid_states.reserve( config::pid_pool_capacity );
+   }
+   ODOUT("exec, " << pid_state_list.size() << " entries");
+
+   // interpret states:
+   for(PidStateList::iterator it = pid_state_list.begin();
+      it != pid_state_list.end();
+      ++it)
+   {
+      pid_t pid  = it->first;
+      int status = it->second;
+      ODOUT("  pid=" << pid << ", status=" << status);
+      ProcessImplementation *process_obj;
+      if (process::findChildProcess(pid,process_obj))
+      {
+         ODOUT("  local managed child,  process_obj="<< process_obj);
+         // pid found in list:
+         if (!WIFSTOPPED(status)
+#ifdef WIFCONTINUED
+            && !WIFCONTINUED(status)
+#endif
+            )
+         {
+            // take it from list if the child exited:
+            process::removeChildProcess(pid,process_obj);
+         }
+         if (process_obj)
+         {
+            // give the process object a chance to handle the state change:
+            process_obj->setChildState(pid, status);
+         }
+      }
+      else
+      {
+         ODOUT("foreign child");
+         // pid not found in list:
+         /* NOTE: in a non threaded environment this pid must be from a child process which is not
+            managed by this process classes; since this method is called after all setup of a child process
+            is done (; especially entering the new child pid into our internal lists).
+         */
+         m_foreign_pid_states.push_back(*it);
+      }
+   }
+
+   // handle the foreign childs:
+   {
+      /* idea:
+       * fetch a (pid,status) from the list, erase it (to avoid reentrance problems)
+       * and fire the signal. If someone forks childs outside this module then he can
+       * connect to the signal and receive all necessary status information gobbled by
+       * our child handler.
+       */
+      while (! m_foreign_pid_states.empty())
+      {
+         PidStateList::iterator it= m_foreign_pid_states.begin();
+         pid_t pid  = it->first;
+         int status = it->second;
+         m_foreign_pid_states.erase(it);
+         m_foreign_child_state_changed_signal(pid,status);
+      }
+   }
+
+} // eo ProcessManager::execute
+
+
+} // eo namespace SimpleIo
+} // eo namespace I2n
diff --git a/asnycio/simpleprocess.hpp b/asnycio/simpleprocess.hpp
new file mode 100644 (file)
index 0000000..d42c755
--- /dev/null
@@ -0,0 +1,198 @@
+/** @file
+ *
+ * simple process handling based on simple io classes.
+ *
+ * (c) Copyright 2007-2008 by Intra2net AG
+ *
+ * info@intra2net.com
+ */
+
+#ifndef _CONND_SIMPLEPROCESS_HPP_
+#define _CONND_SIMPLEPROCESS_HPP_
+
+#include <vector>
+#include <utility>
+
+#include <sys/types.h>
+
+#include <containerfunc.hpp>
+#include <signalfunc.hpp>
+#include "simpleio.hpp"
+
+
+namespace I2n
+{
+namespace SimpleIo
+{
+
+using SystemTools::Signal;
+using SystemTools::ScopedSignalBlocker;
+
+
+class ProcessManager;
+
+
+typedef std::pair< pid_t, int >      PidStatePair;
+typedef std::vector< PidStatePair >  PidStateList;
+
+
+/**
+ * represents process states.
+ */
+struct ProcessState
+{
+   enum _ProcessState
+   {
+      stopped = 0,
+      running,
+      suspended
+   }; // eo enum _ProcessState
+
+   _ProcessState m_state;
+
+   ProcessState(_ProcessState _state = stopped) : m_state(_state) {}
+
+   operator _ProcessState() const { return m_state; }
+}; // eo struct ProcessState
+
+
+/**
+ * specialisation of the io implementation class which fork/exec's a subprocess and
+ * connects with the new child's stdin/stdout.
+ *
+ * @note the signal @a IOImplementation::m_signal_eof of the base class can be used to detect when the
+ * new child closes it's stdout (which usually means that the child ended).
+ */
+class ProcessImplementation : public IOImplementation
+{
+      typedef IOImplementation inherited;
+
+      friend class ProcessManager;
+
+   public:
+
+      enum StderrMode
+      {
+         /// "magic" constant to pass to start() when childs stderr should be the same as parents stderr.
+         UseParentsStderr= 0,
+         /// "magic" constant to pass to start() when stderr should be the same as stdout for the new process.
+         StderrOnStdout
+      }; // eo enum StderrMode
+
+   public:
+      ProcessImplementation(
+         const std::string& path,
+         const std::vector<std::string>& args = std::vector<std::string>());
+      virtual ~ProcessImplementation();
+
+      virtual void close(Direction direction = Direction::both);
+
+      virtual bool startProcess( IOImplementation2* stderr );
+      bool startProcess( StderrMode stderr_mode = UseParentsStderr );
+
+      virtual void stopProcess(bool force=false);
+
+      PushBackFiller<std::string, std::vector > getArgAdder();
+
+      bool setCreateNewSession( bool enable= true);
+
+      bool setNice(int nice);
+
+      bool setWorkDir(const std::string& workdir);
+
+      void resetArgs( const std::vector< std::string >& args = std::vector< std::string >() );
+
+      /// returns the current process state
+      ProcessState processState() const { return m_state; }
+
+      ///returns the exit code of the process (if in stopped state)
+      int exitCode() const { return m_exit_code; }
+
+
+   protected:
+
+      bool kill(const Signal signal);
+
+      void setChildState(pid_t pid, int status);
+
+   protected:
+      /// the path to the binary
+      std::string m_path;
+      /// argument list (starting with argv0, usually the name of the binary)
+      std::vector<std::string> m_args;
+      /// increment of the nice level when the new child is started
+      int  m_nice_inc;
+      /// determines if the child should start a new session.
+      bool m_create_new_session;
+      /// determines the workdir where the child process should be started with.
+      std::string m_workdir;
+
+      /// the pid of the child process
+      pid_t m_pid;
+      /// the state of the child process
+      ProcessState m_state;
+      /// the exit code of the child (-1 if not available yet)
+      int  m_exit_code;
+
+      /// signal which is fired when the child terminated
+      SignalType m_signal_terminated;
+
+
+      /// "magic" constant to pass to start() when childs stderr should be the same as parents stderr.
+      static IOImplementation2* _UseParentsStderr;
+      /// "magic" constant to pass to start() when stderr should be the same as stdout for the new process.
+      static IOImplementation2* _StderrOnStdout;
+
+   private:
+
+}; // eo class ProcessImplementation
+
+
+/**
+ * manages overall process related stuff.
+ *
+ * @note this class is implemented as a singleton.
+ * @note this class uses the io timer interface to be called within the backend loops when necessary.
+ */
+class ProcessManager : public TimerBase
+{
+   public:
+
+      static ProcessManager* getInstance();
+
+   protected:
+      ProcessManager();
+      ProcessManager(const ProcessManager&);
+
+      virtual void execute();
+
+      void activateMe();
+
+   public:
+
+      /**
+       * the signal which is fired when waitpid() returns a status for a child process
+       * which is not managed by this process subsystem.
+       * Another module which forks child processes can connect to this signal to receive
+       * the information when these child processes are terminated.
+       */
+      boost::signal<void(pid_t,int)> m_foreign_child_state_changed_signal;
+
+   protected:
+
+      static ProcessManager *the_instance;
+
+      PidStateList  m_foreign_pid_states;
+
+   private:
+};
+
+
+bool installChildHandler();
+bool restoreChildHandler();
+
+
+} // eo namespace SimpleIo
+} // eo I2n
+
+#endif
diff --git a/asnycio/simplesocket.cpp b/asnycio/simplesocket.cpp
new file mode 100644 (file)
index 0000000..def93c0
--- /dev/null
@@ -0,0 +1,397 @@
+/** @file
+ *
+ * (c) Copyright 2008 by Intra2net AG
+ * 
+ * info@intra2net.com
+ *
+ * @todo unlink unix server socket on close.
+ */
+
+#include "simplesocket.hpp"
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/socket.h>
+#include <errno.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <limits.h>
+#include <filefunc.hxx>
+
+
+namespace I2n
+{
+namespace SimpleIo
+{
+
+
+namespace
+{
+
+struct sockaddr_un dummy_un;
+
+union MegaAddr {
+   struct sockaddr         m_addr;
+   struct sockaddr_in      m_addr_in;
+   struct sockaddr_un      m_addr_un; // NOTE (historically) too small...
+   // storage is large enough to hold all sockaddr_* variants with the (historical) exception of _un !
+   struct sockaddr_storage m_addr_store;
+   // a char array large enough to hold _un (with an path up to the maximum allowed size!)
+   // (the +1 is added for a later 0-termination of the path)
+   char m_buffer[ sizeof(dummy_un) - sizeof(dummy_un.sun_path) + PATH_MAX + 1 ];
+};
+
+
+} // eo namespace <anonymous>
+
+
+
+/*
+** implementation of ServerSocketBaseImplementation
+*/
+
+
+ServerSocketBaseImplementation::ServerSocketBaseImplementation()
+: IOImplementation()
+{
+} // eo ServerSocketBaseImplementation::ServerSocketBaseImplementation()
+
+
+/**
+ * @brief handled incoming connections on the server port.
+ *
+ * accepts the new connection, stores the peer address in an internal buffer
+ * and calls a (derived) acceptNewConnection method to create an apropriate
+ * IO class instance.
+ * If no io instance is created the connection is closed.
+ */
+void ServerSocketBaseImplementation::doRead()
+{
+   MegaAddr addr;
+   socklen_t addrlen = sizeof(addr);
+
+   // reset errno:
+   m_errno= 0;
+
+   // reset the mark:
+   resetReadMark();
+
+   int fd = ::accept( readFd(), &addr.m_addr, &addrlen);
+   if (fd < 0)
+   {
+      // handle errors.
+      m_errno= errno;
+      switch (m_errno)
+      {
+         case EBADF:
+         case EINVAL:
+         case ENOTSOCK:
+            // should not happen...
+            close();
+            break;
+         default:;
+      }
+      return;
+   }
+
+   if (addrlen < sizeof(addr))
+   {
+      // in case of unix domain socket: terminate the path!
+      // NOTE we are doing this here since we don't pass the length info.
+      addr.m_buffer[addrlen]= 0;
+   }
+   else
+   {
+      //something went terribly wrong!!
+      // the resulting address structure is larger than it ever could be...
+      ::close(fd);
+      return;
+   }
+
+   IOImplementationPtr connection= acceptNewConnection(fd, &addr.m_addr);
+   if(!connection)
+   {
+      ::close(fd);
+      return;
+   }
+   if (m_new_connection_base_callback)
+   {
+      m_new_connection_base_callback(connection);
+   }
+} // eo ServerSocketBaseImplementation::doRead()
+
+
+/**
+ * @brief handles write events.
+ *
+ * since server sockets never ever get a write mark, something must
+ * be wrong and the connection is closed!
+ */
+void ServerSocketBaseImplementation::doWrite()
+{
+   // if this method is called, something went wrong...
+   close();
+   //TODO throw something?!
+} // eo ServerSocketBaseImplementation::doWrite()
+
+
+
+/**
+ * @brief sets a function which is called when a new connection was established.
+ *
+ * The function gets a (shared) pointer to the new connetion as parameter and is
+ * expected to store it when it accepts the connection.
+ * (Else the conenction gets deleted after the function was called.)
+ *
+ * @param func the function which hsould be called on new conenctions.
+ */
+void ServerSocketBaseImplementation::setNewConnectionBaseCallback(
+   const NewConnectionBaseCallbackFunc& func)
+{
+   m_new_connection_base_callback= func;
+} // eo ServerSocketBaseImplementation::setNewConnectionBaseCallback(NewConnectionBaseCallbackFunc&)
+
+
+
+/**
+ * @brief callback for new connections.
+ *
+ * The base method always returns an empty pointer.
+ *
+ * Derived classes must override this method and do something useful with
+ * the passed file descriptor.
+ *
+ * @param fd the file descriptor of the new connection.
+ * @param addr pointer to the address structure filled from ::accept()
+ * @return shared pointer to the new IO class instance (; empty if not accepted)
+ */
+IOImplementationPtr ServerSocketBaseImplementation::acceptNewConnection(
+   int fd, boost::any addr)
+{
+   // needs to be defined in derived class!
+   return IOImplementationPtr();
+} // eo ServerSocketBaseImplementation::acceptNewConnection(int,boost::any)
+
+
+
+/*
+** implementation of UnixIOSocket
+*/
+
+
+UnixIOSocket::UnixIOSocket()
+{
+} // eo UnixIOSocket::UnixIOSocket()
+
+
+
+UnixIOSocket::UnixIOSocket(const std::string& path)
+: m_peer_pid(0)
+, m_peer_uid(0)
+, m_peer_gid(0)
+{
+   open(path);
+} // eo UnixIOSocket::UnixIOSocket(const std::string&)
+
+
+UnixIOSocket::UnixIOSocket(
+   int fd, const std::string& path,
+   unsigned int peer_pid, unsigned int peer_uid, unsigned int peer_gid)
+: IOImplementation(fd,fd)
+, m_path(path)
+, m_peer_pid(peer_pid)
+, m_peer_uid(peer_uid)
+, m_peer_gid(peer_gid)
+{
+} // eo UnixIOSocket::UnixIOSocket(int,const std::string&,unsigned,unsigned,unsigned)
+
+
+/**
+ * @brief opens a (client) connection to an unix domain socket.
+ *
+ * @param path the path the server is listening on.
+ * @return @a true iff the connection was successfully opened.
+ */
+bool UnixIOSocket::open(const std::string& path)
+{
+   if (opened()) close();
+
+   m_errno= 0;
+   m_path.clear();
+
+   if (path.empty() || path.size() >= PATH_MAX)
+   {
+      return false;
+   }
+
+   int fd= ::socket(PF_UNIX, SOCK_STREAM, 0);
+   if (fd<0)
+   {
+      m_errno= errno;
+      return false;
+   }
+
+   {
+      MegaAddr addr;
+      addr.m_addr_un.sun_family= AF_UNIX;
+      strncpy(addr.m_addr_un.sun_path, path.c_str(), PATH_MAX);
+      if (::connect(fd,(sockaddr*)&addr.m_addr_un, SUN_LEN(&addr.m_addr_un)) < 0)
+      {
+         m_errno= errno;
+         ::close(fd);
+         return false;
+      }
+   }
+   m_path= path;
+   setReadFd(fd);
+   setWriteFd(fd);
+   return true;
+} // eo UnixIOSocket::open(const std::string&,int)
+
+
+/*
+** implementation of UnixServerSocketBase
+*/
+
+
+UnixServerSocketBase::UnixServerSocketBase()
+{
+} // eo UnixServerSocketBase::UnixServerSocketBase
+
+
+UnixServerSocketBase::UnixServerSocketBase(const std::string& path, int mode)
+{
+   open(path,mode);
+} // eo UnixServerSocketBase::UnixServerSocketBase(const std::string&,int)
+
+
+/**
+ * @brief opens the server part of an unix domain socket.
+ *
+ * @param path the path the new port should listen on.
+ * @param mode the mode for the path.
+ * @return @a true iff the port was successfully opened.
+ */
+bool UnixServerSocketBase::open(const std::string& path, int mode)
+{
+   if (opened()) close();
+   m_errno= 0;
+   if (path.empty() || path.size() >= PATH_MAX)
+   {
+      return false;
+   }
+
+   int fd= ::socket(PF_UNIX, SOCK_STREAM, 0);
+   if (fd<0)
+   {
+      m_errno= errno;
+      return false;
+   }
+
+   {
+      MegaAddr addr;
+      addr.m_addr_un.sun_family= AF_UNIX;
+      strncpy(addr.m_addr_un.sun_path, path.c_str(), PATH_MAX);
+      ::I2n::unlink(path); // just in case...
+      mode_t old_mask= ::umask( (mode & 0777) ^ 0777);
+      if (::bind(fd,(sockaddr*)&addr.m_addr_un, SUN_LEN(&addr.m_addr_un)) < 0)
+      {
+         m_errno= errno;
+         ::umask(old_mask);
+         ::close(fd);
+         return false;
+      }
+      ::umask(old_mask);
+   }
+   m_path= path;
+
+   {
+      int res= ::listen(fd,8);
+      if (res < 0)
+      {
+         m_errno= errno;
+         ::close(fd);
+         return false;
+      }
+   }
+   setReadFd(fd);
+   return true;
+} // eo UnixServerSocketBase::open(const std::string&,int)
+
+
+/**
+ * @brief called from base class to create a new connection instance.
+ *
+ * This method accepts only connections to unix domain sockets.
+ * It also tries to determine the peer pid, uid and gid.
+ *
+ * @param fd the file descriptor of a freshly accepted connection.
+ * @param addr conatins "pointer to struct sockaddr"
+ * @return @a a (shared) pointer to the new connection isntance; empty if none was
+ * created.
+ */
+IOImplementationPtr UnixServerSocketBase::acceptNewConnection(int fd, boost::any addr)
+{
+   struct sockaddr *addr_ptr= NULL;
+   try {
+      addr_ptr = boost::any_cast<struct sockaddr*>(addr);
+   }
+   catch (boost::bad_any_cast&)
+   {
+      return IOImplementationPtr();
+   }
+   // check for the right family:
+   if (addr_ptr->sa_family != AF_UNIX)
+   {
+      return IOImplementationPtr();
+   }
+   struct sockaddr_un *un_ptr = reinterpret_cast<struct sockaddr_un*>(addr_ptr);
+   std::string peer_path( un_ptr->sun_path );
+   unsigned peer_pid=0;
+   unsigned peer_gid=0;
+   unsigned peer_uid=0;
+#ifdef __linux__
+   { // the linux way to get peer info (pid,gid,uid):
+      struct ucred cred;
+      socklen_t cred_len = sizeof(cred);
+      if (getsockopt(fd,SOL_SOCKET,SO_PEERCRED,&cred,&cred_len) == 0)
+      {
+         peer_pid= cred.pid;
+         peer_uid= cred.uid;
+         peer_gid= cred.gid;
+      }
+   }
+#else
+#error dont know how to determine peer info.
+#endif
+
+   UnixIOSocketPtr ptr( createIOSocket(fd, peer_path, peer_pid, peer_uid, peer_gid) );
+   return ptr;
+} // eo UnixServerSocketBase::acceptNewConnection(int,boost::any);
+
+
+/**
+ * @brief "real" creator of the connection instance.
+ *
+ * called by UnixServerSocketBase::acceptNewConnection to create the new io instance.
+ *
+ * @param fd file descriptor for the socket
+ * @param path path as delivered by peer.
+ * @param peer_pid peer pid.
+ * @param peer_uid peer uid.
+ * @param peer_gid peer gid.
+ * @return (shared) pointer to the new io instance.
+ */
+UnixIOSocketPtr UnixServerSocketBase::createIOSocket(
+   int fd, const std::string& path,
+   unsigned int peer_pid,
+   unsigned int peer_uid, unsigned int peer_gid)
+{
+   return UnixIOSocketPtr(
+      new UnixIOSocket(fd, path, peer_pid, peer_uid, peer_gid)
+   );
+} // eo UnixServerSocketBase::createIOSocket(int,const std::string&,unsigned,unsigned,unsigned)
+
+
+
+}// eo namespace SimpleIo
+}// eo namespace I2n
diff --git a/asnycio/simplesocket.hpp b/asnycio/simplesocket.hpp
new file mode 100644 (file)
index 0000000..745c787
--- /dev/null
@@ -0,0 +1,215 @@
+/** @file
+ * @brief socket classes for the SimpleIo framework.
+ *
+ *
+ * (c) Copyright 2008 by Intra2net AG
+ * 
+ * info@intra2net.com
+ */
+
+#ifndef __SIMPLEIO__SIMPLESOCKET_HPP__
+#define __SIMPLEIO__SIMPLESOCKET_HPP__
+
+#include "simpleio.hpp"
+
+#include <string>
+#include <boost/any.hpp>
+#include <boost/bind.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/type_traits/is_base_of.hpp>
+#include <boost/static_assert.hpp>
+#include <boost/function.hpp>
+
+
+namespace I2n
+{
+namespace SimpleIo
+{
+
+
+typedef boost::shared_ptr< IOImplementation > IOImplementationPtr;
+
+
+/**
+ * @brief base class for server sockets.
+ *
+ * Contains all the stuff which is common for different types of server sockets.
+ */
+class ServerSocketBaseImplementation
+: public IOImplementation
+{
+   public:
+      typedef boost::function< void(IOImplementationPtr) > NewConnectionBaseCallbackFunc;
+
+   public:
+
+      void setNewConnectionBaseCallback( const NewConnectionBaseCallbackFunc& func);
+
+   protected:
+      ServerSocketBaseImplementation();
+
+
+      virtual void doRead();
+      virtual void doWrite();
+
+      virtual IOImplementationPtr acceptNewConnection(int fd, boost::any addr);
+
+
+   protected:
+
+      NewConnectionBaseCallbackFunc m_new_connection_base_callback;
+
+}; // eo class ServerSocketBaseImplementation
+
+
+typedef boost::shared_ptr< ServerSocketBaseImplementation > ServerSocketBaseImplementationPtr;
+
+
+/*
+** unix domain sockets
+*/
+
+
+template<
+   class IOClass
+>
+class UnixServerSocket;
+
+
+/**
+ * @brief spezialized IO class for unix domain sockets.
+ *
+ */
+class UnixIOSocket
+: public IOImplementation
+{
+   public:
+      UnixIOSocket();
+      UnixIOSocket(const std::string& path);
+
+      bool open(const std::string& path);
+
+   protected:
+      friend class UnixServerSocketBase;
+      friend class UnixServerSocket<UnixIOSocket>;
+
+      UnixIOSocket(
+         int fd, const std::string& path,
+         unsigned int peer_pid, unsigned int peer_uid, unsigned int peer_gid);
+
+   protected:
+
+      std::string  m_path;
+      unsigned int m_peer_pid;
+      unsigned int m_peer_uid;
+      unsigned int m_peer_gid;
+
+}; // eo class UnixIOSocket
+
+typedef boost::shared_ptr< UnixIOSocket > UnixIOSocketPtr;
+
+
+
+/**
+ * @brief spezialized server socket class for unix domain sockets.
+ *
+ */
+class UnixServerSocketBase
+: public ServerSocketBaseImplementation
+{
+   public:
+      UnixServerSocketBase();
+      UnixServerSocketBase(const std::string& path, int mode=0600);
+
+      bool open(const std::string& path, int mode= 0600);
+
+   protected:
+
+      virtual IOImplementationPtr acceptNewConnection(int fd, boost::any addr);
+
+      virtual UnixIOSocketPtr createIOSocket(
+         int fd, const std::string& path,
+         unsigned int peer_pid,
+         unsigned int peer_uid, unsigned int peer_gid);
+
+   protected:
+
+      std::string m_path;
+
+}; // eo class UnixServerSocketBase
+
+
+/**
+ * @brief unix server socket class which "produces" connections of a determined type.
+ *
+ + @param IOClass the type of the connections.
+ */
+template<
+   class IOClass
+>
+class UnixServerSocket
+: public UnixServerSocketBase
+{
+      BOOST_STATIC_ASSERT(( boost::is_base_of<UnixIOSocket,IOClass>::value ));
+
+   public:
+      typedef boost::shared_ptr< IOClass > IOClassPtr;
+      typedef boost::function< void(IOClassPtr) > NewConnectionCallbackFunc;
+
+   public:
+
+      UnixServerSocket()
+      : UnixServerSocketBase()
+      {}
+
+      UnixServerSocket(const std::string& path, int mode=0600)
+      : UnixServerSocketBase(path,mode)
+      {}
+
+      void setNewConnectionCallback( const NewConnectionCallbackFunc& func)
+      {
+         if (func)
+         {
+            UnixServerSocketBase::setNewConnectionBaseCallback(
+               boost::bind(
+                  func,
+                  boost::bind<IOClassPtr, IOImplementationPtr>(
+                     &UnixServerSocket::my_ptr_cast,
+                     _1
+                  )
+               )
+            );
+         }
+         else
+         {
+            UnixServerSocketBase::setNewConnectionBaseCallback(
+               NewConnectionBaseCallbackFunc()
+            );
+         }
+      }
+
+   protected:
+
+      virtual UnixIOSocketPtr createIOSocket(
+         int fd, const std::string& path,
+         unsigned int peer_pid,
+         unsigned int peer_uid, unsigned int peer_gid)
+      {
+         return UnixIOSocketPtr(
+            new IOClass(fd, path, peer_pid, peer_uid, peer_gid)
+         );
+      }
+
+      static IOClassPtr my_ptr_cast(IOImplementationPtr ptr)
+      {
+         return boost::dynamic_pointer_cast<IOClass>(ptr);
+      }
+
+}; // eo class UnixServerSocket
+
+
+}// eo namespace SimpleIo
+}// eo namespace I2n
+
+
+#endif
diff --git a/asnycio/simpletimer.cpp b/asnycio/simpletimer.cpp
new file mode 100644 (file)
index 0000000..f716981
--- /dev/null
@@ -0,0 +1,107 @@
+/** @file
+ *
+ * (c) Copyright 2007 by Intra2net AG
+ * 
+ * info@intra2net.com
+ */
+
+//#define NOISEDEBUG
+
+
+#include "simpletimer.hpp"
+
+
+#ifdef NOISEDEBUG
+#include <iostream>
+#include <iomanip>
+#define DOUT(msg) std::cout << msg << std::endl
+#define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl
+#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl
+#else
+#define DOUT(msg) do {} while (0)
+#define FODOUT(obj,msg) do {} while (0)
+#define ODOUT(msg) do {} while (0)
+#endif
+
+
+namespace I2n
+{
+namespace SimpleIo
+{
+
+
+/*
+ * Implementation of SimpleTimer
+ */
+
+SimpleTimer::SimpleTimer()
+{
+} // eo SimpleTimer::SimpleTimer()
+
+
+SimpleTimer::SimpleTimer(const MilliTime& delta, const TimerSignal::slot_function_type& action)
+{
+   addAction(action);
+   startTimer(delta);
+} // eo SimpleTimer::SimpleTimer(const MilliTime&, const TimerSignal::slot_type&)
+
+
+SimpleTimer::SimpleTimer(long milli_seonds_delta, const TimerSignal::slot_function_type& action)
+{
+   addAction(action);
+   startTimerMS(milli_seonds_delta);
+} // eo SimpleTimer::SimpleTimer(long, const TimerSignal::slot_type&)
+
+
+void SimpleTimer::execute()
+{
+   ODOUT("execute()!");
+   m_timer_signal();
+   m_timer_signal_p(this);
+} // eo SimpleTimer::execute
+
+
+void SimpleTimer::addAction( const TimerSignal::slot_function_type& action )
+{
+   m_timer_signal.connect(action);
+} // eo SimpleTimer::addAction(const TimerSignal::slot_type&)
+
+
+void SimpleTimer::addActionP( const TimerSignalP::slot_function_type& action )
+{
+   m_timer_signal_p.connect(action);
+} // eo SimpleTimer::addAction(const TimerSignalP::slot_type&)
+
+
+void SimpleTimer::startTimer( const MilliTime& delta )
+{
+   m_delta= delta;
+   setDeltaWhenTime( delta );
+   activate(true);
+#ifdef NOISEDEBUG
+   MilliTime now, t;
+   get_current_time(now);
+   t= getWhenTime();
+   MilliTime dt= t-now;
+   ODOUT("startTimer");
+   ODOUT("  now: sec="<< now.mt_sec << ", msec="<<now.mt_msec);
+   ODOUT("  t:   sec="<< t.mt_sec << ", msec="<<t.mt_msec);
+   ODOUT("  dt:  sec="<< dt.mt_sec << ", msec="<<dt.mt_msec);
+#endif
+} // eo SimpleTimer::startTimer(const MilliTime&)
+
+
+void SimpleTimer::startTimerMS( long milli_seconds )
+{
+   startTimer( MilliTime(0,milli_seconds) );
+} // eo SimpleTimer::stratTimerMS(long)
+
+
+void SimpleTimer::stopTimer()
+{
+   deactivate();
+} // eo SimpleTimer::stopTimer
+
+
+} // eo namespace SimpleIo
+} // eo namespace I2n
diff --git a/asnycio/simpletimer.hpp b/asnycio/simpletimer.hpp
new file mode 100644 (file)
index 0000000..64b3d41
--- /dev/null
@@ -0,0 +1,62 @@
+/** @file
+ *
+ * provides timer objects based on the TimerBase class 
+ *
+ * (c) Copyright 2007 by Intra2net AG
+ * 
+ * info@intra2net.com
+ */
+
+#ifndef _SIMPLEIO_SIMPLETIMER_HPP_
+#define _SIMPLEIO_SIMPLETIMER_HPP_
+
+#include "simpleio.hpp"
+
+#include <boost/signal.hpp>
+
+namespace I2n
+{
+namespace SimpleIo
+{
+
+
+class SimpleTimer : public TimerBase
+{
+   public:
+
+      typedef boost::signal<void()>       TimerSignal;
+      typedef boost::signal<void(SimpleTimer*)> TimerSignalP;
+
+   public:
+
+      SimpleTimer();
+
+      // convenience constructors for simple timeouts:
+      SimpleTimer(const MilliTime& delta, const TimerSignal::slot_function_type& action);
+      SimpleTimer(long milli_seonds_delta, const TimerSignal::slot_function_type& action);
+
+      // add actions:
+      void addAction( const TimerSignal::slot_function_type& action );
+      void addActionP( const TimerSignalP::slot_function_type& action );
+
+      // start the timer:
+      void startTimer( const MilliTime& delta );
+      void startTimerMS( long milli_seconds );
+
+      // stop the timer:
+      void stopTimer();
+
+   protected:
+      MilliTime    m_delta;
+
+      TimerSignal  m_timer_signal;
+      TimerSignalP m_timer_signal_p;
+
+      virtual void execute();
+}; // eo class SimpleTimer
+
+
+} // eo namespace SimpleIo
+} // eo namespace I2n
+
+#endif