From 5c8a3d407c74ffefacb57a4909dd0a6198ea0312 Mon Sep 17 00:00:00 2001 From: Reinhard Pfau Date: Mon, 20 Oct 2008 15:31:06 +0000 Subject: [PATCH] libasnyncio: (reinhard) more moves. UNUSABLE YET. --- asnycio/Makefile.am | 13 + asnycio/libsimpleio.pc.in | 11 + asnycio/simplecallout.cpp | 336 +++++++++ asnycio/simplecallout.hpp | 158 +++++ asnycio/simpleio.cpp | 1695 +++++++++++++++++++++++++++++++++++++++++++++ asnycio/simpleio.hpp | 463 +++++++++++++ asnycio/simplepipe.cpp | 98 +++ asnycio/simplepipe.hpp | 44 ++ asnycio/simpleprocess.cpp | 817 ++++++++++++++++++++++ asnycio/simpleprocess.hpp | 198 ++++++ asnycio/simplesocket.cpp | 397 +++++++++++ asnycio/simplesocket.hpp | 215 ++++++ asnycio/simpletimer.cpp | 107 +++ asnycio/simpletimer.hpp | 62 ++ 14 files changed, 4614 insertions(+), 0 deletions(-) create mode 100644 asnycio/Makefile.am create mode 100644 asnycio/libsimpleio.pc.in create mode 100644 asnycio/simplecallout.cpp create mode 100644 asnycio/simplecallout.hpp create mode 100644 asnycio/simpleio.cpp create mode 100644 asnycio/simpleio.hpp create mode 100644 asnycio/simplepipe.cpp create mode 100644 asnycio/simplepipe.hpp create mode 100644 asnycio/simpleprocess.cpp create mode 100644 asnycio/simpleprocess.hpp create mode 100644 asnycio/simplesocket.cpp create mode 100644 asnycio/simplesocket.hpp create mode 100644 asnycio/simpletimer.cpp create mode 100644 asnycio/simpletimer.hpp diff --git a/asnycio/Makefile.am b/asnycio/Makefile.am new file mode 100644 index 0000000..2694ef2 --- /dev/null +++ b/asnycio/Makefile.am @@ -0,0 +1,13 @@ +INCLUDES = @LIBI2NCOMMON_CFLAGS@ @BOOST_CPPFLAGS@ +METASOURCES = AUTO +lib_LTLIBRARIES = libsimpleio.la +libsimpleio_la_SOURCES = simplecallout.cpp simpleio.cpp simplepipe.cpp \ + simpleprocess.cpp simplesocket.cpp simpletimer.cpp +include_HEADERS = simplecallout.hpp simpleio.hpp simplepipe.hpp \ + simpleprocess.hpp simplesocket.hpp simpletimer.hpp +libsimpleio_la_LIBADD = @LIBI2NCOMMON_LIBS@ @BOOST_LDFLAGS@ @BOOST_SIGNALS_LIB@ + +libsimpleio_la_LDFLAGS = -version-info @LIBSIMPLEIO_LIB_VERSION@ + +pkgconfigdir=$(libdir)/pkgconfig +pkgconfig_DATA= libsimpleio.pc diff --git a/asnycio/libsimpleio.pc.in b/asnycio/libsimpleio.pc.in new file mode 100644 index 0000000..a671d47 --- /dev/null +++ b/asnycio/libsimpleio.pc.in @@ -0,0 +1,11 @@ +prefix=@prefix@ +exec_prefix=@exec_prefix@ +libdir=@libdir@ +includedir=@includedir@ + +Name: libsimpleio +Description: asynchrounous io lib +Version: @VERSION@ +Requires: libi2ncommon +Libs: -L${libdir} -lsimpleio +Cflags: -I${includedir} diff --git a/asnycio/simplecallout.cpp b/asnycio/simplecallout.cpp new file mode 100644 index 0000000..13f5c63 --- /dev/null +++ b/asnycio/simplecallout.cpp @@ -0,0 +1,336 @@ +/** + * @file + * + * @copyright © Copyright 2008 by Intra2net AG + * @license commercial + * + * info@intra2net.com + */ + +#include "simplecallout.hpp" + +#include +#include + +#include + + +namespace I2n +{ +namespace SimpleIo +{ + +namespace +{ + +typedef boost::shared_ptr< Detail::Caller > CallerPtr; + +typedef std::map< unsigned long, CallerPtr > CallMap; + + +unsigned long l_last_id=0; + +CallMap l_call_map; + + +/** + * @brief creates a new id value for a call out. + * @return the new id. + * + * The id value is basically just a counter. + * It is implemented in a way that it can not be 0 and can deal with wrap arounds. + */ +unsigned long create_call_out_id_value() +{ + while ( l_call_map.find(++l_last_id) != l_call_map.end() and l_last_id != 0); + return l_last_id; +} // eo create_call_out_id_value + + +void add_call( CallerPtr caller ) +{ + if (caller->joinId()) + { + l_call_map[ caller->getCallOutId().getValue() ] = caller; + } +} // eo add_call + + +bool remove_call( unsigned long id_value ) +{ + CallMap::iterator it= l_call_map.find(id_value); + if (it != l_call_map.end()) + { + l_call_map.erase(it); + return true; + } + return false; +} // eo remove_call(unsigned long) + + +CallerPtr get_call(unsigned long id_value) +{ + CallMap::iterator it= l_call_map.find(id_value); + if (it != l_call_map.end()) + { + return it->second; + } + return CallerPtr(); +} // eo get_call(unsigned long) + + +bool has_call( unsigned long id_value ) +{ + CallMap::iterator it= l_call_map.find(id_value); + return (it != l_call_map.end() ); +} // eo has_call(unsigned long) + + + +} // eo namespace + + + +/* +** implementation of class CallOutId +*/ + +CallOutId::CallOutId() +: m_value(0) +{ +} // eo CallOutId::CallOutId() + + +CallOutId::CallOutId(unsigned long value) +: m_value(value) +{ +} // eo CallOutId::CallOutId(unsigned long) + + +bool CallOutId::thaw() const +{ + if (m_caller_weak_ptr.expired()) + { + return false; + } + CallerPtr call_ptr= get_call(m_value); + if (call_ptr) + { + return call_ptr->thaw(); + } + return false; +} // eo CallOutId::thaw() const + + +bool CallOutId::remove() +{ + if (m_caller_weak_ptr.expired()) + { + return false; + } + unsigned long value= m_value; + m_value= 0; + return remove_call(value); +} // eo CallOutId::remove() + + +bool CallOutId::active() const +{ + return m_value!=0 and not m_caller_weak_ptr.expired() and has_call(m_value); +} // eo CallOutId::active() const + + +/** + * @brief retruns if the call is frozen. + * @return @a true iff the call is frozen. + */ +bool CallOutId::frozen() const +{ + CallerPtr caller= m_caller_weak_ptr.lock(); + if (not caller) + { + return false; + } + return caller->frozen(); +} // eo CallOutId::frozen() const + + +/** + * @brief returns the remaining time until the call is done or thrown away (on frozen calls). + * @return the remaining time. + * + * The result only makes sense if the call is still active. + */ +MilliTime CallOutId::remaining_time() +{ + CallerPtr caller= m_caller_weak_ptr.lock(); + if ( not active() or not caller ) + { + return MilliTime(); + } + MilliTime t; + get_current_monotonic_time(t); + MilliTime result= caller->getWhenTime(); + result-= t; + MilliTime t_null; + return (result < t_null ? t_null : result); +} // eo CallOutId::remaining_time() + + +namespace Detail +{ + +/* +** implementation of class Caller +*/ + +Caller::Caller( boost::function< void() > f, long delta_sec, long delta_msec, bool frozen) +: TimerBase() +, m_call_out_id( create_call_out_id_value() ) +, m_func(f) +, m_waiting(frozen) +{ + SCOPETRACKER(); + setDeltaWhenTime( delta_sec, delta_msec); +} // eo Caller::Caller(boost::function< void() >,long) + + +Caller::~Caller() +{ + SCOPETRACKER(); +} // eo Caller::~Caller() + + +void Caller::execute() +{ + SCOPETRACKER(); + // NOTE: since the func may throw an exception, we first get a shared pointer + // (to prevent early deletion) and then we remove us from the call map. + CallerPtr ptr= shared_from_this(); + m_call_out_id.remove(); + + if (m_func and not m_waiting) + { + m_func(); // may throw..., but at this point it doesn't harm. + //( it may harm at other places,...) + } +} // eo Caller::execute() + + +bool Caller::thaw() +{ + if (m_waiting) + { + m_waiting= false; + setDeltaWhenTime( 0, 0 ); + return true; + } + return false; +} // eo Caller::thaw() + + +bool Caller::joinId() +{ + if (m_call_out_id.m_caller_weak_ptr.expired()) + { + m_call_out_id.m_caller_weak_ptr= shared_from_this(); + activate(); + return true; + } + return false; +} // eo Caller::joinId() + + +bool Caller::frozen() const +{ + return m_waiting; +} // eo Caller::frozen() const + + +} // eo namespace Detail + + + +/** + * @brief remove a pending call by id. + * + * @param id the call id which should be removed. + * @return @a true iff the call was removed, @a false if no call with the given id was found. + */ +bool removeCallOut( CallOutId& id ) +{ + return id.remove(); +} // eo removeCallOut(CallOutId&) + + + +template<> +CallOutId callOut( boost::function< void() > f, long delta_sec) +{ + CallerPtr caller( new Detail::Caller(f,delta_sec) ); + add_call(caller); + return caller->getCallOutId(); +} // eo callOut(boost::function< void() >,long) + +template<> +CallOutId callOut( boost::function< void() > f, int delta_sec) +{ + return callOut(f,delta_sec); +} // eo callOut(boost::function< void() >,int) + + +template<> +CallOutId callOut( boost::function< void() > f, double delta_sec ) +{ + long delta_sec_i = (long)delta_sec; + long delta_sec_m = (long)((delta_sec - (double)delta_sec_i)*1000.0); + CallerPtr caller( new Detail::Caller(f,delta_sec_i, delta_sec_m) ); + add_call(caller); + return caller->getCallOutId(); +} // eo callOut(boost::function< void() >,double) + + +template<> +CallOutId callOut( boost::function< void() > f, float delta_sec ) +{ + return callOut(f,delta_sec); +} // eo callOut(boost::function< void() >,float) + + + + +template<> +CallOutId frozenCall( boost::function< void() > f, long delta_sec) +{ + CallerPtr caller( new Detail::Caller(f,delta_sec, 0, true) ); + add_call(caller); + return caller->getCallOutId(); +} // eo frozenCall(boost::function< void() >,long) + +template<> +CallOutId frozenCall( boost::function< void() > f, int delta_sec) +{ + return frozenCall(f,delta_sec); +} // eo frozenCall(boost::function< void() >,int) + + +template<> +CallOutId frozenCall( boost::function< void() > f, double delta_sec ) +{ + long delta_sec_i = (long)delta_sec; + long delta_sec_m = (long)((delta_sec - (double)delta_sec_i)*1000.0); + CallerPtr caller( new Detail::Caller(f,delta_sec_i, delta_sec_m, true) ); + add_call(caller); + return caller->getCallOutId(); +} // eo frozenCall(boost::function< void() >,double) + + +template<> +CallOutId frozenCall( boost::function< void() > f, float delta_sec ) +{ + return frozenCall(f,delta_sec); +} // eo frozenCall(boost::function< void() >,float) + + +} // eo namespace SimpleIo +} // eo namespace I2n diff --git a/asnycio/simplecallout.hpp b/asnycio/simplecallout.hpp new file mode 100644 index 0000000..c895bbc --- /dev/null +++ b/asnycio/simplecallout.hpp @@ -0,0 +1,158 @@ +/** + * @file + * @brief provides a method for delayed execution of functions. + * + * + * @copyright © Copyright 2008 by Intra2net AG + * @license commercial + * + * info@intra2net.com + */ + +#ifndef __SIMPLEIO_SIMPLECALLOUT_HPP_ +#define __SIMPLEIO_SIMPLECALLOUT_HPP_ + +#include "simpleio.hpp" + +#include +#include +#include + + +namespace I2n +{ +namespace SimpleIo +{ + +// forward declarations: +namespace Detail { + +class Caller; + +typedef boost::shared_ptr< Caller > CallerPtr; +typedef boost::weak_ptr< Caller > CallerWeakPtr; + +} // eo namespace Detail + + +/** + * @brief represents an id for a deferred call. + * + * Also provides methods for modifying the call + * (like thaw or delete it). + */ +class CallOutId +{ + friend class Detail::Caller; + + public: + CallOutId(); + + unsigned long getValue() const {return m_value;} + + bool thaw() const; + bool remove(); + + bool active() const; + bool frozen() const; + + MilliTime remaining_time(); + + private: + + CallOutId(unsigned long value); + + private: + + unsigned long m_value; + + Detail::CallerWeakPtr m_caller_weak_ptr; + +}; // eo class CallOutId + + + +/* +** +*/ + +namespace Detail { + +/** + * @brief tool class for holding and executing a deferred call. + * + */ +class Caller +: public TimerBase +, public boost::enable_shared_from_this< Caller > +{ + public: + Caller( boost::function< void() > f, long delta_sec, long delta_msec=0, bool frozen=false ); + virtual ~Caller(); + + CallOutId getCallOutId() const { return m_call_out_id; } + + bool thaw(); + + bool joinId(); + + bool frozen() const; + + protected: + + virtual void execute(); + + private: + + CallOutId m_call_out_id; + boost::function< void() > m_func; + bool m_waiting; +}; // eo class Caller + + +} // eo namespace Detail + +/* +** +*/ + +/** + * @brief initiates a deferred call of a function. + * + * @param f the function which should be called. + * @param delta_sec the delta time (in seconds) when the function should be called. + * @return an id to identify the call (may be used for preliminary removal of the call) + */ +template< typename F > +CallOutId callOut( boost::function< void() > f, F delta_sec ); + +template<> CallOutId callOut( boost::function< void() > f, long delta_sec ); +template<> CallOutId callOut( boost::function< void() > f, double delta_sec ); +template<> CallOutId callOut( boost::function< void() > f, float delta_sec ); +template<> CallOutId callOut( boost::function< void() > f, int delta_sec ); + + +/** + * @brief initiates a frozen call of a function. + * + * @param f the function which should be called. + * @param delta_sec the delta time (in seconds) when the call will be (silently) removed. + * @return an id to identify the call; neccessary for thaw the call. + */ +template< typename F > +CallOutId frozenCall( boost::function< void() > f, F delta_sec); + +template<> CallOutId frozenCall( boost::function< void() > f, long delta_sec ); +template<> CallOutId frozenCall( boost::function< void() > f, double delta_sec ); +template<> CallOutId frozenCall( boost::function< void() > f, float delta_sec ); +template<> CallOutId frozenCall( boost::function< void() > f, int delta_sec ); + + + +bool removeCallOut( CallOutId& id ); + + +} // eo namespace SimpleIo +} // eo namespace I2n + +#endif diff --git a/asnycio/simpleio.cpp b/asnycio/simpleio.cpp new file mode 100644 index 0000000..9bf0cad --- /dev/null +++ b/asnycio/simpleio.cpp @@ -0,0 +1,1695 @@ +/** @file + * + * + * @copyright Copyright © 2007-2008 by Intra2net AG + * @license commercial + * @contact info@intra2net.com + */ + +//#define NOISEDEBUG + +#include "simpleio.hpp" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +#include +#include + +#include + +#ifdef NOISEDEBUG +#include +#include +#define DOUT(msg) std::cout << msg << std::endl +#define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl +//#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl +#define ODOUT(msg) std::cout << __PRETTY_FUNCTION__ << "[" << this << "]:" << msg << std::endl +#else +#define DOUT(msg) do {} while (0) +#define FODOUT(obj,msg) do {} while (0) +#define ODOUT(msg) do {} while (0) +#endif + +namespace +{ + + +/* + * configuration: + */ + + +const int c_max_poll_wait= 10*60*1000; // maximal poll wait (while in backend loop): 10 min + + +/** + * contains internal helper structs and functions for io handling. + */ +namespace internal_io +{ + +/** + * extends struct pollfd with some convenience functions + */ +struct PollFd : public ::pollfd +{ + PollFd() + { + fd= 0; + events= revents= 0; + } // eo PollFd + + + /** + * initializes the struct with a given file descriptor and clears the event mask(s). + * @param _fd + */ + PollFd(int _fd) + { + fd= _fd; + events= revents= 0; + } // eo PollFd + + + /** + * set that we want to be notified about new incoming data + */ + void setPOLLIN() { events |= POLLIN; } + + /** + * set that we want to be notified if we can send (more) data. + */ + void setPOLLOUT() { events |= POLLOUT; } + +}; // eo struct PollFd + + +typedef std::vector PollVector; +typedef std::map FdPollMap; +typedef std::map FdIOMap; + + +/** + * struct for interfacing our local structures with poll() + */ +struct PollDataCluster +{ + PollVector m_poll_vector; + FdPollMap m_fd_poll_map; + FdIOMap m_read_fd_io_map; + FdIOMap m_write_fd_io_map; + + void add_read_fd( int fd, I2n::SimpleIo::IOImplementation* io); + void add_write_fd( int fd, I2n::SimpleIo::IOImplementation* io); + + pollfd* get_pollfd_ptr(); + unsigned int get_num_pollfds() const; + +}; // eo struct PollDataCluster + + +template +class PtrList : public std::list +{ + typedef std::list inherited; + public: + bool dirty; + + static int Instances; + + public: + + PtrList() + : dirty(false) + { + ++Instances; + } // eo PtrList + + + ~PtrList() + { + ODOUT(""); + --Instances; + } + + + /** + * add a new item pointer to the list. + * + * @param item item pointer which should be added + */ + void add_item(T* item) + { + typename inherited::iterator it= std::find(inherited::begin(), inherited::end(), item); + if (it != inherited::end()) + { + // nothing to do since item is already in the list + return; + } + push_back(item); + } // eo add + + + /** + * remove an item pointer from the list by setting NULL at the current position of the item and mark + * the list as dirty. + * + * @param item the io object which should be removed from the list. + */ + void remove_item(T* item) + { + typename inherited::iterator it= std::find(inherited::begin(), inherited::end(), item); + if (it == inherited::end()) + { + // nothing to do: + return; + } + *it = NULL; // forget the pointer + dirty= true; // ..and mark the list as dirty (i.e. has NULL elements) + } // eo remove + + + /** + * cleans the list of objects by removing the NULL elements (if any). + * + * @note this function should only be called when it is ensured that no other functions using iterators of this list. + */ + void clean_list() + { + if (!dirty) + { + // nothing to do + return; + } + // remove the NULL elements now: + erase( + std::remove( inherited::begin(), inherited::end(), (T*)NULL), + inherited::end() ); + dirty= false; + } // eo clean_list + + +}; // eo class PtrList + + +typedef PtrList IOList; +typedef PtrList TimerList; + +template<> int IOList::Instances= 0; +template<> int TimerList::Instances= 0; + + +/** + * the (internal) global list of io objects (object pointers) + */ +IOList& g_io_list() +{ + static IOList _the_io_list; + return _the_io_list; +}; + + +/** + * the (internal) global list of timer objects (object pointers) + */ +TimerList& g_timer_list() +{ + static TimerList _the_timer_list; + return _the_timer_list; +} + +/* + * implementation of PollDataCluster + */ + + +/** + * add a new file descriptor to the read list. + * + * @param fd the file descriptor. + * @param io the io object which uses the fd for reading. + */ +void PollDataCluster::add_read_fd( int fd, I2n::SimpleIo::IOImplementation* io) +{ + FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd); + if (itPollMap != m_fd_poll_map.end()) + { + m_poll_vector[ itPollMap->second ].setPOLLIN(); + } + else + { + PollFd item(fd); + item.setPOLLIN(); + m_fd_poll_map[fd] = m_poll_vector.size(); + m_poll_vector.push_back( item ); + } + m_read_fd_io_map[fd]= io; +} // eo PollDataCluster::add_read_fd + + +/** + * add a new file descriptor to the write list. + * + * @param fd the file descriptor. + * @param io the io object which uses the fd for writing. + */ +void PollDataCluster::add_write_fd( int fd, I2n::SimpleIo::IOImplementation* io) +{ + FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd); + if (itPollMap != m_fd_poll_map.end()) + { + m_poll_vector[ itPollMap->second ].setPOLLOUT(); + } + else + { + PollFd item(fd); + item.setPOLLOUT(); + m_fd_poll_map[fd] = m_poll_vector.size(); + m_poll_vector.push_back( item ); + } + m_write_fd_io_map[fd]= io; +} // eo PollDataCluster::add_write_fd + + +/** + * returns a pointer to a pollfd array; suitable for passing to poll() + * + * @return pointer to pollfd array + */ +pollfd* PollDataCluster::get_pollfd_ptr() +{ + return m_poll_vector.empty() ? NULL : &m_poll_vector.front(); +} // eo get_pollfd_ptr + + +/** + * returns the number of entries in the pollfd array; suitable for passing to poll() + * + * @return the number of entries in the pollfd array + */ +unsigned int PollDataCluster::get_num_pollfds() const +{ + return m_poll_vector.size(); +} // eo get_num_pollfds + + + +} // eo namespace internal_io + + +/* + * some internal tool functions and structures + */ + +struct FilterMatch { + I2n::SimpleIo::FilterBasePtr m_filter; + + FilterMatch(I2n::SimpleIo::FilterBasePtr filter) + : m_filter(filter) + {} + + bool operator () (const I2n::SimpleIo::FilterBasePtr& item) + { + return item && item == m_filter; + } + +}; // eo struct FilterMatch + + +void get_current_real_time(long& current_sec, long& current_msec) +{ + struct timeval tv; + gettimeofday(&tv,NULL); + current_sec= tv.tv_sec; + current_msec= (tv.tv_usec / 1000); + if (current_msec >= 1000) + { + current_sec += (current_msec / 1000); + current_msec%= 1000; + } +} // eo get_current_real_time + + +void get_current_monotonic_time(long& current_sec, long& current_msec) +{ + long nsec; + if (monotonic_clock_gettime(current_sec,nsec)) + { + current_msec= nsec / 1000000L; + } + else + { + //fallback... + get_current_real_time(current_sec,current_msec); + } +} // eo get_current_monotonic_time + + + +} // eo anonymous namespace + + + + +namespace I2n +{ +namespace SimpleIo +{ + + +/** + * @brief gets the current time as MilliTime structure. + * @param[out] mt reference to the MilliTime strcucture which is filled with the result. + */ +void get_current_real_time(MilliTime& mt) +{ + long sec, msec; + ::get_current_real_time(sec,msec); + mt.set(sec,msec); +} // eo get_current_real_time + + +/** + * @brief gets the current time as MilliTime structure. + * @param[out] mt reference to the MilliTime strcucture which is filled with the result. + */ +void get_current_monotonic_time(MilliTime& mt) +{ + long sec, msec; + ::get_current_monotonic_time(sec,msec); + mt.set(sec,msec); +} // eo get_current_monotonic_time + + +/* + * implementation of MilliTime + */ + +MilliTime::MilliTime(long sec, long msec) +: mt_sec(sec), mt_msec(msec) +{ + normalize(); +} // eo MilliTime::MilliTime + + +void MilliTime::set(long sec, long msec) +{ + mt_sec= sec; + mt_msec= msec; + normalize(); +} // eo MilliTime::set + + +/** + * normalizes the values, so that mt_msec has a value between 0 and 999. + */ +void MilliTime::normalize() +{ + if (mt_msec < 0) + { + mt_sec += (mt_msec / 1000) - 1; + mt_msec = (mt_msec % 1000) + 1000; + } + else if (mt_msec>=1000) + { + mt_sec+= (mt_msec / 1000); + mt_msec %= 1000; + } +} // eo MilliTime::normalize + + +/** + * determine if the represented point in time is before another one. + * @param other the other point in time. + * @return true if the own point in time is before the other one. + */ +bool MilliTime::operator < (MilliTime& other) +{ + normalize(); + other.normalize(); + return + (mt_sec < other.mt_sec) + || (( mt_sec == other.mt_sec) && (mt_msec < other.mt_msec)); +} // eo MilliTime::operator < + + +/** + * determine if two point in times are equal. + * @param other the point in time to compare with. + * @return true if the represented times are equal. + */ +bool MilliTime::operator == (MilliTime& other) +{ + normalize(); + other.normalize(); + return (( mt_sec == other.mt_sec) && (mt_msec == other.mt_msec)); +} // eo MilliTime::operator < + +/** + * @brief subtracts a time delta from the object. + * @param lhs the time delta to subtract. + * @return reference to the object itself. + */ +MilliTime& MilliTime::operator -= (const MilliTime& lhs) +{ + mt_sec -= lhs.mt_sec; + mt_msec -= lhs.mt_msec; +} // eo operator -= + + +/** + * @brief adds a time delta from the object. + * @param lhs the time delta to add. + * @return reference to the object itself. + */ +MilliTime& MilliTime::operator += (const MilliTime& lhs) +{ + mt_sec += lhs.mt_sec; + mt_msec += lhs.mt_msec; +} // eo operator += + + +/* + * implementation of TimerBase + */ + +/** + * constructor. Adds the object to the internal timer list. + */ +TimerBase::TimerBase() +: m_active(false) +, m_marked(false) +{ + internal_io::g_timer_list().add_item(this); +} // eo TimerBase::TimerBase + + +/** + * destructor. Removes the object from the internal timer list. + */ +TimerBase::~TimerBase() +{ + ODOUT("enter"); + if (internal_io::TimerList::Instances) + { + ODOUT("remove from list"); + internal_io::g_timer_list().remove_item(this); + } +} // eo TimerBase::~TimerBase + + +/** + * @brief returns the point in time when the time is executed in real time. + * @return the point in time when the timer is to be executed. + */ +MilliTime TimerBase::getRealWhenTime() const +{ + MilliTime mono_time; + MilliTime real_time; + get_current_monotonic_time(mono_time); + get_current_real_time(real_time); + MilliTime result= m_when - mono_time + real_time; + return result; +} // eo TimerBase::getRealWhenTime() const + + +/** + * sets the time when the event should be executed. + * @param sec the seconds part of the point in time. + * @param msec the milliseconds part of the point in time. + */ +void TimerBase::setWhenTime(long sec, long msec) +{ + m_when.set(sec,msec); + m_marked= false; +} // eo TimerBase::setWhenTime + + +/** + * sets the time when the event should be executed. + * @param mt the point in time. + */ +void TimerBase::setWhenTime(const MilliTime& mt) +{ + m_when= mt; + m_marked= false; +} // eo TimerBase::setWhenTime + + +/** + * sets the time delta measured from current time when the event should be executed. + * @param sec the seconds of the time delta + * @param msec the milli seconds of the time delta + */ +void TimerBase::setDeltaWhenTime(long sec, long msec) +{ + setDeltaWhenTime( MilliTime(sec,msec) ); +} // eo TimerBase::setWhenTime + + + +/** + * sets the time delta measured from current time when the event should be executed. + * @param mt the time delta + */ +void TimerBase::setDeltaWhenTime(const MilliTime& mt) +{ + get_current_monotonic_time(m_when); + m_when+= mt; + m_marked= false; +} // eo TimerBase::setWhenTime + + +/** + * set the active state of the timer event. + * @param active determines if the object should be active (default: yes). + */ +void TimerBase::activate(bool active) +{ + m_active = active; + if (!active) + { + // clear the mark if we are not active. + m_marked= false; + } +} // eo TimerBase::activate + + +/** @fn void TimerBase::deactivate() + * deactivates the event by clearing the active state. + */ + + +/** + * called when the timer event occured. + */ +void TimerBase::execute() +{ +} // eo TimerBase::execute + + +/* + * implementation of FilterBase class + */ + + +FilterBase::FilterBase() +: m_io(NULL) +{ +} // eo FilterBase::FilterBase() + + +/** + * injects incoming data. + * @param data the new data + */ +void FilterBase::injectIncomingData(const std::string& data) +{ + if (m_io) + { + FilterBasePtr ptr= get_ptr_as< FilterBase >(); + if (ptr) + { + m_io->injectIncomingData(ptr,data); + } + } +} // FilterBase::injectIncomingData(const std::string&) + + +/** + * injects outgoing data. + * @param data the new data + */ +void FilterBase::injectOutgoingData(const std::string& data) +{ + if (m_io) + { + FilterBasePtr ptr= get_ptr_as< FilterBase >(); + if (ptr) + { + m_io->injectOutgoingData(ptr,data); + } + } +} // eo FilterBase::injectOutgoingData(const std::string&) + + + +/** + * called when EOF detected on incoming channel (or incoming channel closed) + */ +void FilterBase::endOfIncomingData() +{ +} // eo FilterBase::endOfIncomingData() + +/** + * called when the filter should reset. + * This is used when a new channel is opened or when the filter is taken out of a filter chain. + */ +void FilterBase::reset() +{ +} // eo FilterBase::reset() + + +/* + * implementation of IOImplementation class + */ + + +/** + * constructor for the base io class. + * + * Also adds the object to internal list of io objects (which is used by the backend). + * + * @param read_fd the file descriptor which should be used for reading (default -1 for no value) + * @param write_fd the file descriptor which should be used for writing (default -1 for no value) + */ +IOImplementation::IOImplementation(int read_fd, int write_fd) +: m_read_fd(-1) +, m_write_fd(-1) +, m_eof(false) +, m_not_writable(false) +, m_input_buffer() +, m_output_buffer() +, m_marked_for_reading(false) +, m_marked_for_writing(false) +{ + internal_io::g_io_list().add_item(this); + if (read_fd >= 0) + { + setReadFd( read_fd ); + } + if (write_fd >= 0) + { + setWriteFd( write_fd ); + } +} // eo IOImplementation::IOImplementation + + +/** + * destructor of the base io class. + * + * Removes the object from the interal list of io objects. + */ +IOImplementation::~IOImplementation() +{ + close(); + if (internal_io::IOList::Instances) + { + internal_io::g_io_list().remove_item(this); + } + // now clear the filters: + while (! m_filter_chain.empty() ) + { + FilterChain::iterator it = m_filter_chain.begin(); + (*it)->reset(); + (*it)->m_io= NULL; + //TODO: signal the filter that it is removed ?! + m_filter_chain.erase(it); + } +} // eo IOImplementation::~IOImplementation + + +/** + * adds another filter to the filter chain. + * @param filter pointer to the new filter. + */ +void IOImplementation::addFilter +( + FilterBasePtr filter +) +{ + if (!filter) + { + return; // nothing to do + } + if (filter->m_io) + { + filter->m_io->removeFilter(filter); + } + m_filter_chain.push_back( filter ); +} // eo IOImplementation::addFilter + + +/** + * removes a filter from the filter chain. + * @param filter the pointer to the filter which is removed. + * @note if the filter is removed the class gives away the ownership; i.,e. the caller is responsible for + * deleting the filter if it was dynamically allocated. + */ +void IOImplementation::removeFilter +( + FilterBasePtr filter +) +{ + FilterChain::iterator it = + std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(filter) ); + if (it != m_filter_chain.end()) + { + filter->reset(); + filter->m_io= NULL; + //TODO: signal the filter that it is removed ?! + m_filter_chain.erase(it); + } +} // eo IOImplementation::removeFilter + + +/** + * closes the file descriptors (/ the connection). + * + * @param direction the direction which should be closed (default: @a Direction::both for all). + */ +void IOImplementation::close(Direction direction) +{ + bool had_read_fd= (m_read_fd >= 0); + m_errno= 0; + if (direction == Direction::unspecified) direction= Direction::both; + if (direction != Direction::both && m_read_fd==m_write_fd && m_read_fd>=0) + { // special case: half closed (socket) connections... + // NOTE: for file descriptors m_errno will set to ENOTSOCK, but since we "forget" the desired part + // (read_fd or write_fd) this class works as desired. + switch(direction) + { + case Direction::in: + { + int res= ::shutdown(m_read_fd, SHUT_RD); + if (res<0) + { + m_errno= errno; + } + m_read_fd= -1; + if (!m_eof) + { + for(FilterChain::iterator it= m_filter_chain.begin(); + it != m_filter_chain.end(); + ++it) + { + (*it)->endOfIncomingData(); + } + } + } + return; + + case Direction::out: + { + int res= ::shutdown(m_write_fd, SHUT_WR); + if (res<0) + { + m_errno= errno; + } + m_write_fd= -1; + m_output_buffer.clear(); + } + return; + } + } + if (m_write_fd >= 0 && (direction & Direction::out) ) + { + int res1 = ::close(m_write_fd); + if (m_write_fd == m_read_fd) + { + m_read_fd= -1; + } + m_write_fd= -1; + m_output_buffer.clear(); + if (res1<0) m_errno= errno; + } + if (m_read_fd >=0 && (direction & Direction::in) ) + { + int res1 = ::close(m_read_fd); + m_read_fd= -1; + if (res1<0) m_errno= errno; + } + if (had_read_fd && !m_eof && (m_read_fd<0)) + { + for(FilterChain::iterator it= m_filter_chain.begin(); + it != m_filter_chain.end(); + ++it) + { + (*it)->endOfIncomingData(); + } + } +} // eo IOImplementation::close + + +/** + * determines if the io class wants to read data. + * Default implementation checks only for a valid file descriptor value. + * + * @return @a true if the objects wants to read data + */ +bool IOImplementation::wantRead() +{ + return (m_read_fd >= 0) && ! m_eof; +} // eo IOImplementation::wantRead + + +/** + * determines if the io class wants to write data. + * Default implementation checks for a valid file descriptor value and if the object + * cannot write data immediately. + * + * @return @a true if the objects wants to write data + */ +bool IOImplementation::wantWrite() +{ + return (m_write_fd >= 0) && ! m_marked_for_writing && ! m_not_writable; +} // eo IOImplementation::wantWrite + + +/** + * delivers if opened. + * The default returns @a true if at least one file descriptor (read or write) is valid. + * @return @a true if opened. + */ +bool IOImplementation::opened() const +{ + return (m_read_fd>=0) || (m_write_fd>=0); +} // eo IOImplementation::opened() const + + +/** + * returns if the read side detected an end of file (EOF). + * @return @a true if end of file was detected on read file descriptor (or read file descriptor isn't valid). + */ +bool IOImplementation::eof() const +{ + return (m_read_fd < 0) || m_eof; +} // eo IOImplementatio::eof() const + + +/** + * @brief returns of the write side didn't detect that it cannot write. + * @return @a true if we can write. + */ +bool IOImplementation::writable() const +{ + return (m_write_fd >=0 ) and not m_not_writable; +} // eo IOImplementation::writable() const + + +/** + * returns if the output buffer is empty. + * @return + */ +bool IOImplementation::empty() const +{ + return m_output_buffer.empty(); +} // eo IOImplementation::empty + +/** + * puts data into the output buffer and sends it immediately if possible, + * + * The data is passed through the filter chain before it's stored in the output buffer + * (i.e. the output buffer contains data as it should be send directly to the descriptor). + * @param _data the data which should be send. + */ +void IOImplementation::lowSend(const std::string& _data) +{ + std::string data(_data); + + for(FilterChain::reverse_iterator it_filter= m_filter_chain.rbegin(); + it_filter!= m_filter_chain.rend(); + ++it_filter) + { + data= (*it_filter)->filterOutgoingData(data); + } + m_output_buffer+= data; + + // if we can send immediately, do it: + if (! m_output_buffer.empty() && m_marked_for_writing) + { + doWrite(); + } +} // eo IOImplementation::lowSend + + +/** + * called by the backend when there is data to read for this object. + * + * Reads the data from the connection (read file descriptor) and passes the data through the filter chain. + * The final data is appended to the input buffer and the signal @a m_signal_read() is called. + * + * If EOF is detected (i,e, no data was received) then the signal @a m_signal_eof() is called. + * + * @note overload this method only when You know what You are doing! + * (overloading is necessary when handling server sockets.) + */ +void IOImplementation::doRead() +{ + // static read buffer; should be ok as long as we don't use threads + static char buffer[8*1024]; // 8 KiB + + m_errno = 0; + if (m_read_fd<0 || !m_marked_for_reading) + { + ODOUT("exit0; read_fd="< " << count); + + // interpret what we got: + if (count < 0) // error + { + m_errno = errno; + int fd= m_read_fd; + + switch(m_errno) + { + case EINVAL: + case EBADF: + case ECONNRESET: + case ENETRESET: + if (fd == m_read_fd) + { + close( (m_read_fd == m_write_fd) ? Direction::both : Direction::in ); + } + break; + } + } + else if (count==0) // EOF + { + // remember the read fd: + int fd = m_read_fd; + // remember the EOF: + m_eof= true; + // signal EOF + m_signal_eof(); + // if the fd is still the same: close it. + if (fd == m_read_fd) + { + close( Direction::in ); + } + } + else // we have valid data + { + std::string data(buffer,count); + ODOUT(" got \"" << data << "\""); + for(FilterChain::iterator it_filter= m_filter_chain.begin(); + it_filter != m_filter_chain.end(); + ++it_filter) + { + data= (*it_filter)->filterIncomingData(data); + } + m_input_buffer+= data; + m_signal_read(); + } +} // eo IOImplementation::doRead + + +/** + * interface for filter classes to inject data into the filter chain (emulating new incoming data). + * @param from_filter the filter which injects the new data. + * @param _data the new data. + */ +void IOImplementation::injectIncomingData(FilterBasePtr from_filter, const std::string& _data) +{ + FilterChain::iterator it_filter = + std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(from_filter) ); + if (it_filter == m_filter_chain.end()) + { + // dont accept data inject from a unknown filter + return; + } + // well: pass the data through the remaining filters: + // NOTE: processing is (nearly) the same as in IOImplementation::doRead() + std::string data(_data); + for(++it_filter; + it_filter != m_filter_chain.end(); + ++it_filter) + { + data= (*it_filter)->filterIncomingData(data); + } + m_input_buffer+= data; + m_signal_read(); +} // eo IOImplementation::injectIncomingData(FilterBase*,const std::string&) + + +/** + * interface for filter classes to inject data into the filter chain (emulating new outgoing data). + * @param from_filter the filter which injects the new data. + * @param _data the new data. + */ +void IOImplementation::injectOutgoingData(FilterBasePtr from_filter, const std::string& _data) +{ + FilterChain::reverse_iterator it_filter = + std::find_if( m_filter_chain.rbegin(), m_filter_chain.rend(), FilterMatch(from_filter) ); + if (it_filter == m_filter_chain.rend()) + { + // dont accept data inject from a unknown filter + return; + } + // well: pass the data through the remaining filters: + // NOTE: processing is (nearly) the same as in IOImplementation::lowSend() + std::string data(_data); + for(++it_filter; + it_filter!= m_filter_chain.rend(); + ++it_filter) + { + data= (*it_filter)->filterOutgoingData(data); + } + m_output_buffer+= data; + + // if we can send immediately, do it: + if (! m_output_buffer.empty() && m_marked_for_writing) + { + doWrite(); + } +} // eo IOImplementation::injectOutgoingData(FilterBase*,const std::string&) + + +/** + * set the read file descriptor. + * Although a derived class can also set the read fd directly; this method should be used + * for this task since it updates some flags on the fd for async operation. + * @param fd the new read file descriptor. + */ +void IOImplementation::setReadFd(int fd) +{ + // test if we already have a valid descriptor (and may have to close it): + if (m_read_fd >=0 ) + { + if (m_read_fd == fd) + { + // fd was already right; consider it to be ok. + return; + } + close(Direction::in); + } + // reset our errno: + m_errno= 0; + // if the new descriptor looks valid, set some flags: + if (fd >= 0) + { + long flags= ::fcntl(fd, F_GETFL); + if (flags != -1) + { + // set the flags for non blocking, async operation + flags |= O_NONBLOCK|O_ASYNC; + ::fcntl(fd,F_SETFL, flags); + } + else if ( errno == EBADF ) + { + // well, we seemed to be fed with an invalid descriptor...: + m_errno = errno; + fd= -1; + } + } + if (fd >= 0) // if still valid: + { + // set the close-on-exec flag + ::fcntl(fd,F_SETFD, FD_CLOEXEC); + } + m_read_fd= fd; + m_marked_for_reading= false; + m_eof= false; +} // eo IOImplementation::setReadFd(int) + + + +/** + * set the write file descriptor. + * Although a derived class can also set the write fd directly; this method should be used + * for this task since it updates some flags on the fd for async operation. + * @param fd the new write file descriptor. + */ +void IOImplementation::setWriteFd(int fd) +{ + if (m_write_fd >=0 ) + { + if (m_write_fd == fd) + { + // fd was already right; consider it to be ok. + return; + } + close(Direction::out); + } + // reset our errno: + m_errno= 0; + // if the new descriptor looks valid, set some flags: + if (fd >= 0) + { + long flags= ::fcntl(fd, F_GETFL); + if (flags != -1) + { + // set the flags for non blocking, async operation + flags |= O_NONBLOCK|O_ASYNC; + ::fcntl(fd,F_SETFL, flags); + } + else if (errno == EBADF) + { + // well, we seemed to be fed with an invalid descriptor...: + m_errno = errno; + fd= -1; + } + } + if (fd >= 0) // if still valid: + { + // set the close-on-exec flag + ::fcntl(fd,F_SETFD, FD_CLOEXEC); + } + m_write_fd = fd; + m_marked_for_writing= false; + m_not_writable= false; +} // eo IOImplementation::setWriteFd(int) + + + +/** + * called by the backend when this object can write data. + * + * If some data was sended, the signal @a m_signal_write is called. + * + * @internal tries to write all buffered data to output; if this succeeds, + * the connection is assumed to be still able to accept more data. + * (i.e. the internal write mark is kept!) + * + * @note overload this method only when You know what You are doing! +*/ +void IOImplementation::doWrite() +{ + m_errno = 0; + if ( m_write_fd<0 || !m_marked_for_writing || m_output_buffer.empty()) + { + return; + } + + ODOUT("doWrite, d=\"" << m_output_buffer << "\""); + + //reset mark: + m_marked_for_writing= false; + + // now write the data + ssize_t count= ::write( m_write_fd, m_output_buffer.data(), m_output_buffer.size()); + + ODOUT("::write -> " << count); + + if (count < 0) // error + { + m_errno= errno; + int fd= m_write_fd; + + switch(m_errno) + { + case EPIPE: + m_not_writable= true; + // emit a signal + m_signal_not_writable(); + // fall through + case EINVAL: + case EBADF: + case ECONNRESET: + case ENETRESET: + if (fd == m_write_fd) + { + close( (m_write_fd == m_read_fd) ? Direction::both : Direction::out ); + } + break; + } + } + else + { + m_output_buffer.erase(0, count); + if (m_output_buffer.empty()) + { + // special case: if we were able to send all the data, we keep the write mark: + m_marked_for_writing= true; + } + } + if (count > 0) + { + m_signal_write(); + } +} // eo IOImplementation::doWrite + + +/* + * implementation of SimpleIO + */ + + +SimpleIO::SimpleIO(int read_fd, int write_fd) +: inherited(read_fd, write_fd) +{ + m_signal_read.connect(boost::bind(&SimpleIO::slotReceived,this)); +} // eo SimpleIO::SimpleIO() + + +SimpleIO::~SimpleIO() +{ +} // eo SimpleIO::~SimpleIO() + + +/** + * sends a string. + * @param data the string. + */ +void SimpleIO::sendString(const std::string& data) +{ + lowSend(data); +} // eo SimpleIO::sendString(const std::string&) + + +/** + * emits the signal signalReceived with the received data. + * This slot is connected to IOImplementation::m_signal_read. + */ +void SimpleIO::slotReceived() +{ + std::string data; + data.swap(m_input_buffer); + signal_received_string(data); +} // eo SimpleIO::slotReceived() + + + +/* + * implementation of SimpleIO2 + */ + + +SimpleIO2::SimpleIO2(int read_fd, int write_fd) +: inherited(read_fd, write_fd) +{ + m_signal_read.connect(boost::bind(&SimpleIO2::slotReceived,this)); +} // eo SimpleIO2::SimpleIO2() + + +SimpleIO2::~SimpleIO2() +{ +} // eo SimpleIO2::~SimpleIO2() + + +/** + * sends a string. + * @param data the string. + */ +void SimpleIO2::sendString(const std::string& data) +{ + lowSend(data); +} // eo SimpleIO2::sendString(const std::string&) + + +/** + * emits the signal signalReceived with the received data. + * This slot is connected to IOImplementation::m_signal_read. + */ +void SimpleIO2::slotReceived() +{ + std::string data; + data.swap(m_input_buffer); + signal_received_string(data); +} // eo SimpleIO2::slotReceived() + + + +/* + * implementation of class Backend (singleton) + */ + +Backend* Backend::g_backend= NULL; + +int Backend::m_count_active_steps=0; + + +Backend::Backend() +: m_count_active_loops(0) +, m_count_stop_requests(0) +{ + SystemTools::ignore_signal( SystemTools::Signal::PIPE ); +} // eo Backend::Backend + + +Backend::~Backend() +{ + SystemTools::restore_signal_handler( SystemTools::Signal::PIPE ); +} // eo Backend::~Backend() + +/** + * delivers pointer to the current backend, instantiating a new backend if there was no current one. + * + * This should be the only way to access the backend which should be a singleton. + * + * @return the pointer to the current backend. + */ +Backend* Backend::getBackend() +{ + if (!g_backend) + { + g_backend = new Backend(); + } + return g_backend; +} // eo Backend::getBackend + + + + +/** + * performs one backend cycle. + * + * Collects all file descriptors from the active io objects which should be selected for reading and/or writing. + * Also determines the timer events which become due and adjusts the timeout. + * Constructs the necessary structures and calls poll(). + * Finally interprets the results from poll() (i.e. performs the reading/writing/timer events) + * + * @param timeout maximal wait value in milliseconds; negative value waits until at least one event occured. + * @return @a true if there was at least one active object; otherwise @a false + * + * @note this method is a little beast. + * + * @internal + * The cycle is divided into four steps: collecting; poll; mark and execute. + * The "mark" step is necessary to avoid some bad side effects when method calls in the execution stage + * are calling @a Backup::doOneStep or open their own local backend loop. + * + * @todo handle some more error cases. + * @todo provide a plugin interface for external handler. + * (currently inclusion of external handler is possible by (ab)using timer classes) + */ +bool Backend::doOneStep(int timeout) +{ + ODOUT( "timeout=" << timeout ); + internal_io::PollDataCluster poll_data; + bool had_active_object = false; + + ++m_count_active_steps; + + try { + + FdSetType local_read_fds; + FdSetType local_write_fds; + + // step 1 ; collect + + { // step 1.1: collect fds for read/write operations + for(internal_io::IOList::iterator itIOList = internal_io::g_io_list().begin(); + itIOList != internal_io::g_io_list().end(); + ++itIOList) + { + if (! *itIOList) continue; // skip NULL entries + int read_fd = (*itIOList)->m_read_fd; + int write_fd = (*itIOList)->m_write_fd; + bool want_read = (read_fd >= 0) and (*itIOList)->wantRead(); + bool want_write = (write_fd >= 0) and (*itIOList)->wantWrite(); + if (read_fd >= 0 ) + { + local_read_fds.insert( read_fd ); + } + if (write_fd >= 0) + { + local_write_fds.insert( write_fd ); + } + if (!want_read && !want_write) continue; + if (want_read) + { + FODOUT( (*itIOList), "wants to read (fd=" << read_fd << ")"); + poll_data.add_read_fd(read_fd, *itIOList); + } + if (want_write) + { + FODOUT( (*itIOList), "wants to write (fd=" << write_fd << ")"); + poll_data.add_write_fd(write_fd, *itIOList); + } + had_active_object= true; + } + } + + { // step 1.2: collect timer events + MilliTime current_time; + MilliTime min_event_time; + + get_current_monotonic_time(current_time); + bool min_event_time_set; + + if (timeout >= 0) + { + min_event_time = current_time + MilliTime(0,timeout); + min_event_time_set= true; + } + else + { + min_event_time = current_time + MilliTime(86400,0); + min_event_time_set= false; + } + // TODO + + for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin(); + it_timer != internal_io::g_timer_list().end() + && (!had_active_object || !min_event_time_set || current_time < min_event_time); + ++ it_timer) + { + if (! *it_timer) continue; // skip NULL entries + if (! (*it_timer)->m_active) continue; // skip if not enabled + if ( !min_event_time_set || (*it_timer)->m_when < min_event_time) + { + min_event_time = (*it_timer)->m_when; + min_event_time_set= true; + } + had_active_object= true; + } + + if (min_event_time_set) + { // we have at a minimal event time, so (re)compute the timeout value: + MilliTime delta= (min_event_time - current_time); + long long delta_ms = std::min( delta.get_milliseconds(), 21600000LL); // max 6h + if (delta_ms <= 0L) + { + timeout= 0L; + } + else + { + timeout= delta_ms + (delta_ms<5 ? 1 : 3); + } + } + } + + // step 2 : poll + ODOUT(" poll timeout is " << timeout); + { + MilliTime current_time; + get_current_monotonic_time(current_time); + ODOUT(" current time is sec="< " << poll_result); + { + MilliTime current_time; + get_current_monotonic_time(current_time); + ODOUT(" current time is sec="<m_marked_for_reading= true; + } + } + if ( 0!= (itPollItem->revents & POLLOUT)) + { + IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ]; + if (io && io->m_write_fd==itPollItem->fd) + { + io->m_marked_for_writing= true; + } + } + if ( 0!= (itPollItem->revents & POLLERR)) + { + IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ]; + if (0!= (itPollItem->events & POLLOUT)) + { + if (io && io->m_write_fd==itPollItem->fd) + { + io->m_marked_for_writing= false; + //io->close( Direction::out ); + } + } + } + // TODO error handling (POLLERR, POLLHUP, POLLNVAL) + } + } + + //Step 3.2: mark timer objects + { + MilliTime current_time; + + get_current_monotonic_time(current_time); + ODOUT(" current time is sec="<m_active) continue; // skip if not enabled + if ( (*it_timer)->m_when <= current_time) + { + ODOUT(" ==> MARK"); + (*it_timer)->m_marked = true; + } + } + } + + + // step 4 : execute + + // step 4.1: execute io + ODOUT("execute stage"); + for(internal_io::IOList::iterator it_io = internal_io::g_io_list().begin(); + it_io != internal_io::g_io_list().end(); + ++ it_io) + { + ODOUT(" check obj " << *it_io); + if (NULL == *it_io) continue; + if ((*it_io)->m_marked_for_writing) + { + FODOUT((*it_io),"exec doWrite"); + (*it_io)->doWrite(); + if ((*it_io) == NULL) continue; // skip remaining if we lost the object + if ((*it_io)->m_errno) + { + continue; + } + } + if ((*it_io)->m_marked_for_reading) + { + FODOUT((*it_io),"exec doRead"); + (*it_io)->doRead(); + if ((*it_io) == NULL) continue; // skip remaining if we lost the object + if ((*it_io)->m_errno) + { + continue; + } + } + } + + // step 4.2: execute timer events + { + for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin(); + it_timer != internal_io::g_timer_list().end(); + ++ it_timer) + { + if (! *it_timer) continue; // skip NULL entries + if (! (*it_timer)->m_active) continue; // skip if not enabled + if (! (*it_timer)->m_marked) continue; // skip if not marked + + // reset the mark and deactivate object now since the execute() method might activate it again + (*it_timer)->m_marked= false; + (*it_timer)->m_active= false; + + // now execute the event: + (*it_timer)->execute(); + } + } + + } // eo try + catch(...) + { + // clean up our counter + --m_count_active_steps; + // and forward the exception + throw; + } + + if ( 0 == --m_count_active_steps) + { + internal_io::g_io_list().clean_list(); + internal_io::g_timer_list().clean_list(); + } + + return had_active_object; +} // eo Backend::doOneStep + + +/** + * enters a backend loop. + * + * Calls @a Backend::doOneStep within a loop until @a Backend::stop was called or there are no more + * active objects (io objects or timer objects). + */ +void Backend::run() +{ + ++m_count_active_loops; + do + { + try + { + if (!doOneStep(c_max_poll_wait)) + { + // stop if there are no more active objects. + stop(); + } + } + catch(...) + { + // clean up our counter + --m_count_active_loops; + // and forward the exception + throw; + } + } + while (0 == m_count_stop_requests); + --m_count_active_loops; + --m_count_stop_requests; +} // eo Backend::run + + +/** + * @brief stops the latest loop currently run by Backend::run(). + * @see Backend::run() + */ +void Backend::stop() +{ + if (m_count_active_loops) + { + ++m_count_stop_requests; + } +} // eo Backend::stop() + + + +} // eo namespace SimpleIo +} // eo namespace I2n diff --git a/asnycio/simpleio.hpp b/asnycio/simpleio.hpp new file mode 100644 index 0000000..d3727be --- /dev/null +++ b/asnycio/simpleio.hpp @@ -0,0 +1,463 @@ +/** + * @file + * @brief simple basic IO handling. + * + * @copyright © Copyright 2007-2008 by Intra2net AG + * @license commercial + * @contact info@intra2net.com + * + * Deals with POSIX file descriptors; provides an additional abstraction + * level above select() or poll() interface. + * Also provides basic functionality for dealing with timer events. + */ + +#ifndef __I2N_SIMPLEIO_HPP__ +#define __I2N_SIMPLEIO_HPP__ + +#include +#include +#include + +#include + +#include +#include + + +namespace I2n +{ +namespace SimpleIo +{ + + +/* + * forward declarations + */ +class Backend; +class IOImplementation; + +/* + * end of forward declarations + */ + + +/** + * direction for io operations. + */ +struct Direction +{ + enum _Direction + { + unspecified= 0, + in = 1, + out = 2, + both= 3 + } m_direction; + + Direction( _Direction direction = unspecified) : m_direction(direction) {} + + operator _Direction () const { return m_direction; } +}; // eo struct IODirection; + + +/** + * structure for storing (a point in time as) seconds and milliseconds. + */ +struct MilliTime +{ + long mt_sec; + long mt_msec; + + MilliTime(long sec=0, long msec=0); + + void set(long sec, long msec=0); + + inline long long get_milliseconds() const + { + return ((long long)mt_sec * 1000L + mt_msec); + } // eo get_milliseconds + + void normalize(); + + bool operator < (MilliTime& other); + bool operator == (MilliTime& other); + + MilliTime& operator -= (const MilliTime& lhs); + MilliTime& operator += (const MilliTime& lhs); + +}; // eo struct MilliTime + + +inline MilliTime operator + (const MilliTime& rhs, const MilliTime& lhs) +{ + MilliTime t(rhs); + return t+= lhs; +} // eo operator + (const MilliTime& rhs, const MilliTime lhs) + +inline MilliTime operator - (const MilliTime& rhs, const MilliTime& lhs) +{ + MilliTime t(rhs); + return t-= lhs; +} // eo operator - (const MilliTime& rhs, const MilliTime lhs) + + +inline bool operator <= (MilliTime& rhs, MilliTime& lhs) +{ + return (rhs PtrType; + public: + FilterBase(); + virtual ~FilterBase() {}; + + protected: + + virtual std::string filterIncomingData(const std::string& data)= 0; + virtual std::string filterOutgoingData(const std::string& data)= 0; + + virtual void endOfIncomingData(); + virtual void reset(); + + protected: + + void injectIncomingData(const std::string& data); + void injectOutgoingData(const std::string& data); + + private: + /// pointer to the io object which uses this filter: + IOImplementation *m_io; +}; // eo class FilterBase + +typedef FilterBase::PtrType FilterBasePtr; + + +/** + * identity filter; does nothing with the data (in other words: it's useless ;-) ). + */ +class FilterIdentity : public FilterBase +{ + protected: + virtual std::string filterIncomingData(const std::string& data) { return data; } + virtual std::string filterOutgoingData(const std::string& data) { return data; } +}; // eo class FilterIdentity + + +/** + * @brief null filter; deletes everything it receives. + * + * usefull when output from a subprocess (like stderr) should be ignored... + */ +class FilterNull : public FilterBase +{ + protected: + virtual std::string filterIncomingData(const std::string& data) { return std::string(); } + virtual std::string filterOutgoingData(const std::string& data) { return std::string(); } +}; // eo FilterNull + + +/** + * the base class of the IO classes. + * + * provides the functionality to read from a file desriptor and write to a file descriptor (which can be + * identical (like for socket io), but also can be different (like pipes from/to a process)). + * The data stream can be filtered through plugged filter objects which are building a filter chain. + * Incoming data is filtered forward through the chain; outgoing data is filtered backward through the chain. + * (So the first filter is the one which modifies the data closest to the connections). + * + * @note the functionality is provided in conjunction with the @a Backend class which handles parts of + * the low level IO. + * + * @note this is a base class; it provides most "interesting" functionality in the protected section only. + * This way, derived classes can decide which of that functionality they want to export in their own public + * interfaces and which to keep hidden. + */ +class IOImplementation +: public boost::signals::trackable +, virtual public SharedBase +{ + friend class Backend; + friend class FilterBase; + + public: + + typedef std::list< FilterBasePtr > FilterChain; + + typedef boost::signal< void() > SignalType; + + typedef boost::shared_ptr< IOImplementation > PtrType; + + public: + IOImplementation(int read_fd=-1, int write_fd=-1); + virtual ~IOImplementation(); + + virtual void close(Direction direction = Direction::both); + + virtual bool wantRead(); + virtual bool wantWrite(); + + virtual bool opened() const; + virtual bool eof() const; + virtual bool writable() const; + virtual bool empty() const; + + protected: + + void addFilter(FilterBasePtr filter); + void removeFilter(FilterBasePtr); + + + void lowSend(const std::string& data); + + std::string::size_type getOutputBufferSize() const { return m_output_buffer.size(); } + + void setWriteFd(int fd); + void setReadFd(int fd); + + inline int readFd() const + { + return m_read_fd; + } + + inline int writeFd() const + { + return m_write_fd; + } + + inline bool isMarkedForReading() const { return m_marked_for_reading; } + inline bool isMarkedForWriting() const { return m_marked_for_writing; } + + + protected: + virtual void doRead(); + virtual void doWrite(); + + void resetReadMark() { m_marked_for_reading= false; } + void resetWriteMark() { m_marked_for_writing= false; } + + void injectIncomingData(FilterBasePtr from_filter, const std::string& _data); + void injectOutgoingData(FilterBasePtr from_filter, const std::string& _data); + + protected: + /// last error number + int m_errno; + /// the input buffer (i.e. the data read from @a m_read_fd) + std::string m_input_buffer; + + /// the chain of filter which are applied to the data received are the data which should be send. + FilterChain m_filter_chain; + + /// signal which is fired when end of file is detected + SignalType m_signal_eof; + /// signal which is fired when write is no longer possible + SignalType m_signal_not_writable; + /// signal which is fired when new data was read + SignalType m_signal_read; + /// signal which is fired when data was written + SignalType m_signal_write; + + /// end of file (on @a m_read_fd) detected (used additionally when m_read_fd is valid) + bool m_eof; + + /// unable-to-write (on @a m_write_fd) detected (used additionally when m_write_fd is valid) + bool m_not_writable; + + private: + /// the file descriptor to read from (-1 if none is given) + int m_read_fd; + /// the file descriptor to write to (-1 if none is given) + int m_write_fd; + /// output buffer; contains the data which needs to be written. + std::string m_output_buffer; + + private: + /// @a true when data is available to be read + bool m_marked_for_reading; + /// @a true when data can be written + bool m_marked_for_writing; + +}; // eo class IOImplementation + + + +/** + * same as IOImplementation, but makes fd access functions public. + */ +class IOImplementation2 : public IOImplementation +{ + typedef IOImplementation inherited; + public: + IOImplementation2(int read_fd=-1, int write_fd=-1) : inherited(read_fd, write_fd) {} + + void setWriteFd(int fd) { inherited::setWriteFd(fd); } + void setReadFd(int fd) { inherited::setReadFd(fd); } + + int readFd() const { return inherited::readFd(); } + int writeFd() const { return inherited::writeFd(); } + +}; // eo class IOImplementation2 + + +/** + * provides sending data and receiving data via a signal. + * + * @note the received data is passed as parameter to the signal and no longer stored in the received buffer. + */ +class SimpleIO : public IOImplementation +{ + typedef IOImplementation inherited; + public: + SimpleIO(int read_fd=-1, int write_fd=-1); + virtual ~SimpleIO(); + + void sendString(const std::string& data); + + boost::signal signal_received_string; + + private: + + void slotReceived(); +}; // eo class SimpleIO + + +/** + * provides sending data and receiving data via a signal. + * + * @note the received data is passed as parameter to the signal and no longer stored in the received buffer. + */ +class SimpleIO2 : public IOImplementation2 +{ + typedef IOImplementation2 inherited; + public: + SimpleIO2(int read_fd=-1, int write_fd=-1); + virtual ~SimpleIO2(); + + void sendString(const std::string& data); + + boost::signal signal_received_string; + + private: + + void slotReceived(); +}; // eo class SimpleIO2 + + +/** + * provides the backend for io handling. + * + * This (singleton) object provides the management of io events. It collects all wishes for reading/writing + * from the io objects and information from the timer events. + * It poll()s for the events and distributes them to the objects, + * + * The class provides the possibility for executing one io cycle (/step) or to run a backend loop. + * + * @note the Backend class needs to be a friend of IOImplementation since it accesses private members + * of IOImplementation while performing the io cycles. + */ +class Backend +{ + public: + typedef std::set< int > FdSetType; + + public: + + bool doOneStep(int ms_timeout= -1); + void run(); + void stop(); + + protected: + Backend(); + Backend(const Backend& other); + ~Backend(); + + protected: + + /// the number of currently active backend loops + int m_count_active_loops; + /// the number of pending stop requests (where each one should exit one active backend loop) + int m_count_stop_requests; + + /// the number of currently active backend cycles(/ steps) + static int m_count_active_steps; + + public: + static Backend* getBackend(); + + protected: + /// pointer to the active backend (which is delivered by Backend::getBackend) + static Backend* g_backend; +}; // eo class Backend + + + +/* +** tool functions: +*/ + + +void get_current_real_time(MilliTime& mt); +void get_current_monotonic_time(MilliTime& mt); + + +} // eo namespace SimpleIo +} // eo namespace I2n + +#endif diff --git a/asnycio/simplepipe.cpp b/asnycio/simplepipe.cpp new file mode 100644 index 0000000..4eba4bc --- /dev/null +++ b/asnycio/simplepipe.cpp @@ -0,0 +1,98 @@ +/** @file + * + * (c) Copyright 2007 by Intra2net AG + * + * info@intra2net.com + */ + +#include "simplepipe.hpp" + +#include +#include + +#include +#include +#include +#include + + +namespace I2n +{ +namespace SimpleIo +{ + + +/* + * Implementation of SimplePipe + */ + +SimplePipe::SimplePipe() +{ + m_signal_read.connect(boost::bind(&SimplePipe::slotReceived,this)); +} // eo SimplePipe::SimplePipe() + + +SimplePipe::~SimplePipe() +{ +} // eo SimplePipe::~SimplePipe() + + +/** + * makes a pipe. + * This method connects itself with a newly created peer object with a bidirectional pipe. + * @return the peer pipe object. + */ +bool SimplePipe::makePipe(SimplePipe& peer) +{ + close(); // just in case... + + int fds[2]; + + int res= ::socketpair( AF_UNIX, SOCK_STREAM, 0, fds); + + if (res) + { + m_errno= errno; + return false; + } + else + { + m_errno= 0; + } + + peer.close(); // just in case + + setWriteFd(fds[0]); + setReadFd(fds[0]); + + peer.setWriteFd(fds[1]); + peer.setReadFd(fds[1]); + + return true; +} // eo SimplePipe.:makePipe() + + +/** + * sends a string through the pipe. + * @param data the data which should be sent to the other side. + */ +void SimplePipe::sendString(const std::string& data) +{ + lowSend(data); +} // eo SimplePipe::sendString(const std::string&) + + +/** + * emits the signal signalReceived with the received data. + * This slot is connected to IOImplementation::m_signal_read. + */ +void SimplePipe::slotReceived() +{ + std::string data; + data.swap(m_input_buffer); + signal_received_string(data); +} // eo SimplePipe::slotReceived() + + +} // eo namespace SimpleIo +} // eo namespace I2n diff --git a/asnycio/simplepipe.hpp b/asnycio/simplepipe.hpp new file mode 100644 index 0000000..80b7cc6 --- /dev/null +++ b/asnycio/simplepipe.hpp @@ -0,0 +1,44 @@ +/** @file + * + * (c) Copyright 2007 by Intra2net AG + * + * info@intra2net.com + */ + +#ifndef _SIMPLEIO_SIMPLEPIPE_HPP_ +#define _SIMPLEIO_SIMPLEPIPE_HPP_ + +#include "simpleio.hpp" + +namespace I2n +{ +namespace SimpleIo +{ + + +class SimplePipe : public IOImplementation +{ + public: + SimplePipe(); + virtual ~SimplePipe(); + + bool makePipe(SimplePipe& peer); + + void sendString(const std::string& data); + + boost::signal signal_received_string; + + protected: + + private: + + void slotReceived(); + +}; // eo SimplePipe + + +} // eo namespace SimpleIo +} // eo namespace I2n + + +#endif diff --git a/asnycio/simpleprocess.cpp b/asnycio/simpleprocess.cpp new file mode 100644 index 0000000..8e3754d --- /dev/null +++ b/asnycio/simpleprocess.cpp @@ -0,0 +1,817 @@ +/** @file + * + * + * (c) Copyright 2007-2008 by Intra2net AG + * + * info@intra2net.com + */ + +//#define NOISEDEBUG + +#include "simpleprocess.hpp" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + + +#ifdef NOISEDEBUG +#include +#include +#define DOUT(msg) std::cout << msg << std::endl +#define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl +#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl +#else +#define DOUT(msg) do {} while (0) +#define FODOUT(obj,msg) do {} while (0) +#define ODOUT(msg) do {} while (0) +#endif + + +namespace +{ + +using namespace I2n::SimpleIo; + +/** + * local configuration values + */ +namespace config +{ + + /// the capacity of the child status list (/ vector) + const unsigned int pid_pool_capacity= 512; + +} // eo namespace config + + + +/// the previous handler for the child signal (SIGCHLD) +void (*oldChildHandler)(int) = NULL; + +/// method pointer for activating process manager +void (ProcessManager::*_activate_manager)(); + +PidStateList pending_pid_states; + + +/** + * signal handler for child signal (SIGCHLD) + * @param sig the signal number as provided by the OS + */ +void handleSigChild(int sig) +{ + int status; + pid_t pid; + while ( (pid = waitpid(-1,&status,WNOHANG)) > 0) + { + pending_pid_states.push_back( PidStatePair(pid,status) ); + } + if (_activate_manager) + { + // tricky way to access a protected method without being a (official) friend: + ( ProcessManager::getInstance()->*_activate_manager)(); + } + //TODO: ? + signal(sig,handleSigChild); +} // eo handleSigChild + + +namespace process +{ + +typedef std::pair PidProcPair; +typedef std::list< PidProcPair > PidProcList; + + +template< typename F, typename S > +struct CmpFirst +{ + F _f; + CmpFirst ( F f ) : _f(f) {} + bool operator () ( const std::pair& v ) const { return v.first == _f; } +}; // eo struct CmpFirst + + +std::list g_process_list; +PidProcList g_pid_list; + + +void addProcessInstance( ProcessImplementation* obj ) +{ + g_process_list.push_back(obj); +} // eo addProcessInstance(ProcessImplementation*) + + +void removeProcessInstance( ProcessImplementation* obj ) +{ + // remove obj from list + g_process_list.remove(obj); + // clear pointers in pid list + for(PidProcList::iterator it= g_pid_list.begin(); + it != g_pid_list.end(); + ++it) + { + if (it->second == obj) + { + it->second= NULL; + } + } +} // eo removeProcessInstance(ProcessImplementation*) + + +void addChildProcess( pid_t pid, ProcessImplementation* obj) +{ + g_pid_list.push_back ( PidProcPair(pid,obj) ); +} // eo addChildProcess(pid_t,ProcessImplementation*) + + +void removeChildProcess ( pid_t pid, ProcessImplementation* obj) +{ + PidProcList::iterator it= std::find( + g_pid_list.begin(), g_pid_list.end(), + PidProcPair(pid,obj)); + if (it != g_pid_list.end()) + { + g_pid_list.erase(it); + } +} // eo removeChildProcess(pid_t,ProcessImplementation*) + + +bool findChildProcess ( pid_t pid, ProcessImplementation* & obj ) +{ + PidProcList::iterator it = std::find_if( + g_pid_list.begin(), g_pid_list.end(), + CmpFirst(pid) ); + if (it == g_pid_list.end()) + { + return false; + } + obj = it->second; + return true; +} // eo findChildProcess(pid_t,ProcessImplementation*&) + + +} // eo namespace process + + + + + +/* +** misc tools +*/ + + +/** + * convenience tool for closing file descriptors... + */ +struct FdCloser +{ + int m_fd; + + FdCloser(int fd=-1) : m_fd(fd) {} + + ~FdCloser() + { + if (m_fd >= 0) ::close(m_fd); + } + + void release() { m_fd= -1; } + +}; // eo struct FdCloser + + + +} // eo namespace + + +namespace I2n +{ +namespace SimpleIo +{ + + +/* + * global functions + */ + +/** + * installs the handler for the child signal (SIGCHLD). + * Installing this handler is mandatory for the process subsystem to work correctly. + * @return @a true iff the child handler is successfully installed. + */ +bool installChildHandler() +{ + if (oldChildHandler) + { + // already installed + return true; + } + if (! ProcessManager::getInstance() ) + { + // we need an instance of the process manager + return false; + } + pending_pid_states.reserve( config::pid_pool_capacity ); + oldChildHandler = signal( Signal::CHLD, handleSigChild ); + if (oldChildHandler == SIG_ERR) + { + oldChildHandler= NULL; + return false; + } + return true; +} // eo installChildHandler + + +/** + * uninstalls the child handler. + * @return @a true iff the old child handler is reestablished. + */ +bool restoreChildHandler() +{ + if (!oldChildHandler) + { + return false; + } + void(*res)(int) = signal( Signal::CHLD, oldChildHandler); + + if (res == SIG_ERR) + { + return false; + } + oldChildHandler= NULL; + return true; +} // eo restoreChildHandler + + + + +/* + * Implementation of ProcessImplementation + */ + +IOImplementation2* ProcessImplementation::_StderrOnStdout = ((IOImplementation2*) 1); +IOImplementation2* ProcessImplementation::_UseParentsStderr = ((IOImplementation2*) 0); + + +/** + * @brief constructor for the process implementation. + * + * the constructor takes the path to the executable and (initial) cli arguments. + * + * @param path path to the executable. + * @param args initial command line arguments. + */ +ProcessImplementation::ProcessImplementation( + const std::string& path, + const std::vector& args + ) +: IOImplementation(-1,-1) +, m_path(path) +, m_nice_inc(0) +, m_create_new_session(false) +, m_pid(0) +, m_state(ProcessState::stopped) +, m_exit_code(0) +{ + m_args.push_back(path); + std::copy( args.begin(), args.end(), std::back_inserter(m_args) ); + process::addProcessInstance(this); +} // eo ProcessImplementation::ProcessImplementation(const std::string&) + + +ProcessImplementation::~ProcessImplementation() +{ + if (m_pid > 0 && m_state!=ProcessState::stopped) + { + stopProcess(true); + } + process::removeProcessInstance(this); +} // eo ProcessImplementation::~ProcessImplementation() + + +void ProcessImplementation::close(Direction direction) +{ + inherited::close(direction); + if (!inherited::opened() && (m_state != ProcessState::stopped) ) + { + stopProcess(false); + } +} // eo ProcessImplementation::close(Direction) + + +/** + * returns an object for adding new arguments to the argument list. + * @return the adder object. + */ +PushBackFiller ProcessImplementation::getArgAdder() +{ + return PushBackFiller(m_args); +} // eo ProcessImplementation::getArgAdder() + + +/** + * @brief set if the process should create a new session when started. + * @param enable determine if the process should start a new session. + * @return @a true iff the value of enable was accepted. + * + * If the process is already running, a new value is not accepted. + */ +bool ProcessImplementation::setCreateNewSession( bool enable ) +{ + if (m_state != ProcessState::stopped and enable != m_create_new_session) + { + return false; + } + m_create_new_session= enable; + return true; +} // eo ProcessImplementation::setCreateNewSession(bool); + + +/** + * @brief sets a new nice increment. + * @param nice the desired nice increment. + * @return @a true if the value was accepted and - in case the process was already started - + * the nice value was successfully changed. + */ +bool ProcessImplementation::setNice(int nice) +{ + errno= 0; + if (m_state != ProcessState::stopped) + { + int delta= m_nice_inc + nice; + m_nice_inc= nice; + int res= ::nice(delta); + if (res == -1 and errno !=0 ) + { + return false; + } + } + else + { + m_nice_inc = nice; + } + return true; +} // eo ProcessImplementation::setNice(int) + + +/** + * @brief sets the work dir the process should be started with. + * @param workdir the workdir + * @return @a true if the new workdir was accepted. + * + * The method will return @a false if the process is already started. + * The workdir can only be set before the process is started. + */ +bool ProcessImplementation::setWorkDir(const std::string& workdir) +{ + if ( m_state != ProcessState::stopped and workdir != m_workdir) + { + return false; + } + if (not workdir.empty()) + { + I2n::Stat stat(workdir); + if (not stat or not stat.is_directory()) + { + return false; + } + } + m_workdir= workdir; + return true; +} // eo ProcessImplementation::setWorkDir(const std::string&) + + +/** + * @brief sets new arguments for the process (the path to the binary is kept). + * + * @param args the new cli arguments for the subprocess (replacing the old ones). + */ +void ProcessImplementation::resetArgs( const std::vector< std::string >& args ) +{ + if (m_args.size() > 1) + { + m_args.erase( ++m_args.begin(), m_args.end()); + } + std::copy( args.begin(), args.end(), std::back_inserter(m_args) ); +} // eo ProcessImplementation::resetArgs(const std::vectors< std::string >&) + + +/** + * starts the new process. + * provides pipes for sending data to/ receiving data from the new process. + * Basically forks and execs the new process. + * + * @param stderr if not NULL the given object will be connected to stderr of the new process. + * The object can then be used for reading the data from the process' stderr; but cannot be written to. + * (The object will be closed if it was open). + * If the constant @a ProcessImplementation::StderrOnStdout is passed then stderr of the new process will + * be written to the same channel as stdout (i.e. can be read from the process class instance like the + * normal output). + * If NULL then the stderr channel from the parent process will also be used by the child. + * @return @a true iff the new subprocess started. + */ +bool ProcessImplementation::startProcess( IOImplementation2 *stderr ) +{ + bool stderr2stdout= false; + m_errno = 0; + m_input_buffer.clear(); + if (m_pid > 0 && m_state != ProcessState::stopped) + { + // process still/already running... + return false; + } + m_exit_code= 0; + + if (stderr == _StderrOnStdout) + { + stderr2stdout= true; + stderr= NULL; + } + + int to_process_pipe[2]; + int from_process_pipe[2]; + int from_process_stderr_pipe[2]= { -1, -1 }; + + if ( ::pipe(to_process_pipe) ) + { + m_errno= errno; + return false; + } + FdCloser closeTo0( to_process_pipe[0] ); + FdCloser closeTo1( to_process_pipe[1] ); + if ( ::pipe (from_process_pipe) ) + { + m_errno= errno; + return false; + } + FdCloser closeFrom0( from_process_pipe[0] ); + FdCloser closeFrom1( from_process_pipe[1] ); + if (stderr) + { + if (stderr->opened()) stderr->close(); + if ( ::pipe (from_process_stderr_pipe) ) + { + m_errno= errno; + return false; + } + } + FdCloser closeFromErr0( from_process_stderr_pipe[0] ); + FdCloser closeFromErr1( from_process_stderr_pipe[1] ); + + m_pid = ::fork(); + + if ( m_pid == (pid_t)-1 ) + { + m_errno= errno; + m_pid= 0; + // error; something went wrong + return false; + } + else if (m_pid > 0) + { + // we are in the parent part + + // keep the fd's we need and (later) close the other ones: + closeTo1.release(); // don't close this fd! + setWriteFd(to_process_pipe[1]); + closeFrom0.release(); // don't close this fd! + setReadFd(from_process_pipe[0]); + + if (stderr) + { + closeFromErr0.release(); // don't close this fd! + stderr->setReadFd(from_process_stderr_pipe[0]); + } + + m_state= ProcessState::running; + process::addChildProcess(m_pid,this); + DOUT(" started child with pid " << m_pid); + return true; + } + else // pid > 0 + { + // we are in the child part + + // dup the fd's for stdin/-out/-err into place: + ::dup2(to_process_pipe[0],0); + ::dup2(from_process_pipe[1],1); + if (stderr) + { + ::dup2(from_process_stderr_pipe[1],2); + ::close(from_process_stderr_pipe[0]); ::close(from_process_stderr_pipe[1]); + } + else if (stderr2stdout) + { + ::dup2(from_process_pipe[1],2); + } + // close what we don't need: + ::close(to_process_pipe[0]); ::close(to_process_pipe[1]); + ::close(from_process_pipe[0]); ::close(from_process_pipe[1]); + + // set workdir if requested: + if (not m_workdir.empty()) + { + int r= ::chdir( m_workdir.c_str() ); + if (r !=0 ) + { + //TODO? + exit(255); + } + } + + // + // collect args: + char **argv= new char*[m_args.size()+1]; + int i=0; + for(std::vector::iterator it= m_args.begin(); + it != m_args.end(); + ++it,++i) + { + argv[i]= strdup( it->c_str() ); + } + argv[i]= NULL; + // update nice level: + if (m_nice_inc) + { + nice(m_nice_inc); + } + // create a new session id if requested: + if (m_create_new_session) + { + setsid(); + } + // execute: + execv(m_path.c_str(), argv); + // exit if exec failed + exit(255); + //cleanup! ... just joking; we exec or we exit, in either case the system cleans + // everything which needs to be cleaned up. + } + return false; // keep the compiler happy... +} // eo ProcessImplementation::startProcess() + + +/** + * convenience method for starting the child process. + * This method uses predefined enum values for the stderr handling mode. + * + * @param stderr_mode the desired stderr mode. + * @return @a true iff the child process was created. + */ +bool ProcessImplementation::startProcess( ProcessImplementation::StderrMode stderr_mode ) +{ + switch (stderr_mode) + { + case UseParentsStderr: + return startProcess( _UseParentsStderr ); + + case StderrOnStdout: + return startProcess( _StderrOnStdout ); + } + return false; +}; // eo ProcessImplementation::startProcess(ProcessImplementation::StderrMode) + + +/** + * stops the process. + * + * @todo think about a more intelligent handling... + */ +void ProcessImplementation::stopProcess(bool force) +{ + // TODO: do it somewhat more intelligent?! + if (force) + { + kill(Signal::KILL); + //TODO: set running state? + } + else + { + kill(Signal::TERM); + } +} // eo ProcessImplementation::stop(bool) + + + +/** + * sends a signal to the child process. + * @param signal the Signal which should be send. + * @return @a true if the signal was sent; @a false if an error occured. + */ +bool ProcessImplementation::kill(Signal signal) +{ + m_errno = 0; + if (m_pid == 0 || m_pid == (pid_t)-1) + { + m_errno= ESRCH; + return false; + } + int res = ::kill(m_pid, signal); + if (res < 0) + { + m_errno= errno; + return false; + } + if (signal == Signal::CONT && m_state == ProcessState::suspended) + { + m_state = ProcessState::running; + } + return true; +} // eo ProcessImplementation::kill(Signal) + + + +/** + * set a new child state with information gobbled by the child signal handler. + * + * @note This method should only be called by the process manager! + * + * @param pid the pid of the child process. + * @param status the new status value (as delivered by waitpid()) + */ +void ProcessImplementation::setChildState(pid_t pid, int status) +{ + DOUT("setChildState("<first; + int status = it->second; + ODOUT(" pid=" << pid << ", status=" << status); + ProcessImplementation *process_obj; + if (process::findChildProcess(pid,process_obj)) + { + ODOUT(" local managed child, process_obj="<< process_obj); + // pid found in list: + if (!WIFSTOPPED(status) +#ifdef WIFCONTINUED + && !WIFCONTINUED(status) +#endif + ) + { + // take it from list if the child exited: + process::removeChildProcess(pid,process_obj); + } + if (process_obj) + { + // give the process object a chance to handle the state change: + process_obj->setChildState(pid, status); + } + } + else + { + ODOUT("foreign child"); + // pid not found in list: + /* NOTE: in a non threaded environment this pid must be from a child process which is not + managed by this process classes; since this method is called after all setup of a child process + is done (; especially entering the new child pid into our internal lists). + */ + m_foreign_pid_states.push_back(*it); + } + } + + // handle the foreign childs: + { + /* idea: + * fetch a (pid,status) from the list, erase it (to avoid reentrance problems) + * and fire the signal. If someone forks childs outside this module then he can + * connect to the signal and receive all necessary status information gobbled by + * our child handler. + */ + while (! m_foreign_pid_states.empty()) + { + PidStateList::iterator it= m_foreign_pid_states.begin(); + pid_t pid = it->first; + int status = it->second; + m_foreign_pid_states.erase(it); + m_foreign_child_state_changed_signal(pid,status); + } + } + +} // eo ProcessManager::execute + + +} // eo namespace SimpleIo +} // eo namespace I2n diff --git a/asnycio/simpleprocess.hpp b/asnycio/simpleprocess.hpp new file mode 100644 index 0000000..d42c755 --- /dev/null +++ b/asnycio/simpleprocess.hpp @@ -0,0 +1,198 @@ +/** @file + * + * simple process handling based on simple io classes. + * + * (c) Copyright 2007-2008 by Intra2net AG + * + * info@intra2net.com + */ + +#ifndef _CONND_SIMPLEPROCESS_HPP_ +#define _CONND_SIMPLEPROCESS_HPP_ + +#include +#include + +#include + +#include +#include +#include "simpleio.hpp" + + +namespace I2n +{ +namespace SimpleIo +{ + +using SystemTools::Signal; +using SystemTools::ScopedSignalBlocker; + + +class ProcessManager; + + +typedef std::pair< pid_t, int > PidStatePair; +typedef std::vector< PidStatePair > PidStateList; + + +/** + * represents process states. + */ +struct ProcessState +{ + enum _ProcessState + { + stopped = 0, + running, + suspended + }; // eo enum _ProcessState + + _ProcessState m_state; + + ProcessState(_ProcessState _state = stopped) : m_state(_state) {} + + operator _ProcessState() const { return m_state; } +}; // eo struct ProcessState + + +/** + * specialisation of the io implementation class which fork/exec's a subprocess and + * connects with the new child's stdin/stdout. + * + * @note the signal @a IOImplementation::m_signal_eof of the base class can be used to detect when the + * new child closes it's stdout (which usually means that the child ended). + */ +class ProcessImplementation : public IOImplementation +{ + typedef IOImplementation inherited; + + friend class ProcessManager; + + public: + + enum StderrMode + { + /// "magic" constant to pass to start() when childs stderr should be the same as parents stderr. + UseParentsStderr= 0, + /// "magic" constant to pass to start() when stderr should be the same as stdout for the new process. + StderrOnStdout + }; // eo enum StderrMode + + public: + ProcessImplementation( + const std::string& path, + const std::vector& args = std::vector()); + virtual ~ProcessImplementation(); + + virtual void close(Direction direction = Direction::both); + + virtual bool startProcess( IOImplementation2* stderr ); + bool startProcess( StderrMode stderr_mode = UseParentsStderr ); + + virtual void stopProcess(bool force=false); + + PushBackFiller getArgAdder(); + + bool setCreateNewSession( bool enable= true); + + bool setNice(int nice); + + bool setWorkDir(const std::string& workdir); + + void resetArgs( const std::vector< std::string >& args = std::vector< std::string >() ); + + /// returns the current process state + ProcessState processState() const { return m_state; } + + ///returns the exit code of the process (if in stopped state) + int exitCode() const { return m_exit_code; } + + + protected: + + bool kill(const Signal signal); + + void setChildState(pid_t pid, int status); + + protected: + /// the path to the binary + std::string m_path; + /// argument list (starting with argv0, usually the name of the binary) + std::vector m_args; + /// increment of the nice level when the new child is started + int m_nice_inc; + /// determines if the child should start a new session. + bool m_create_new_session; + /// determines the workdir where the child process should be started with. + std::string m_workdir; + + /// the pid of the child process + pid_t m_pid; + /// the state of the child process + ProcessState m_state; + /// the exit code of the child (-1 if not available yet) + int m_exit_code; + + /// signal which is fired when the child terminated + SignalType m_signal_terminated; + + + /// "magic" constant to pass to start() when childs stderr should be the same as parents stderr. + static IOImplementation2* _UseParentsStderr; + /// "magic" constant to pass to start() when stderr should be the same as stdout for the new process. + static IOImplementation2* _StderrOnStdout; + + private: + +}; // eo class ProcessImplementation + + +/** + * manages overall process related stuff. + * + * @note this class is implemented as a singleton. + * @note this class uses the io timer interface to be called within the backend loops when necessary. + */ +class ProcessManager : public TimerBase +{ + public: + + static ProcessManager* getInstance(); + + protected: + ProcessManager(); + ProcessManager(const ProcessManager&); + + virtual void execute(); + + void activateMe(); + + public: + + /** + * the signal which is fired when waitpid() returns a status for a child process + * which is not managed by this process subsystem. + * Another module which forks child processes can connect to this signal to receive + * the information when these child processes are terminated. + */ + boost::signal m_foreign_child_state_changed_signal; + + protected: + + static ProcessManager *the_instance; + + PidStateList m_foreign_pid_states; + + private: +}; + + +bool installChildHandler(); +bool restoreChildHandler(); + + +} // eo namespace SimpleIo +} // eo I2n + +#endif diff --git a/asnycio/simplesocket.cpp b/asnycio/simplesocket.cpp new file mode 100644 index 0000000..def93c0 --- /dev/null +++ b/asnycio/simplesocket.cpp @@ -0,0 +1,397 @@ +/** @file + * + * (c) Copyright 2008 by Intra2net AG + * + * info@intra2net.com + * + * @todo unlink unix server socket on close. + */ + +#include "simplesocket.hpp" +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace I2n +{ +namespace SimpleIo +{ + + +namespace +{ + +struct sockaddr_un dummy_un; + +union MegaAddr { + struct sockaddr m_addr; + struct sockaddr_in m_addr_in; + struct sockaddr_un m_addr_un; // NOTE (historically) too small... + // storage is large enough to hold all sockaddr_* variants with the (historical) exception of _un ! + struct sockaddr_storage m_addr_store; + // a char array large enough to hold _un (with an path up to the maximum allowed size!) + // (the +1 is added for a later 0-termination of the path) + char m_buffer[ sizeof(dummy_un) - sizeof(dummy_un.sun_path) + PATH_MAX + 1 ]; +}; + + +} // eo namespace + + + +/* +** implementation of ServerSocketBaseImplementation +*/ + + +ServerSocketBaseImplementation::ServerSocketBaseImplementation() +: IOImplementation() +{ +} // eo ServerSocketBaseImplementation::ServerSocketBaseImplementation() + + +/** + * @brief handled incoming connections on the server port. + * + * accepts the new connection, stores the peer address in an internal buffer + * and calls a (derived) acceptNewConnection method to create an apropriate + * IO class instance. + * If no io instance is created the connection is closed. + */ +void ServerSocketBaseImplementation::doRead() +{ + MegaAddr addr; + socklen_t addrlen = sizeof(addr); + + // reset errno: + m_errno= 0; + + // reset the mark: + resetReadMark(); + + int fd = ::accept( readFd(), &addr.m_addr, &addrlen); + if (fd < 0) + { + // handle errors. + m_errno= errno; + switch (m_errno) + { + case EBADF: + case EINVAL: + case ENOTSOCK: + // should not happen... + close(); + break; + default:; + } + return; + } + + if (addrlen < sizeof(addr)) + { + // in case of unix domain socket: terminate the path! + // NOTE we are doing this here since we don't pass the length info. + addr.m_buffer[addrlen]= 0; + } + else + { + //something went terribly wrong!! + // the resulting address structure is larger than it ever could be... + ::close(fd); + return; + } + + IOImplementationPtr connection= acceptNewConnection(fd, &addr.m_addr); + if(!connection) + { + ::close(fd); + return; + } + if (m_new_connection_base_callback) + { + m_new_connection_base_callback(connection); + } +} // eo ServerSocketBaseImplementation::doRead() + + +/** + * @brief handles write events. + * + * since server sockets never ever get a write mark, something must + * be wrong and the connection is closed! + */ +void ServerSocketBaseImplementation::doWrite() +{ + // if this method is called, something went wrong... + close(); + //TODO throw something?! +} // eo ServerSocketBaseImplementation::doWrite() + + + +/** + * @brief sets a function which is called when a new connection was established. + * + * The function gets a (shared) pointer to the new connetion as parameter and is + * expected to store it when it accepts the connection. + * (Else the conenction gets deleted after the function was called.) + * + * @param func the function which hsould be called on new conenctions. + */ +void ServerSocketBaseImplementation::setNewConnectionBaseCallback( + const NewConnectionBaseCallbackFunc& func) +{ + m_new_connection_base_callback= func; +} // eo ServerSocketBaseImplementation::setNewConnectionBaseCallback(NewConnectionBaseCallbackFunc&) + + + +/** + * @brief callback for new connections. + * + * The base method always returns an empty pointer. + * + * Derived classes must override this method and do something useful with + * the passed file descriptor. + * + * @param fd the file descriptor of the new connection. + * @param addr pointer to the address structure filled from ::accept() + * @return shared pointer to the new IO class instance (; empty if not accepted) + */ +IOImplementationPtr ServerSocketBaseImplementation::acceptNewConnection( + int fd, boost::any addr) +{ + // needs to be defined in derived class! + return IOImplementationPtr(); +} // eo ServerSocketBaseImplementation::acceptNewConnection(int,boost::any) + + + +/* +** implementation of UnixIOSocket +*/ + + +UnixIOSocket::UnixIOSocket() +{ +} // eo UnixIOSocket::UnixIOSocket() + + + +UnixIOSocket::UnixIOSocket(const std::string& path) +: m_peer_pid(0) +, m_peer_uid(0) +, m_peer_gid(0) +{ + open(path); +} // eo UnixIOSocket::UnixIOSocket(const std::string&) + + +UnixIOSocket::UnixIOSocket( + int fd, const std::string& path, + unsigned int peer_pid, unsigned int peer_uid, unsigned int peer_gid) +: IOImplementation(fd,fd) +, m_path(path) +, m_peer_pid(peer_pid) +, m_peer_uid(peer_uid) +, m_peer_gid(peer_gid) +{ +} // eo UnixIOSocket::UnixIOSocket(int,const std::string&,unsigned,unsigned,unsigned) + + +/** + * @brief opens a (client) connection to an unix domain socket. + * + * @param path the path the server is listening on. + * @return @a true iff the connection was successfully opened. + */ +bool UnixIOSocket::open(const std::string& path) +{ + if (opened()) close(); + + m_errno= 0; + m_path.clear(); + + if (path.empty() || path.size() >= PATH_MAX) + { + return false; + } + + int fd= ::socket(PF_UNIX, SOCK_STREAM, 0); + if (fd<0) + { + m_errno= errno; + return false; + } + + { + MegaAddr addr; + addr.m_addr_un.sun_family= AF_UNIX; + strncpy(addr.m_addr_un.sun_path, path.c_str(), PATH_MAX); + if (::connect(fd,(sockaddr*)&addr.m_addr_un, SUN_LEN(&addr.m_addr_un)) < 0) + { + m_errno= errno; + ::close(fd); + return false; + } + } + m_path= path; + setReadFd(fd); + setWriteFd(fd); + return true; +} // eo UnixIOSocket::open(const std::string&,int) + + +/* +** implementation of UnixServerSocketBase +*/ + + +UnixServerSocketBase::UnixServerSocketBase() +{ +} // eo UnixServerSocketBase::UnixServerSocketBase + + +UnixServerSocketBase::UnixServerSocketBase(const std::string& path, int mode) +{ + open(path,mode); +} // eo UnixServerSocketBase::UnixServerSocketBase(const std::string&,int) + + +/** + * @brief opens the server part of an unix domain socket. + * + * @param path the path the new port should listen on. + * @param mode the mode for the path. + * @return @a true iff the port was successfully opened. + */ +bool UnixServerSocketBase::open(const std::string& path, int mode) +{ + if (opened()) close(); + m_errno= 0; + if (path.empty() || path.size() >= PATH_MAX) + { + return false; + } + + int fd= ::socket(PF_UNIX, SOCK_STREAM, 0); + if (fd<0) + { + m_errno= errno; + return false; + } + + { + MegaAddr addr; + addr.m_addr_un.sun_family= AF_UNIX; + strncpy(addr.m_addr_un.sun_path, path.c_str(), PATH_MAX); + ::I2n::unlink(path); // just in case... + mode_t old_mask= ::umask( (mode & 0777) ^ 0777); + if (::bind(fd,(sockaddr*)&addr.m_addr_un, SUN_LEN(&addr.m_addr_un)) < 0) + { + m_errno= errno; + ::umask(old_mask); + ::close(fd); + return false; + } + ::umask(old_mask); + } + m_path= path; + + { + int res= ::listen(fd,8); + if (res < 0) + { + m_errno= errno; + ::close(fd); + return false; + } + } + setReadFd(fd); + return true; +} // eo UnixServerSocketBase::open(const std::string&,int) + + +/** + * @brief called from base class to create a new connection instance. + * + * This method accepts only connections to unix domain sockets. + * It also tries to determine the peer pid, uid and gid. + * + * @param fd the file descriptor of a freshly accepted connection. + * @param addr conatins "pointer to struct sockaddr" + * @return @a a (shared) pointer to the new connection isntance; empty if none was + * created. + */ +IOImplementationPtr UnixServerSocketBase::acceptNewConnection(int fd, boost::any addr) +{ + struct sockaddr *addr_ptr= NULL; + try { + addr_ptr = boost::any_cast(addr); + } + catch (boost::bad_any_cast&) + { + return IOImplementationPtr(); + } + // check for the right family: + if (addr_ptr->sa_family != AF_UNIX) + { + return IOImplementationPtr(); + } + struct sockaddr_un *un_ptr = reinterpret_cast(addr_ptr); + std::string peer_path( un_ptr->sun_path ); + unsigned peer_pid=0; + unsigned peer_gid=0; + unsigned peer_uid=0; +#ifdef __linux__ + { // the linux way to get peer info (pid,gid,uid): + struct ucred cred; + socklen_t cred_len = sizeof(cred); + if (getsockopt(fd,SOL_SOCKET,SO_PEERCRED,&cred,&cred_len) == 0) + { + peer_pid= cred.pid; + peer_uid= cred.uid; + peer_gid= cred.gid; + } + } +#else +#error dont know how to determine peer info. +#endif + + UnixIOSocketPtr ptr( createIOSocket(fd, peer_path, peer_pid, peer_uid, peer_gid) ); + return ptr; +} // eo UnixServerSocketBase::acceptNewConnection(int,boost::any); + + +/** + * @brief "real" creator of the connection instance. + * + * called by UnixServerSocketBase::acceptNewConnection to create the new io instance. + * + * @param fd file descriptor for the socket + * @param path path as delivered by peer. + * @param peer_pid peer pid. + * @param peer_uid peer uid. + * @param peer_gid peer gid. + * @return (shared) pointer to the new io instance. + */ +UnixIOSocketPtr UnixServerSocketBase::createIOSocket( + int fd, const std::string& path, + unsigned int peer_pid, + unsigned int peer_uid, unsigned int peer_gid) +{ + return UnixIOSocketPtr( + new UnixIOSocket(fd, path, peer_pid, peer_uid, peer_gid) + ); +} // eo UnixServerSocketBase::createIOSocket(int,const std::string&,unsigned,unsigned,unsigned) + + + +}// eo namespace SimpleIo +}// eo namespace I2n diff --git a/asnycio/simplesocket.hpp b/asnycio/simplesocket.hpp new file mode 100644 index 0000000..745c787 --- /dev/null +++ b/asnycio/simplesocket.hpp @@ -0,0 +1,215 @@ +/** @file + * @brief socket classes for the SimpleIo framework. + * + * + * (c) Copyright 2008 by Intra2net AG + * + * info@intra2net.com + */ + +#ifndef __SIMPLEIO__SIMPLESOCKET_HPP__ +#define __SIMPLEIO__SIMPLESOCKET_HPP__ + +#include "simpleio.hpp" + +#include +#include +#include +#include +#include +#include +#include + + +namespace I2n +{ +namespace SimpleIo +{ + + +typedef boost::shared_ptr< IOImplementation > IOImplementationPtr; + + +/** + * @brief base class for server sockets. + * + * Contains all the stuff which is common for different types of server sockets. + */ +class ServerSocketBaseImplementation +: public IOImplementation +{ + public: + typedef boost::function< void(IOImplementationPtr) > NewConnectionBaseCallbackFunc; + + public: + + void setNewConnectionBaseCallback( const NewConnectionBaseCallbackFunc& func); + + protected: + ServerSocketBaseImplementation(); + + + virtual void doRead(); + virtual void doWrite(); + + virtual IOImplementationPtr acceptNewConnection(int fd, boost::any addr); + + + protected: + + NewConnectionBaseCallbackFunc m_new_connection_base_callback; + +}; // eo class ServerSocketBaseImplementation + + +typedef boost::shared_ptr< ServerSocketBaseImplementation > ServerSocketBaseImplementationPtr; + + +/* +** unix domain sockets +*/ + + +template< + class IOClass +> +class UnixServerSocket; + + +/** + * @brief spezialized IO class for unix domain sockets. + * + */ +class UnixIOSocket +: public IOImplementation +{ + public: + UnixIOSocket(); + UnixIOSocket(const std::string& path); + + bool open(const std::string& path); + + protected: + friend class UnixServerSocketBase; + friend class UnixServerSocket; + + UnixIOSocket( + int fd, const std::string& path, + unsigned int peer_pid, unsigned int peer_uid, unsigned int peer_gid); + + protected: + + std::string m_path; + unsigned int m_peer_pid; + unsigned int m_peer_uid; + unsigned int m_peer_gid; + +}; // eo class UnixIOSocket + +typedef boost::shared_ptr< UnixIOSocket > UnixIOSocketPtr; + + + +/** + * @brief spezialized server socket class for unix domain sockets. + * + */ +class UnixServerSocketBase +: public ServerSocketBaseImplementation +{ + public: + UnixServerSocketBase(); + UnixServerSocketBase(const std::string& path, int mode=0600); + + bool open(const std::string& path, int mode= 0600); + + protected: + + virtual IOImplementationPtr acceptNewConnection(int fd, boost::any addr); + + virtual UnixIOSocketPtr createIOSocket( + int fd, const std::string& path, + unsigned int peer_pid, + unsigned int peer_uid, unsigned int peer_gid); + + protected: + + std::string m_path; + +}; // eo class UnixServerSocketBase + + +/** + * @brief unix server socket class which "produces" connections of a determined type. + * + + @param IOClass the type of the connections. + */ +template< + class IOClass +> +class UnixServerSocket +: public UnixServerSocketBase +{ + BOOST_STATIC_ASSERT(( boost::is_base_of::value )); + + public: + typedef boost::shared_ptr< IOClass > IOClassPtr; + typedef boost::function< void(IOClassPtr) > NewConnectionCallbackFunc; + + public: + + UnixServerSocket() + : UnixServerSocketBase() + {} + + UnixServerSocket(const std::string& path, int mode=0600) + : UnixServerSocketBase(path,mode) + {} + + void setNewConnectionCallback( const NewConnectionCallbackFunc& func) + { + if (func) + { + UnixServerSocketBase::setNewConnectionBaseCallback( + boost::bind( + func, + boost::bind( + &UnixServerSocket::my_ptr_cast, + _1 + ) + ) + ); + } + else + { + UnixServerSocketBase::setNewConnectionBaseCallback( + NewConnectionBaseCallbackFunc() + ); + } + } + + protected: + + virtual UnixIOSocketPtr createIOSocket( + int fd, const std::string& path, + unsigned int peer_pid, + unsigned int peer_uid, unsigned int peer_gid) + { + return UnixIOSocketPtr( + new IOClass(fd, path, peer_pid, peer_uid, peer_gid) + ); + } + + static IOClassPtr my_ptr_cast(IOImplementationPtr ptr) + { + return boost::dynamic_pointer_cast(ptr); + } + +}; // eo class UnixServerSocket + + +}// eo namespace SimpleIo +}// eo namespace I2n + + +#endif diff --git a/asnycio/simpletimer.cpp b/asnycio/simpletimer.cpp new file mode 100644 index 0000000..f716981 --- /dev/null +++ b/asnycio/simpletimer.cpp @@ -0,0 +1,107 @@ +/** @file + * + * (c) Copyright 2007 by Intra2net AG + * + * info@intra2net.com + */ + +//#define NOISEDEBUG + + +#include "simpletimer.hpp" + + +#ifdef NOISEDEBUG +#include +#include +#define DOUT(msg) std::cout << msg << std::endl +#define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl +#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl +#else +#define DOUT(msg) do {} while (0) +#define FODOUT(obj,msg) do {} while (0) +#define ODOUT(msg) do {} while (0) +#endif + + +namespace I2n +{ +namespace SimpleIo +{ + + +/* + * Implementation of SimpleTimer + */ + +SimpleTimer::SimpleTimer() +{ +} // eo SimpleTimer::SimpleTimer() + + +SimpleTimer::SimpleTimer(const MilliTime& delta, const TimerSignal::slot_function_type& action) +{ + addAction(action); + startTimer(delta); +} // eo SimpleTimer::SimpleTimer(const MilliTime&, const TimerSignal::slot_type&) + + +SimpleTimer::SimpleTimer(long milli_seonds_delta, const TimerSignal::slot_function_type& action) +{ + addAction(action); + startTimerMS(milli_seonds_delta); +} // eo SimpleTimer::SimpleTimer(long, const TimerSignal::slot_type&) + + +void SimpleTimer::execute() +{ + ODOUT("execute()!"); + m_timer_signal(); + m_timer_signal_p(this); +} // eo SimpleTimer::execute + + +void SimpleTimer::addAction( const TimerSignal::slot_function_type& action ) +{ + m_timer_signal.connect(action); +} // eo SimpleTimer::addAction(const TimerSignal::slot_type&) + + +void SimpleTimer::addActionP( const TimerSignalP::slot_function_type& action ) +{ + m_timer_signal_p.connect(action); +} // eo SimpleTimer::addAction(const TimerSignalP::slot_type&) + + +void SimpleTimer::startTimer( const MilliTime& delta ) +{ + m_delta= delta; + setDeltaWhenTime( delta ); + activate(true); +#ifdef NOISEDEBUG + MilliTime now, t; + get_current_time(now); + t= getWhenTime(); + MilliTime dt= t-now; + ODOUT("startTimer"); + ODOUT(" now: sec="<< now.mt_sec << ", msec="< + +namespace I2n +{ +namespace SimpleIo +{ + + +class SimpleTimer : public TimerBase +{ + public: + + typedef boost::signal TimerSignal; + typedef boost::signal TimerSignalP; + + public: + + SimpleTimer(); + + // convenience constructors for simple timeouts: + SimpleTimer(const MilliTime& delta, const TimerSignal::slot_function_type& action); + SimpleTimer(long milli_seonds_delta, const TimerSignal::slot_function_type& action); + + // add actions: + void addAction( const TimerSignal::slot_function_type& action ); + void addActionP( const TimerSignalP::slot_function_type& action ); + + // start the timer: + void startTimer( const MilliTime& delta ); + void startTimerMS( long milli_seconds ); + + // stop the timer: + void stopTimer(); + + protected: + MilliTime m_delta; + + TimerSignal m_timer_signal; + TimerSignalP m_timer_signal_p; + + virtual void execute(); +}; // eo class SimpleTimer + + +} // eo namespace SimpleIo +} // eo namespace I2n + +#endif -- 1.7.1