From 4e4b52fdfdf45fd203255a9fdd875aba39ade139 Mon Sep 17 00:00:00 2001 From: Reinhard Pfau Date: Mon, 20 Oct 2008 15:33:25 +0000 Subject: [PATCH] libasyncio: (reinhard) more moves. UNUSABLE YET. --- asnycio/Makefile.am | 13 - asnycio/libsimpleio.pc.in | 11 - asnycio/simplecallout.cpp | 336 --------- asnycio/simplecallout.hpp | 158 ----- asnycio/simpleio.cpp | 1695 --------------------------------------------- asnycio/simpleio.hpp | 463 ------------- asnycio/simplepipe.cpp | 98 --- asnycio/simplepipe.hpp | 44 -- asnycio/simpleprocess.cpp | 817 ---------------------- asnycio/simpleprocess.hpp | 198 ------ asnycio/simplesocket.cpp | 397 ----------- asnycio/simplesocket.hpp | 215 ------ asnycio/simpletimer.cpp | 107 --- asnycio/simpletimer.hpp | 62 -- asyncio/Makefile.am | 13 + asyncio/libsimpleio.pc.in | 11 + asyncio/simplecallout.cpp | 336 +++++++++ asyncio/simplecallout.hpp | 158 +++++ asyncio/simpleio.cpp | 1695 +++++++++++++++++++++++++++++++++++++++++++++ asyncio/simpleio.hpp | 463 +++++++++++++ asyncio/simplepipe.cpp | 98 +++ asyncio/simplepipe.hpp | 44 ++ asyncio/simpleprocess.cpp | 817 ++++++++++++++++++++++ asyncio/simpleprocess.hpp | 198 ++++++ asyncio/simplesocket.cpp | 397 +++++++++++ asyncio/simplesocket.hpp | 215 ++++++ asyncio/simpletimer.cpp | 107 +++ asyncio/simpletimer.hpp | 62 ++ 28 files changed, 4614 insertions(+), 4614 deletions(-) delete mode 100644 asnycio/Makefile.am delete mode 100644 asnycio/libsimpleio.pc.in delete mode 100644 asnycio/simplecallout.cpp delete mode 100644 asnycio/simplecallout.hpp delete mode 100644 asnycio/simpleio.cpp delete mode 100644 asnycio/simpleio.hpp delete mode 100644 asnycio/simplepipe.cpp delete mode 100644 asnycio/simplepipe.hpp delete mode 100644 asnycio/simpleprocess.cpp delete mode 100644 asnycio/simpleprocess.hpp delete mode 100644 asnycio/simplesocket.cpp delete mode 100644 asnycio/simplesocket.hpp delete mode 100644 asnycio/simpletimer.cpp delete mode 100644 asnycio/simpletimer.hpp create mode 100644 asyncio/Makefile.am create mode 100644 asyncio/libsimpleio.pc.in create mode 100644 asyncio/simplecallout.cpp create mode 100644 asyncio/simplecallout.hpp create mode 100644 asyncio/simpleio.cpp create mode 100644 asyncio/simpleio.hpp create mode 100644 asyncio/simplepipe.cpp create mode 100644 asyncio/simplepipe.hpp create mode 100644 asyncio/simpleprocess.cpp create mode 100644 asyncio/simpleprocess.hpp create mode 100644 asyncio/simplesocket.cpp create mode 100644 asyncio/simplesocket.hpp create mode 100644 asyncio/simpletimer.cpp create mode 100644 asyncio/simpletimer.hpp diff --git a/asnycio/Makefile.am b/asnycio/Makefile.am deleted file mode 100644 index 2694ef2..0000000 --- a/asnycio/Makefile.am +++ /dev/null @@ -1,13 +0,0 @@ -INCLUDES = @LIBI2NCOMMON_CFLAGS@ @BOOST_CPPFLAGS@ -METASOURCES = AUTO -lib_LTLIBRARIES = libsimpleio.la -libsimpleio_la_SOURCES = simplecallout.cpp simpleio.cpp simplepipe.cpp \ - simpleprocess.cpp simplesocket.cpp simpletimer.cpp -include_HEADERS = simplecallout.hpp simpleio.hpp simplepipe.hpp \ - simpleprocess.hpp simplesocket.hpp simpletimer.hpp -libsimpleio_la_LIBADD = @LIBI2NCOMMON_LIBS@ @BOOST_LDFLAGS@ @BOOST_SIGNALS_LIB@ - -libsimpleio_la_LDFLAGS = -version-info @LIBSIMPLEIO_LIB_VERSION@ - -pkgconfigdir=$(libdir)/pkgconfig -pkgconfig_DATA= libsimpleio.pc diff --git a/asnycio/libsimpleio.pc.in b/asnycio/libsimpleio.pc.in deleted file mode 100644 index a671d47..0000000 --- a/asnycio/libsimpleio.pc.in +++ /dev/null @@ -1,11 +0,0 @@ -prefix=@prefix@ -exec_prefix=@exec_prefix@ -libdir=@libdir@ -includedir=@includedir@ - -Name: libsimpleio -Description: asynchrounous io lib -Version: @VERSION@ -Requires: libi2ncommon -Libs: -L${libdir} -lsimpleio -Cflags: -I${includedir} diff --git a/asnycio/simplecallout.cpp b/asnycio/simplecallout.cpp deleted file mode 100644 index 13f5c63..0000000 --- a/asnycio/simplecallout.cpp +++ /dev/null @@ -1,336 +0,0 @@ -/** - * @file - * - * @copyright © Copyright 2008 by Intra2net AG - * @license commercial - * - * info@intra2net.com - */ - -#include "simplecallout.hpp" - -#include -#include - -#include - - -namespace I2n -{ -namespace SimpleIo -{ - -namespace -{ - -typedef boost::shared_ptr< Detail::Caller > CallerPtr; - -typedef std::map< unsigned long, CallerPtr > CallMap; - - -unsigned long l_last_id=0; - -CallMap l_call_map; - - -/** - * @brief creates a new id value for a call out. - * @return the new id. - * - * The id value is basically just a counter. - * It is implemented in a way that it can not be 0 and can deal with wrap arounds. - */ -unsigned long create_call_out_id_value() -{ - while ( l_call_map.find(++l_last_id) != l_call_map.end() and l_last_id != 0); - return l_last_id; -} // eo create_call_out_id_value - - -void add_call( CallerPtr caller ) -{ - if (caller->joinId()) - { - l_call_map[ caller->getCallOutId().getValue() ] = caller; - } -} // eo add_call - - -bool remove_call( unsigned long id_value ) -{ - CallMap::iterator it= l_call_map.find(id_value); - if (it != l_call_map.end()) - { - l_call_map.erase(it); - return true; - } - return false; -} // eo remove_call(unsigned long) - - -CallerPtr get_call(unsigned long id_value) -{ - CallMap::iterator it= l_call_map.find(id_value); - if (it != l_call_map.end()) - { - return it->second; - } - return CallerPtr(); -} // eo get_call(unsigned long) - - -bool has_call( unsigned long id_value ) -{ - CallMap::iterator it= l_call_map.find(id_value); - return (it != l_call_map.end() ); -} // eo has_call(unsigned long) - - - -} // eo namespace - - - -/* -** implementation of class CallOutId -*/ - -CallOutId::CallOutId() -: m_value(0) -{ -} // eo CallOutId::CallOutId() - - -CallOutId::CallOutId(unsigned long value) -: m_value(value) -{ -} // eo CallOutId::CallOutId(unsigned long) - - -bool CallOutId::thaw() const -{ - if (m_caller_weak_ptr.expired()) - { - return false; - } - CallerPtr call_ptr= get_call(m_value); - if (call_ptr) - { - return call_ptr->thaw(); - } - return false; -} // eo CallOutId::thaw() const - - -bool CallOutId::remove() -{ - if (m_caller_weak_ptr.expired()) - { - return false; - } - unsigned long value= m_value; - m_value= 0; - return remove_call(value); -} // eo CallOutId::remove() - - -bool CallOutId::active() const -{ - return m_value!=0 and not m_caller_weak_ptr.expired() and has_call(m_value); -} // eo CallOutId::active() const - - -/** - * @brief retruns if the call is frozen. - * @return @a true iff the call is frozen. - */ -bool CallOutId::frozen() const -{ - CallerPtr caller= m_caller_weak_ptr.lock(); - if (not caller) - { - return false; - } - return caller->frozen(); -} // eo CallOutId::frozen() const - - -/** - * @brief returns the remaining time until the call is done or thrown away (on frozen calls). - * @return the remaining time. - * - * The result only makes sense if the call is still active. - */ -MilliTime CallOutId::remaining_time() -{ - CallerPtr caller= m_caller_weak_ptr.lock(); - if ( not active() or not caller ) - { - return MilliTime(); - } - MilliTime t; - get_current_monotonic_time(t); - MilliTime result= caller->getWhenTime(); - result-= t; - MilliTime t_null; - return (result < t_null ? t_null : result); -} // eo CallOutId::remaining_time() - - -namespace Detail -{ - -/* -** implementation of class Caller -*/ - -Caller::Caller( boost::function< void() > f, long delta_sec, long delta_msec, bool frozen) -: TimerBase() -, m_call_out_id( create_call_out_id_value() ) -, m_func(f) -, m_waiting(frozen) -{ - SCOPETRACKER(); - setDeltaWhenTime( delta_sec, delta_msec); -} // eo Caller::Caller(boost::function< void() >,long) - - -Caller::~Caller() -{ - SCOPETRACKER(); -} // eo Caller::~Caller() - - -void Caller::execute() -{ - SCOPETRACKER(); - // NOTE: since the func may throw an exception, we first get a shared pointer - // (to prevent early deletion) and then we remove us from the call map. - CallerPtr ptr= shared_from_this(); - m_call_out_id.remove(); - - if (m_func and not m_waiting) - { - m_func(); // may throw..., but at this point it doesn't harm. - //( it may harm at other places,...) - } -} // eo Caller::execute() - - -bool Caller::thaw() -{ - if (m_waiting) - { - m_waiting= false; - setDeltaWhenTime( 0, 0 ); - return true; - } - return false; -} // eo Caller::thaw() - - -bool Caller::joinId() -{ - if (m_call_out_id.m_caller_weak_ptr.expired()) - { - m_call_out_id.m_caller_weak_ptr= shared_from_this(); - activate(); - return true; - } - return false; -} // eo Caller::joinId() - - -bool Caller::frozen() const -{ - return m_waiting; -} // eo Caller::frozen() const - - -} // eo namespace Detail - - - -/** - * @brief remove a pending call by id. - * - * @param id the call id which should be removed. - * @return @a true iff the call was removed, @a false if no call with the given id was found. - */ -bool removeCallOut( CallOutId& id ) -{ - return id.remove(); -} // eo removeCallOut(CallOutId&) - - - -template<> -CallOutId callOut( boost::function< void() > f, long delta_sec) -{ - CallerPtr caller( new Detail::Caller(f,delta_sec) ); - add_call(caller); - return caller->getCallOutId(); -} // eo callOut(boost::function< void() >,long) - -template<> -CallOutId callOut( boost::function< void() > f, int delta_sec) -{ - return callOut(f,delta_sec); -} // eo callOut(boost::function< void() >,int) - - -template<> -CallOutId callOut( boost::function< void() > f, double delta_sec ) -{ - long delta_sec_i = (long)delta_sec; - long delta_sec_m = (long)((delta_sec - (double)delta_sec_i)*1000.0); - CallerPtr caller( new Detail::Caller(f,delta_sec_i, delta_sec_m) ); - add_call(caller); - return caller->getCallOutId(); -} // eo callOut(boost::function< void() >,double) - - -template<> -CallOutId callOut( boost::function< void() > f, float delta_sec ) -{ - return callOut(f,delta_sec); -} // eo callOut(boost::function< void() >,float) - - - - -template<> -CallOutId frozenCall( boost::function< void() > f, long delta_sec) -{ - CallerPtr caller( new Detail::Caller(f,delta_sec, 0, true) ); - add_call(caller); - return caller->getCallOutId(); -} // eo frozenCall(boost::function< void() >,long) - -template<> -CallOutId frozenCall( boost::function< void() > f, int delta_sec) -{ - return frozenCall(f,delta_sec); -} // eo frozenCall(boost::function< void() >,int) - - -template<> -CallOutId frozenCall( boost::function< void() > f, double delta_sec ) -{ - long delta_sec_i = (long)delta_sec; - long delta_sec_m = (long)((delta_sec - (double)delta_sec_i)*1000.0); - CallerPtr caller( new Detail::Caller(f,delta_sec_i, delta_sec_m, true) ); - add_call(caller); - return caller->getCallOutId(); -} // eo frozenCall(boost::function< void() >,double) - - -template<> -CallOutId frozenCall( boost::function< void() > f, float delta_sec ) -{ - return frozenCall(f,delta_sec); -} // eo frozenCall(boost::function< void() >,float) - - -} // eo namespace SimpleIo -} // eo namespace I2n diff --git a/asnycio/simplecallout.hpp b/asnycio/simplecallout.hpp deleted file mode 100644 index c895bbc..0000000 --- a/asnycio/simplecallout.hpp +++ /dev/null @@ -1,158 +0,0 @@ -/** - * @file - * @brief provides a method for delayed execution of functions. - * - * - * @copyright © Copyright 2008 by Intra2net AG - * @license commercial - * - * info@intra2net.com - */ - -#ifndef __SIMPLEIO_SIMPLECALLOUT_HPP_ -#define __SIMPLEIO_SIMPLECALLOUT_HPP_ - -#include "simpleio.hpp" - -#include -#include -#include - - -namespace I2n -{ -namespace SimpleIo -{ - -// forward declarations: -namespace Detail { - -class Caller; - -typedef boost::shared_ptr< Caller > CallerPtr; -typedef boost::weak_ptr< Caller > CallerWeakPtr; - -} // eo namespace Detail - - -/** - * @brief represents an id for a deferred call. - * - * Also provides methods for modifying the call - * (like thaw or delete it). - */ -class CallOutId -{ - friend class Detail::Caller; - - public: - CallOutId(); - - unsigned long getValue() const {return m_value;} - - bool thaw() const; - bool remove(); - - bool active() const; - bool frozen() const; - - MilliTime remaining_time(); - - private: - - CallOutId(unsigned long value); - - private: - - unsigned long m_value; - - Detail::CallerWeakPtr m_caller_weak_ptr; - -}; // eo class CallOutId - - - -/* -** -*/ - -namespace Detail { - -/** - * @brief tool class for holding and executing a deferred call. - * - */ -class Caller -: public TimerBase -, public boost::enable_shared_from_this< Caller > -{ - public: - Caller( boost::function< void() > f, long delta_sec, long delta_msec=0, bool frozen=false ); - virtual ~Caller(); - - CallOutId getCallOutId() const { return m_call_out_id; } - - bool thaw(); - - bool joinId(); - - bool frozen() const; - - protected: - - virtual void execute(); - - private: - - CallOutId m_call_out_id; - boost::function< void() > m_func; - bool m_waiting; -}; // eo class Caller - - -} // eo namespace Detail - -/* -** -*/ - -/** - * @brief initiates a deferred call of a function. - * - * @param f the function which should be called. - * @param delta_sec the delta time (in seconds) when the function should be called. - * @return an id to identify the call (may be used for preliminary removal of the call) - */ -template< typename F > -CallOutId callOut( boost::function< void() > f, F delta_sec ); - -template<> CallOutId callOut( boost::function< void() > f, long delta_sec ); -template<> CallOutId callOut( boost::function< void() > f, double delta_sec ); -template<> CallOutId callOut( boost::function< void() > f, float delta_sec ); -template<> CallOutId callOut( boost::function< void() > f, int delta_sec ); - - -/** - * @brief initiates a frozen call of a function. - * - * @param f the function which should be called. - * @param delta_sec the delta time (in seconds) when the call will be (silently) removed. - * @return an id to identify the call; neccessary for thaw the call. - */ -template< typename F > -CallOutId frozenCall( boost::function< void() > f, F delta_sec); - -template<> CallOutId frozenCall( boost::function< void() > f, long delta_sec ); -template<> CallOutId frozenCall( boost::function< void() > f, double delta_sec ); -template<> CallOutId frozenCall( boost::function< void() > f, float delta_sec ); -template<> CallOutId frozenCall( boost::function< void() > f, int delta_sec ); - - - -bool removeCallOut( CallOutId& id ); - - -} // eo namespace SimpleIo -} // eo namespace I2n - -#endif diff --git a/asnycio/simpleio.cpp b/asnycio/simpleio.cpp deleted file mode 100644 index 9bf0cad..0000000 --- a/asnycio/simpleio.cpp +++ /dev/null @@ -1,1695 +0,0 @@ -/** @file - * - * - * @copyright Copyright © 2007-2008 by Intra2net AG - * @license commercial - * @contact info@intra2net.com - */ - -//#define NOISEDEBUG - -#include "simpleio.hpp" - -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -#include - -#include -#include - -#include - -#ifdef NOISEDEBUG -#include -#include -#define DOUT(msg) std::cout << msg << std::endl -#define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl -//#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl -#define ODOUT(msg) std::cout << __PRETTY_FUNCTION__ << "[" << this << "]:" << msg << std::endl -#else -#define DOUT(msg) do {} while (0) -#define FODOUT(obj,msg) do {} while (0) -#define ODOUT(msg) do {} while (0) -#endif - -namespace -{ - - -/* - * configuration: - */ - - -const int c_max_poll_wait= 10*60*1000; // maximal poll wait (while in backend loop): 10 min - - -/** - * contains internal helper structs and functions for io handling. - */ -namespace internal_io -{ - -/** - * extends struct pollfd with some convenience functions - */ -struct PollFd : public ::pollfd -{ - PollFd() - { - fd= 0; - events= revents= 0; - } // eo PollFd - - - /** - * initializes the struct with a given file descriptor and clears the event mask(s). - * @param _fd - */ - PollFd(int _fd) - { - fd= _fd; - events= revents= 0; - } // eo PollFd - - - /** - * set that we want to be notified about new incoming data - */ - void setPOLLIN() { events |= POLLIN; } - - /** - * set that we want to be notified if we can send (more) data. - */ - void setPOLLOUT() { events |= POLLOUT; } - -}; // eo struct PollFd - - -typedef std::vector PollVector; -typedef std::map FdPollMap; -typedef std::map FdIOMap; - - -/** - * struct for interfacing our local structures with poll() - */ -struct PollDataCluster -{ - PollVector m_poll_vector; - FdPollMap m_fd_poll_map; - FdIOMap m_read_fd_io_map; - FdIOMap m_write_fd_io_map; - - void add_read_fd( int fd, I2n::SimpleIo::IOImplementation* io); - void add_write_fd( int fd, I2n::SimpleIo::IOImplementation* io); - - pollfd* get_pollfd_ptr(); - unsigned int get_num_pollfds() const; - -}; // eo struct PollDataCluster - - -template -class PtrList : public std::list -{ - typedef std::list inherited; - public: - bool dirty; - - static int Instances; - - public: - - PtrList() - : dirty(false) - { - ++Instances; - } // eo PtrList - - - ~PtrList() - { - ODOUT(""); - --Instances; - } - - - /** - * add a new item pointer to the list. - * - * @param item item pointer which should be added - */ - void add_item(T* item) - { - typename inherited::iterator it= std::find(inherited::begin(), inherited::end(), item); - if (it != inherited::end()) - { - // nothing to do since item is already in the list - return; - } - push_back(item); - } // eo add - - - /** - * remove an item pointer from the list by setting NULL at the current position of the item and mark - * the list as dirty. - * - * @param item the io object which should be removed from the list. - */ - void remove_item(T* item) - { - typename inherited::iterator it= std::find(inherited::begin(), inherited::end(), item); - if (it == inherited::end()) - { - // nothing to do: - return; - } - *it = NULL; // forget the pointer - dirty= true; // ..and mark the list as dirty (i.e. has NULL elements) - } // eo remove - - - /** - * cleans the list of objects by removing the NULL elements (if any). - * - * @note this function should only be called when it is ensured that no other functions using iterators of this list. - */ - void clean_list() - { - if (!dirty) - { - // nothing to do - return; - } - // remove the NULL elements now: - erase( - std::remove( inherited::begin(), inherited::end(), (T*)NULL), - inherited::end() ); - dirty= false; - } // eo clean_list - - -}; // eo class PtrList - - -typedef PtrList IOList; -typedef PtrList TimerList; - -template<> int IOList::Instances= 0; -template<> int TimerList::Instances= 0; - - -/** - * the (internal) global list of io objects (object pointers) - */ -IOList& g_io_list() -{ - static IOList _the_io_list; - return _the_io_list; -}; - - -/** - * the (internal) global list of timer objects (object pointers) - */ -TimerList& g_timer_list() -{ - static TimerList _the_timer_list; - return _the_timer_list; -} - -/* - * implementation of PollDataCluster - */ - - -/** - * add a new file descriptor to the read list. - * - * @param fd the file descriptor. - * @param io the io object which uses the fd for reading. - */ -void PollDataCluster::add_read_fd( int fd, I2n::SimpleIo::IOImplementation* io) -{ - FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd); - if (itPollMap != m_fd_poll_map.end()) - { - m_poll_vector[ itPollMap->second ].setPOLLIN(); - } - else - { - PollFd item(fd); - item.setPOLLIN(); - m_fd_poll_map[fd] = m_poll_vector.size(); - m_poll_vector.push_back( item ); - } - m_read_fd_io_map[fd]= io; -} // eo PollDataCluster::add_read_fd - - -/** - * add a new file descriptor to the write list. - * - * @param fd the file descriptor. - * @param io the io object which uses the fd for writing. - */ -void PollDataCluster::add_write_fd( int fd, I2n::SimpleIo::IOImplementation* io) -{ - FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd); - if (itPollMap != m_fd_poll_map.end()) - { - m_poll_vector[ itPollMap->second ].setPOLLOUT(); - } - else - { - PollFd item(fd); - item.setPOLLOUT(); - m_fd_poll_map[fd] = m_poll_vector.size(); - m_poll_vector.push_back( item ); - } - m_write_fd_io_map[fd]= io; -} // eo PollDataCluster::add_write_fd - - -/** - * returns a pointer to a pollfd array; suitable for passing to poll() - * - * @return pointer to pollfd array - */ -pollfd* PollDataCluster::get_pollfd_ptr() -{ - return m_poll_vector.empty() ? NULL : &m_poll_vector.front(); -} // eo get_pollfd_ptr - - -/** - * returns the number of entries in the pollfd array; suitable for passing to poll() - * - * @return the number of entries in the pollfd array - */ -unsigned int PollDataCluster::get_num_pollfds() const -{ - return m_poll_vector.size(); -} // eo get_num_pollfds - - - -} // eo namespace internal_io - - -/* - * some internal tool functions and structures - */ - -struct FilterMatch { - I2n::SimpleIo::FilterBasePtr m_filter; - - FilterMatch(I2n::SimpleIo::FilterBasePtr filter) - : m_filter(filter) - {} - - bool operator () (const I2n::SimpleIo::FilterBasePtr& item) - { - return item && item == m_filter; - } - -}; // eo struct FilterMatch - - -void get_current_real_time(long& current_sec, long& current_msec) -{ - struct timeval tv; - gettimeofday(&tv,NULL); - current_sec= tv.tv_sec; - current_msec= (tv.tv_usec / 1000); - if (current_msec >= 1000) - { - current_sec += (current_msec / 1000); - current_msec%= 1000; - } -} // eo get_current_real_time - - -void get_current_monotonic_time(long& current_sec, long& current_msec) -{ - long nsec; - if (monotonic_clock_gettime(current_sec,nsec)) - { - current_msec= nsec / 1000000L; - } - else - { - //fallback... - get_current_real_time(current_sec,current_msec); - } -} // eo get_current_monotonic_time - - - -} // eo anonymous namespace - - - - -namespace I2n -{ -namespace SimpleIo -{ - - -/** - * @brief gets the current time as MilliTime structure. - * @param[out] mt reference to the MilliTime strcucture which is filled with the result. - */ -void get_current_real_time(MilliTime& mt) -{ - long sec, msec; - ::get_current_real_time(sec,msec); - mt.set(sec,msec); -} // eo get_current_real_time - - -/** - * @brief gets the current time as MilliTime structure. - * @param[out] mt reference to the MilliTime strcucture which is filled with the result. - */ -void get_current_monotonic_time(MilliTime& mt) -{ - long sec, msec; - ::get_current_monotonic_time(sec,msec); - mt.set(sec,msec); -} // eo get_current_monotonic_time - - -/* - * implementation of MilliTime - */ - -MilliTime::MilliTime(long sec, long msec) -: mt_sec(sec), mt_msec(msec) -{ - normalize(); -} // eo MilliTime::MilliTime - - -void MilliTime::set(long sec, long msec) -{ - mt_sec= sec; - mt_msec= msec; - normalize(); -} // eo MilliTime::set - - -/** - * normalizes the values, so that mt_msec has a value between 0 and 999. - */ -void MilliTime::normalize() -{ - if (mt_msec < 0) - { - mt_sec += (mt_msec / 1000) - 1; - mt_msec = (mt_msec % 1000) + 1000; - } - else if (mt_msec>=1000) - { - mt_sec+= (mt_msec / 1000); - mt_msec %= 1000; - } -} // eo MilliTime::normalize - - -/** - * determine if the represented point in time is before another one. - * @param other the other point in time. - * @return true if the own point in time is before the other one. - */ -bool MilliTime::operator < (MilliTime& other) -{ - normalize(); - other.normalize(); - return - (mt_sec < other.mt_sec) - || (( mt_sec == other.mt_sec) && (mt_msec < other.mt_msec)); -} // eo MilliTime::operator < - - -/** - * determine if two point in times are equal. - * @param other the point in time to compare with. - * @return true if the represented times are equal. - */ -bool MilliTime::operator == (MilliTime& other) -{ - normalize(); - other.normalize(); - return (( mt_sec == other.mt_sec) && (mt_msec == other.mt_msec)); -} // eo MilliTime::operator < - -/** - * @brief subtracts a time delta from the object. - * @param lhs the time delta to subtract. - * @return reference to the object itself. - */ -MilliTime& MilliTime::operator -= (const MilliTime& lhs) -{ - mt_sec -= lhs.mt_sec; - mt_msec -= lhs.mt_msec; -} // eo operator -= - - -/** - * @brief adds a time delta from the object. - * @param lhs the time delta to add. - * @return reference to the object itself. - */ -MilliTime& MilliTime::operator += (const MilliTime& lhs) -{ - mt_sec += lhs.mt_sec; - mt_msec += lhs.mt_msec; -} // eo operator += - - -/* - * implementation of TimerBase - */ - -/** - * constructor. Adds the object to the internal timer list. - */ -TimerBase::TimerBase() -: m_active(false) -, m_marked(false) -{ - internal_io::g_timer_list().add_item(this); -} // eo TimerBase::TimerBase - - -/** - * destructor. Removes the object from the internal timer list. - */ -TimerBase::~TimerBase() -{ - ODOUT("enter"); - if (internal_io::TimerList::Instances) - { - ODOUT("remove from list"); - internal_io::g_timer_list().remove_item(this); - } -} // eo TimerBase::~TimerBase - - -/** - * @brief returns the point in time when the time is executed in real time. - * @return the point in time when the timer is to be executed. - */ -MilliTime TimerBase::getRealWhenTime() const -{ - MilliTime mono_time; - MilliTime real_time; - get_current_monotonic_time(mono_time); - get_current_real_time(real_time); - MilliTime result= m_when - mono_time + real_time; - return result; -} // eo TimerBase::getRealWhenTime() const - - -/** - * sets the time when the event should be executed. - * @param sec the seconds part of the point in time. - * @param msec the milliseconds part of the point in time. - */ -void TimerBase::setWhenTime(long sec, long msec) -{ - m_when.set(sec,msec); - m_marked= false; -} // eo TimerBase::setWhenTime - - -/** - * sets the time when the event should be executed. - * @param mt the point in time. - */ -void TimerBase::setWhenTime(const MilliTime& mt) -{ - m_when= mt; - m_marked= false; -} // eo TimerBase::setWhenTime - - -/** - * sets the time delta measured from current time when the event should be executed. - * @param sec the seconds of the time delta - * @param msec the milli seconds of the time delta - */ -void TimerBase::setDeltaWhenTime(long sec, long msec) -{ - setDeltaWhenTime( MilliTime(sec,msec) ); -} // eo TimerBase::setWhenTime - - - -/** - * sets the time delta measured from current time when the event should be executed. - * @param mt the time delta - */ -void TimerBase::setDeltaWhenTime(const MilliTime& mt) -{ - get_current_monotonic_time(m_when); - m_when+= mt; - m_marked= false; -} // eo TimerBase::setWhenTime - - -/** - * set the active state of the timer event. - * @param active determines if the object should be active (default: yes). - */ -void TimerBase::activate(bool active) -{ - m_active = active; - if (!active) - { - // clear the mark if we are not active. - m_marked= false; - } -} // eo TimerBase::activate - - -/** @fn void TimerBase::deactivate() - * deactivates the event by clearing the active state. - */ - - -/** - * called when the timer event occured. - */ -void TimerBase::execute() -{ -} // eo TimerBase::execute - - -/* - * implementation of FilterBase class - */ - - -FilterBase::FilterBase() -: m_io(NULL) -{ -} // eo FilterBase::FilterBase() - - -/** - * injects incoming data. - * @param data the new data - */ -void FilterBase::injectIncomingData(const std::string& data) -{ - if (m_io) - { - FilterBasePtr ptr= get_ptr_as< FilterBase >(); - if (ptr) - { - m_io->injectIncomingData(ptr,data); - } - } -} // FilterBase::injectIncomingData(const std::string&) - - -/** - * injects outgoing data. - * @param data the new data - */ -void FilterBase::injectOutgoingData(const std::string& data) -{ - if (m_io) - { - FilterBasePtr ptr= get_ptr_as< FilterBase >(); - if (ptr) - { - m_io->injectOutgoingData(ptr,data); - } - } -} // eo FilterBase::injectOutgoingData(const std::string&) - - - -/** - * called when EOF detected on incoming channel (or incoming channel closed) - */ -void FilterBase::endOfIncomingData() -{ -} // eo FilterBase::endOfIncomingData() - -/** - * called when the filter should reset. - * This is used when a new channel is opened or when the filter is taken out of a filter chain. - */ -void FilterBase::reset() -{ -} // eo FilterBase::reset() - - -/* - * implementation of IOImplementation class - */ - - -/** - * constructor for the base io class. - * - * Also adds the object to internal list of io objects (which is used by the backend). - * - * @param read_fd the file descriptor which should be used for reading (default -1 for no value) - * @param write_fd the file descriptor which should be used for writing (default -1 for no value) - */ -IOImplementation::IOImplementation(int read_fd, int write_fd) -: m_read_fd(-1) -, m_write_fd(-1) -, m_eof(false) -, m_not_writable(false) -, m_input_buffer() -, m_output_buffer() -, m_marked_for_reading(false) -, m_marked_for_writing(false) -{ - internal_io::g_io_list().add_item(this); - if (read_fd >= 0) - { - setReadFd( read_fd ); - } - if (write_fd >= 0) - { - setWriteFd( write_fd ); - } -} // eo IOImplementation::IOImplementation - - -/** - * destructor of the base io class. - * - * Removes the object from the interal list of io objects. - */ -IOImplementation::~IOImplementation() -{ - close(); - if (internal_io::IOList::Instances) - { - internal_io::g_io_list().remove_item(this); - } - // now clear the filters: - while (! m_filter_chain.empty() ) - { - FilterChain::iterator it = m_filter_chain.begin(); - (*it)->reset(); - (*it)->m_io= NULL; - //TODO: signal the filter that it is removed ?! - m_filter_chain.erase(it); - } -} // eo IOImplementation::~IOImplementation - - -/** - * adds another filter to the filter chain. - * @param filter pointer to the new filter. - */ -void IOImplementation::addFilter -( - FilterBasePtr filter -) -{ - if (!filter) - { - return; // nothing to do - } - if (filter->m_io) - { - filter->m_io->removeFilter(filter); - } - m_filter_chain.push_back( filter ); -} // eo IOImplementation::addFilter - - -/** - * removes a filter from the filter chain. - * @param filter the pointer to the filter which is removed. - * @note if the filter is removed the class gives away the ownership; i.,e. the caller is responsible for - * deleting the filter if it was dynamically allocated. - */ -void IOImplementation::removeFilter -( - FilterBasePtr filter -) -{ - FilterChain::iterator it = - std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(filter) ); - if (it != m_filter_chain.end()) - { - filter->reset(); - filter->m_io= NULL; - //TODO: signal the filter that it is removed ?! - m_filter_chain.erase(it); - } -} // eo IOImplementation::removeFilter - - -/** - * closes the file descriptors (/ the connection). - * - * @param direction the direction which should be closed (default: @a Direction::both for all). - */ -void IOImplementation::close(Direction direction) -{ - bool had_read_fd= (m_read_fd >= 0); - m_errno= 0; - if (direction == Direction::unspecified) direction= Direction::both; - if (direction != Direction::both && m_read_fd==m_write_fd && m_read_fd>=0) - { // special case: half closed (socket) connections... - // NOTE: for file descriptors m_errno will set to ENOTSOCK, but since we "forget" the desired part - // (read_fd or write_fd) this class works as desired. - switch(direction) - { - case Direction::in: - { - int res= ::shutdown(m_read_fd, SHUT_RD); - if (res<0) - { - m_errno= errno; - } - m_read_fd= -1; - if (!m_eof) - { - for(FilterChain::iterator it= m_filter_chain.begin(); - it != m_filter_chain.end(); - ++it) - { - (*it)->endOfIncomingData(); - } - } - } - return; - - case Direction::out: - { - int res= ::shutdown(m_write_fd, SHUT_WR); - if (res<0) - { - m_errno= errno; - } - m_write_fd= -1; - m_output_buffer.clear(); - } - return; - } - } - if (m_write_fd >= 0 && (direction & Direction::out) ) - { - int res1 = ::close(m_write_fd); - if (m_write_fd == m_read_fd) - { - m_read_fd= -1; - } - m_write_fd= -1; - m_output_buffer.clear(); - if (res1<0) m_errno= errno; - } - if (m_read_fd >=0 && (direction & Direction::in) ) - { - int res1 = ::close(m_read_fd); - m_read_fd= -1; - if (res1<0) m_errno= errno; - } - if (had_read_fd && !m_eof && (m_read_fd<0)) - { - for(FilterChain::iterator it= m_filter_chain.begin(); - it != m_filter_chain.end(); - ++it) - { - (*it)->endOfIncomingData(); - } - } -} // eo IOImplementation::close - - -/** - * determines if the io class wants to read data. - * Default implementation checks only for a valid file descriptor value. - * - * @return @a true if the objects wants to read data - */ -bool IOImplementation::wantRead() -{ - return (m_read_fd >= 0) && ! m_eof; -} // eo IOImplementation::wantRead - - -/** - * determines if the io class wants to write data. - * Default implementation checks for a valid file descriptor value and if the object - * cannot write data immediately. - * - * @return @a true if the objects wants to write data - */ -bool IOImplementation::wantWrite() -{ - return (m_write_fd >= 0) && ! m_marked_for_writing && ! m_not_writable; -} // eo IOImplementation::wantWrite - - -/** - * delivers if opened. - * The default returns @a true if at least one file descriptor (read or write) is valid. - * @return @a true if opened. - */ -bool IOImplementation::opened() const -{ - return (m_read_fd>=0) || (m_write_fd>=0); -} // eo IOImplementation::opened() const - - -/** - * returns if the read side detected an end of file (EOF). - * @return @a true if end of file was detected on read file descriptor (or read file descriptor isn't valid). - */ -bool IOImplementation::eof() const -{ - return (m_read_fd < 0) || m_eof; -} // eo IOImplementatio::eof() const - - -/** - * @brief returns of the write side didn't detect that it cannot write. - * @return @a true if we can write. - */ -bool IOImplementation::writable() const -{ - return (m_write_fd >=0 ) and not m_not_writable; -} // eo IOImplementation::writable() const - - -/** - * returns if the output buffer is empty. - * @return - */ -bool IOImplementation::empty() const -{ - return m_output_buffer.empty(); -} // eo IOImplementation::empty - -/** - * puts data into the output buffer and sends it immediately if possible, - * - * The data is passed through the filter chain before it's stored in the output buffer - * (i.e. the output buffer contains data as it should be send directly to the descriptor). - * @param _data the data which should be send. - */ -void IOImplementation::lowSend(const std::string& _data) -{ - std::string data(_data); - - for(FilterChain::reverse_iterator it_filter= m_filter_chain.rbegin(); - it_filter!= m_filter_chain.rend(); - ++it_filter) - { - data= (*it_filter)->filterOutgoingData(data); - } - m_output_buffer+= data; - - // if we can send immediately, do it: - if (! m_output_buffer.empty() && m_marked_for_writing) - { - doWrite(); - } -} // eo IOImplementation::lowSend - - -/** - * called by the backend when there is data to read for this object. - * - * Reads the data from the connection (read file descriptor) and passes the data through the filter chain. - * The final data is appended to the input buffer and the signal @a m_signal_read() is called. - * - * If EOF is detected (i,e, no data was received) then the signal @a m_signal_eof() is called. - * - * @note overload this method only when You know what You are doing! - * (overloading is necessary when handling server sockets.) - */ -void IOImplementation::doRead() -{ - // static read buffer; should be ok as long as we don't use threads - static char buffer[8*1024]; // 8 KiB - - m_errno = 0; - if (m_read_fd<0 || !m_marked_for_reading) - { - ODOUT("exit0; read_fd="< " << count); - - // interpret what we got: - if (count < 0) // error - { - m_errno = errno; - int fd= m_read_fd; - - switch(m_errno) - { - case EINVAL: - case EBADF: - case ECONNRESET: - case ENETRESET: - if (fd == m_read_fd) - { - close( (m_read_fd == m_write_fd) ? Direction::both : Direction::in ); - } - break; - } - } - else if (count==0) // EOF - { - // remember the read fd: - int fd = m_read_fd; - // remember the EOF: - m_eof= true; - // signal EOF - m_signal_eof(); - // if the fd is still the same: close it. - if (fd == m_read_fd) - { - close( Direction::in ); - } - } - else // we have valid data - { - std::string data(buffer,count); - ODOUT(" got \"" << data << "\""); - for(FilterChain::iterator it_filter= m_filter_chain.begin(); - it_filter != m_filter_chain.end(); - ++it_filter) - { - data= (*it_filter)->filterIncomingData(data); - } - m_input_buffer+= data; - m_signal_read(); - } -} // eo IOImplementation::doRead - - -/** - * interface for filter classes to inject data into the filter chain (emulating new incoming data). - * @param from_filter the filter which injects the new data. - * @param _data the new data. - */ -void IOImplementation::injectIncomingData(FilterBasePtr from_filter, const std::string& _data) -{ - FilterChain::iterator it_filter = - std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(from_filter) ); - if (it_filter == m_filter_chain.end()) - { - // dont accept data inject from a unknown filter - return; - } - // well: pass the data through the remaining filters: - // NOTE: processing is (nearly) the same as in IOImplementation::doRead() - std::string data(_data); - for(++it_filter; - it_filter != m_filter_chain.end(); - ++it_filter) - { - data= (*it_filter)->filterIncomingData(data); - } - m_input_buffer+= data; - m_signal_read(); -} // eo IOImplementation::injectIncomingData(FilterBase*,const std::string&) - - -/** - * interface for filter classes to inject data into the filter chain (emulating new outgoing data). - * @param from_filter the filter which injects the new data. - * @param _data the new data. - */ -void IOImplementation::injectOutgoingData(FilterBasePtr from_filter, const std::string& _data) -{ - FilterChain::reverse_iterator it_filter = - std::find_if( m_filter_chain.rbegin(), m_filter_chain.rend(), FilterMatch(from_filter) ); - if (it_filter == m_filter_chain.rend()) - { - // dont accept data inject from a unknown filter - return; - } - // well: pass the data through the remaining filters: - // NOTE: processing is (nearly) the same as in IOImplementation::lowSend() - std::string data(_data); - for(++it_filter; - it_filter!= m_filter_chain.rend(); - ++it_filter) - { - data= (*it_filter)->filterOutgoingData(data); - } - m_output_buffer+= data; - - // if we can send immediately, do it: - if (! m_output_buffer.empty() && m_marked_for_writing) - { - doWrite(); - } -} // eo IOImplementation::injectOutgoingData(FilterBase*,const std::string&) - - -/** - * set the read file descriptor. - * Although a derived class can also set the read fd directly; this method should be used - * for this task since it updates some flags on the fd for async operation. - * @param fd the new read file descriptor. - */ -void IOImplementation::setReadFd(int fd) -{ - // test if we already have a valid descriptor (and may have to close it): - if (m_read_fd >=0 ) - { - if (m_read_fd == fd) - { - // fd was already right; consider it to be ok. - return; - } - close(Direction::in); - } - // reset our errno: - m_errno= 0; - // if the new descriptor looks valid, set some flags: - if (fd >= 0) - { - long flags= ::fcntl(fd, F_GETFL); - if (flags != -1) - { - // set the flags for non blocking, async operation - flags |= O_NONBLOCK|O_ASYNC; - ::fcntl(fd,F_SETFL, flags); - } - else if ( errno == EBADF ) - { - // well, we seemed to be fed with an invalid descriptor...: - m_errno = errno; - fd= -1; - } - } - if (fd >= 0) // if still valid: - { - // set the close-on-exec flag - ::fcntl(fd,F_SETFD, FD_CLOEXEC); - } - m_read_fd= fd; - m_marked_for_reading= false; - m_eof= false; -} // eo IOImplementation::setReadFd(int) - - - -/** - * set the write file descriptor. - * Although a derived class can also set the write fd directly; this method should be used - * for this task since it updates some flags on the fd for async operation. - * @param fd the new write file descriptor. - */ -void IOImplementation::setWriteFd(int fd) -{ - if (m_write_fd >=0 ) - { - if (m_write_fd == fd) - { - // fd was already right; consider it to be ok. - return; - } - close(Direction::out); - } - // reset our errno: - m_errno= 0; - // if the new descriptor looks valid, set some flags: - if (fd >= 0) - { - long flags= ::fcntl(fd, F_GETFL); - if (flags != -1) - { - // set the flags for non blocking, async operation - flags |= O_NONBLOCK|O_ASYNC; - ::fcntl(fd,F_SETFL, flags); - } - else if (errno == EBADF) - { - // well, we seemed to be fed with an invalid descriptor...: - m_errno = errno; - fd= -1; - } - } - if (fd >= 0) // if still valid: - { - // set the close-on-exec flag - ::fcntl(fd,F_SETFD, FD_CLOEXEC); - } - m_write_fd = fd; - m_marked_for_writing= false; - m_not_writable= false; -} // eo IOImplementation::setWriteFd(int) - - - -/** - * called by the backend when this object can write data. - * - * If some data was sended, the signal @a m_signal_write is called. - * - * @internal tries to write all buffered data to output; if this succeeds, - * the connection is assumed to be still able to accept more data. - * (i.e. the internal write mark is kept!) - * - * @note overload this method only when You know what You are doing! -*/ -void IOImplementation::doWrite() -{ - m_errno = 0; - if ( m_write_fd<0 || !m_marked_for_writing || m_output_buffer.empty()) - { - return; - } - - ODOUT("doWrite, d=\"" << m_output_buffer << "\""); - - //reset mark: - m_marked_for_writing= false; - - // now write the data - ssize_t count= ::write( m_write_fd, m_output_buffer.data(), m_output_buffer.size()); - - ODOUT("::write -> " << count); - - if (count < 0) // error - { - m_errno= errno; - int fd= m_write_fd; - - switch(m_errno) - { - case EPIPE: - m_not_writable= true; - // emit a signal - m_signal_not_writable(); - // fall through - case EINVAL: - case EBADF: - case ECONNRESET: - case ENETRESET: - if (fd == m_write_fd) - { - close( (m_write_fd == m_read_fd) ? Direction::both : Direction::out ); - } - break; - } - } - else - { - m_output_buffer.erase(0, count); - if (m_output_buffer.empty()) - { - // special case: if we were able to send all the data, we keep the write mark: - m_marked_for_writing= true; - } - } - if (count > 0) - { - m_signal_write(); - } -} // eo IOImplementation::doWrite - - -/* - * implementation of SimpleIO - */ - - -SimpleIO::SimpleIO(int read_fd, int write_fd) -: inherited(read_fd, write_fd) -{ - m_signal_read.connect(boost::bind(&SimpleIO::slotReceived,this)); -} // eo SimpleIO::SimpleIO() - - -SimpleIO::~SimpleIO() -{ -} // eo SimpleIO::~SimpleIO() - - -/** - * sends a string. - * @param data the string. - */ -void SimpleIO::sendString(const std::string& data) -{ - lowSend(data); -} // eo SimpleIO::sendString(const std::string&) - - -/** - * emits the signal signalReceived with the received data. - * This slot is connected to IOImplementation::m_signal_read. - */ -void SimpleIO::slotReceived() -{ - std::string data; - data.swap(m_input_buffer); - signal_received_string(data); -} // eo SimpleIO::slotReceived() - - - -/* - * implementation of SimpleIO2 - */ - - -SimpleIO2::SimpleIO2(int read_fd, int write_fd) -: inherited(read_fd, write_fd) -{ - m_signal_read.connect(boost::bind(&SimpleIO2::slotReceived,this)); -} // eo SimpleIO2::SimpleIO2() - - -SimpleIO2::~SimpleIO2() -{ -} // eo SimpleIO2::~SimpleIO2() - - -/** - * sends a string. - * @param data the string. - */ -void SimpleIO2::sendString(const std::string& data) -{ - lowSend(data); -} // eo SimpleIO2::sendString(const std::string&) - - -/** - * emits the signal signalReceived with the received data. - * This slot is connected to IOImplementation::m_signal_read. - */ -void SimpleIO2::slotReceived() -{ - std::string data; - data.swap(m_input_buffer); - signal_received_string(data); -} // eo SimpleIO2::slotReceived() - - - -/* - * implementation of class Backend (singleton) - */ - -Backend* Backend::g_backend= NULL; - -int Backend::m_count_active_steps=0; - - -Backend::Backend() -: m_count_active_loops(0) -, m_count_stop_requests(0) -{ - SystemTools::ignore_signal( SystemTools::Signal::PIPE ); -} // eo Backend::Backend - - -Backend::~Backend() -{ - SystemTools::restore_signal_handler( SystemTools::Signal::PIPE ); -} // eo Backend::~Backend() - -/** - * delivers pointer to the current backend, instantiating a new backend if there was no current one. - * - * This should be the only way to access the backend which should be a singleton. - * - * @return the pointer to the current backend. - */ -Backend* Backend::getBackend() -{ - if (!g_backend) - { - g_backend = new Backend(); - } - return g_backend; -} // eo Backend::getBackend - - - - -/** - * performs one backend cycle. - * - * Collects all file descriptors from the active io objects which should be selected for reading and/or writing. - * Also determines the timer events which become due and adjusts the timeout. - * Constructs the necessary structures and calls poll(). - * Finally interprets the results from poll() (i.e. performs the reading/writing/timer events) - * - * @param timeout maximal wait value in milliseconds; negative value waits until at least one event occured. - * @return @a true if there was at least one active object; otherwise @a false - * - * @note this method is a little beast. - * - * @internal - * The cycle is divided into four steps: collecting; poll; mark and execute. - * The "mark" step is necessary to avoid some bad side effects when method calls in the execution stage - * are calling @a Backup::doOneStep or open their own local backend loop. - * - * @todo handle some more error cases. - * @todo provide a plugin interface for external handler. - * (currently inclusion of external handler is possible by (ab)using timer classes) - */ -bool Backend::doOneStep(int timeout) -{ - ODOUT( "timeout=" << timeout ); - internal_io::PollDataCluster poll_data; - bool had_active_object = false; - - ++m_count_active_steps; - - try { - - FdSetType local_read_fds; - FdSetType local_write_fds; - - // step 1 ; collect - - { // step 1.1: collect fds for read/write operations - for(internal_io::IOList::iterator itIOList = internal_io::g_io_list().begin(); - itIOList != internal_io::g_io_list().end(); - ++itIOList) - { - if (! *itIOList) continue; // skip NULL entries - int read_fd = (*itIOList)->m_read_fd; - int write_fd = (*itIOList)->m_write_fd; - bool want_read = (read_fd >= 0) and (*itIOList)->wantRead(); - bool want_write = (write_fd >= 0) and (*itIOList)->wantWrite(); - if (read_fd >= 0 ) - { - local_read_fds.insert( read_fd ); - } - if (write_fd >= 0) - { - local_write_fds.insert( write_fd ); - } - if (!want_read && !want_write) continue; - if (want_read) - { - FODOUT( (*itIOList), "wants to read (fd=" << read_fd << ")"); - poll_data.add_read_fd(read_fd, *itIOList); - } - if (want_write) - { - FODOUT( (*itIOList), "wants to write (fd=" << write_fd << ")"); - poll_data.add_write_fd(write_fd, *itIOList); - } - had_active_object= true; - } - } - - { // step 1.2: collect timer events - MilliTime current_time; - MilliTime min_event_time; - - get_current_monotonic_time(current_time); - bool min_event_time_set; - - if (timeout >= 0) - { - min_event_time = current_time + MilliTime(0,timeout); - min_event_time_set= true; - } - else - { - min_event_time = current_time + MilliTime(86400,0); - min_event_time_set= false; - } - // TODO - - for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin(); - it_timer != internal_io::g_timer_list().end() - && (!had_active_object || !min_event_time_set || current_time < min_event_time); - ++ it_timer) - { - if (! *it_timer) continue; // skip NULL entries - if (! (*it_timer)->m_active) continue; // skip if not enabled - if ( !min_event_time_set || (*it_timer)->m_when < min_event_time) - { - min_event_time = (*it_timer)->m_when; - min_event_time_set= true; - } - had_active_object= true; - } - - if (min_event_time_set) - { // we have at a minimal event time, so (re)compute the timeout value: - MilliTime delta= (min_event_time - current_time); - long long delta_ms = std::min( delta.get_milliseconds(), 21600000LL); // max 6h - if (delta_ms <= 0L) - { - timeout= 0L; - } - else - { - timeout= delta_ms + (delta_ms<5 ? 1 : 3); - } - } - } - - // step 2 : poll - ODOUT(" poll timeout is " << timeout); - { - MilliTime current_time; - get_current_monotonic_time(current_time); - ODOUT(" current time is sec="< " << poll_result); - { - MilliTime current_time; - get_current_monotonic_time(current_time); - ODOUT(" current time is sec="<m_marked_for_reading= true; - } - } - if ( 0!= (itPollItem->revents & POLLOUT)) - { - IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ]; - if (io && io->m_write_fd==itPollItem->fd) - { - io->m_marked_for_writing= true; - } - } - if ( 0!= (itPollItem->revents & POLLERR)) - { - IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ]; - if (0!= (itPollItem->events & POLLOUT)) - { - if (io && io->m_write_fd==itPollItem->fd) - { - io->m_marked_for_writing= false; - //io->close( Direction::out ); - } - } - } - // TODO error handling (POLLERR, POLLHUP, POLLNVAL) - } - } - - //Step 3.2: mark timer objects - { - MilliTime current_time; - - get_current_monotonic_time(current_time); - ODOUT(" current time is sec="<m_active) continue; // skip if not enabled - if ( (*it_timer)->m_when <= current_time) - { - ODOUT(" ==> MARK"); - (*it_timer)->m_marked = true; - } - } - } - - - // step 4 : execute - - // step 4.1: execute io - ODOUT("execute stage"); - for(internal_io::IOList::iterator it_io = internal_io::g_io_list().begin(); - it_io != internal_io::g_io_list().end(); - ++ it_io) - { - ODOUT(" check obj " << *it_io); - if (NULL == *it_io) continue; - if ((*it_io)->m_marked_for_writing) - { - FODOUT((*it_io),"exec doWrite"); - (*it_io)->doWrite(); - if ((*it_io) == NULL) continue; // skip remaining if we lost the object - if ((*it_io)->m_errno) - { - continue; - } - } - if ((*it_io)->m_marked_for_reading) - { - FODOUT((*it_io),"exec doRead"); - (*it_io)->doRead(); - if ((*it_io) == NULL) continue; // skip remaining if we lost the object - if ((*it_io)->m_errno) - { - continue; - } - } - } - - // step 4.2: execute timer events - { - for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin(); - it_timer != internal_io::g_timer_list().end(); - ++ it_timer) - { - if (! *it_timer) continue; // skip NULL entries - if (! (*it_timer)->m_active) continue; // skip if not enabled - if (! (*it_timer)->m_marked) continue; // skip if not marked - - // reset the mark and deactivate object now since the execute() method might activate it again - (*it_timer)->m_marked= false; - (*it_timer)->m_active= false; - - // now execute the event: - (*it_timer)->execute(); - } - } - - } // eo try - catch(...) - { - // clean up our counter - --m_count_active_steps; - // and forward the exception - throw; - } - - if ( 0 == --m_count_active_steps) - { - internal_io::g_io_list().clean_list(); - internal_io::g_timer_list().clean_list(); - } - - return had_active_object; -} // eo Backend::doOneStep - - -/** - * enters a backend loop. - * - * Calls @a Backend::doOneStep within a loop until @a Backend::stop was called or there are no more - * active objects (io objects or timer objects). - */ -void Backend::run() -{ - ++m_count_active_loops; - do - { - try - { - if (!doOneStep(c_max_poll_wait)) - { - // stop if there are no more active objects. - stop(); - } - } - catch(...) - { - // clean up our counter - --m_count_active_loops; - // and forward the exception - throw; - } - } - while (0 == m_count_stop_requests); - --m_count_active_loops; - --m_count_stop_requests; -} // eo Backend::run - - -/** - * @brief stops the latest loop currently run by Backend::run(). - * @see Backend::run() - */ -void Backend::stop() -{ - if (m_count_active_loops) - { - ++m_count_stop_requests; - } -} // eo Backend::stop() - - - -} // eo namespace SimpleIo -} // eo namespace I2n diff --git a/asnycio/simpleio.hpp b/asnycio/simpleio.hpp deleted file mode 100644 index d3727be..0000000 --- a/asnycio/simpleio.hpp +++ /dev/null @@ -1,463 +0,0 @@ -/** - * @file - * @brief simple basic IO handling. - * - * @copyright © Copyright 2007-2008 by Intra2net AG - * @license commercial - * @contact info@intra2net.com - * - * Deals with POSIX file descriptors; provides an additional abstraction - * level above select() or poll() interface. - * Also provides basic functionality for dealing with timer events. - */ - -#ifndef __I2N_SIMPLEIO_HPP__ -#define __I2N_SIMPLEIO_HPP__ - -#include -#include -#include - -#include - -#include -#include - - -namespace I2n -{ -namespace SimpleIo -{ - - -/* - * forward declarations - */ -class Backend; -class IOImplementation; - -/* - * end of forward declarations - */ - - -/** - * direction for io operations. - */ -struct Direction -{ - enum _Direction - { - unspecified= 0, - in = 1, - out = 2, - both= 3 - } m_direction; - - Direction( _Direction direction = unspecified) : m_direction(direction) {} - - operator _Direction () const { return m_direction; } -}; // eo struct IODirection; - - -/** - * structure for storing (a point in time as) seconds and milliseconds. - */ -struct MilliTime -{ - long mt_sec; - long mt_msec; - - MilliTime(long sec=0, long msec=0); - - void set(long sec, long msec=0); - - inline long long get_milliseconds() const - { - return ((long long)mt_sec * 1000L + mt_msec); - } // eo get_milliseconds - - void normalize(); - - bool operator < (MilliTime& other); - bool operator == (MilliTime& other); - - MilliTime& operator -= (const MilliTime& lhs); - MilliTime& operator += (const MilliTime& lhs); - -}; // eo struct MilliTime - - -inline MilliTime operator + (const MilliTime& rhs, const MilliTime& lhs) -{ - MilliTime t(rhs); - return t+= lhs; -} // eo operator + (const MilliTime& rhs, const MilliTime lhs) - -inline MilliTime operator - (const MilliTime& rhs, const MilliTime& lhs) -{ - MilliTime t(rhs); - return t-= lhs; -} // eo operator - (const MilliTime& rhs, const MilliTime lhs) - - -inline bool operator <= (MilliTime& rhs, MilliTime& lhs) -{ - return (rhs PtrType; - public: - FilterBase(); - virtual ~FilterBase() {}; - - protected: - - virtual std::string filterIncomingData(const std::string& data)= 0; - virtual std::string filterOutgoingData(const std::string& data)= 0; - - virtual void endOfIncomingData(); - virtual void reset(); - - protected: - - void injectIncomingData(const std::string& data); - void injectOutgoingData(const std::string& data); - - private: - /// pointer to the io object which uses this filter: - IOImplementation *m_io; -}; // eo class FilterBase - -typedef FilterBase::PtrType FilterBasePtr; - - -/** - * identity filter; does nothing with the data (in other words: it's useless ;-) ). - */ -class FilterIdentity : public FilterBase -{ - protected: - virtual std::string filterIncomingData(const std::string& data) { return data; } - virtual std::string filterOutgoingData(const std::string& data) { return data; } -}; // eo class FilterIdentity - - -/** - * @brief null filter; deletes everything it receives. - * - * usefull when output from a subprocess (like stderr) should be ignored... - */ -class FilterNull : public FilterBase -{ - protected: - virtual std::string filterIncomingData(const std::string& data) { return std::string(); } - virtual std::string filterOutgoingData(const std::string& data) { return std::string(); } -}; // eo FilterNull - - -/** - * the base class of the IO classes. - * - * provides the functionality to read from a file desriptor and write to a file descriptor (which can be - * identical (like for socket io), but also can be different (like pipes from/to a process)). - * The data stream can be filtered through plugged filter objects which are building a filter chain. - * Incoming data is filtered forward through the chain; outgoing data is filtered backward through the chain. - * (So the first filter is the one which modifies the data closest to the connections). - * - * @note the functionality is provided in conjunction with the @a Backend class which handles parts of - * the low level IO. - * - * @note this is a base class; it provides most "interesting" functionality in the protected section only. - * This way, derived classes can decide which of that functionality they want to export in their own public - * interfaces and which to keep hidden. - */ -class IOImplementation -: public boost::signals::trackable -, virtual public SharedBase -{ - friend class Backend; - friend class FilterBase; - - public: - - typedef std::list< FilterBasePtr > FilterChain; - - typedef boost::signal< void() > SignalType; - - typedef boost::shared_ptr< IOImplementation > PtrType; - - public: - IOImplementation(int read_fd=-1, int write_fd=-1); - virtual ~IOImplementation(); - - virtual void close(Direction direction = Direction::both); - - virtual bool wantRead(); - virtual bool wantWrite(); - - virtual bool opened() const; - virtual bool eof() const; - virtual bool writable() const; - virtual bool empty() const; - - protected: - - void addFilter(FilterBasePtr filter); - void removeFilter(FilterBasePtr); - - - void lowSend(const std::string& data); - - std::string::size_type getOutputBufferSize() const { return m_output_buffer.size(); } - - void setWriteFd(int fd); - void setReadFd(int fd); - - inline int readFd() const - { - return m_read_fd; - } - - inline int writeFd() const - { - return m_write_fd; - } - - inline bool isMarkedForReading() const { return m_marked_for_reading; } - inline bool isMarkedForWriting() const { return m_marked_for_writing; } - - - protected: - virtual void doRead(); - virtual void doWrite(); - - void resetReadMark() { m_marked_for_reading= false; } - void resetWriteMark() { m_marked_for_writing= false; } - - void injectIncomingData(FilterBasePtr from_filter, const std::string& _data); - void injectOutgoingData(FilterBasePtr from_filter, const std::string& _data); - - protected: - /// last error number - int m_errno; - /// the input buffer (i.e. the data read from @a m_read_fd) - std::string m_input_buffer; - - /// the chain of filter which are applied to the data received are the data which should be send. - FilterChain m_filter_chain; - - /// signal which is fired when end of file is detected - SignalType m_signal_eof; - /// signal which is fired when write is no longer possible - SignalType m_signal_not_writable; - /// signal which is fired when new data was read - SignalType m_signal_read; - /// signal which is fired when data was written - SignalType m_signal_write; - - /// end of file (on @a m_read_fd) detected (used additionally when m_read_fd is valid) - bool m_eof; - - /// unable-to-write (on @a m_write_fd) detected (used additionally when m_write_fd is valid) - bool m_not_writable; - - private: - /// the file descriptor to read from (-1 if none is given) - int m_read_fd; - /// the file descriptor to write to (-1 if none is given) - int m_write_fd; - /// output buffer; contains the data which needs to be written. - std::string m_output_buffer; - - private: - /// @a true when data is available to be read - bool m_marked_for_reading; - /// @a true when data can be written - bool m_marked_for_writing; - -}; // eo class IOImplementation - - - -/** - * same as IOImplementation, but makes fd access functions public. - */ -class IOImplementation2 : public IOImplementation -{ - typedef IOImplementation inherited; - public: - IOImplementation2(int read_fd=-1, int write_fd=-1) : inherited(read_fd, write_fd) {} - - void setWriteFd(int fd) { inherited::setWriteFd(fd); } - void setReadFd(int fd) { inherited::setReadFd(fd); } - - int readFd() const { return inherited::readFd(); } - int writeFd() const { return inherited::writeFd(); } - -}; // eo class IOImplementation2 - - -/** - * provides sending data and receiving data via a signal. - * - * @note the received data is passed as parameter to the signal and no longer stored in the received buffer. - */ -class SimpleIO : public IOImplementation -{ - typedef IOImplementation inherited; - public: - SimpleIO(int read_fd=-1, int write_fd=-1); - virtual ~SimpleIO(); - - void sendString(const std::string& data); - - boost::signal signal_received_string; - - private: - - void slotReceived(); -}; // eo class SimpleIO - - -/** - * provides sending data and receiving data via a signal. - * - * @note the received data is passed as parameter to the signal and no longer stored in the received buffer. - */ -class SimpleIO2 : public IOImplementation2 -{ - typedef IOImplementation2 inherited; - public: - SimpleIO2(int read_fd=-1, int write_fd=-1); - virtual ~SimpleIO2(); - - void sendString(const std::string& data); - - boost::signal signal_received_string; - - private: - - void slotReceived(); -}; // eo class SimpleIO2 - - -/** - * provides the backend for io handling. - * - * This (singleton) object provides the management of io events. It collects all wishes for reading/writing - * from the io objects and information from the timer events. - * It poll()s for the events and distributes them to the objects, - * - * The class provides the possibility for executing one io cycle (/step) or to run a backend loop. - * - * @note the Backend class needs to be a friend of IOImplementation since it accesses private members - * of IOImplementation while performing the io cycles. - */ -class Backend -{ - public: - typedef std::set< int > FdSetType; - - public: - - bool doOneStep(int ms_timeout= -1); - void run(); - void stop(); - - protected: - Backend(); - Backend(const Backend& other); - ~Backend(); - - protected: - - /// the number of currently active backend loops - int m_count_active_loops; - /// the number of pending stop requests (where each one should exit one active backend loop) - int m_count_stop_requests; - - /// the number of currently active backend cycles(/ steps) - static int m_count_active_steps; - - public: - static Backend* getBackend(); - - protected: - /// pointer to the active backend (which is delivered by Backend::getBackend) - static Backend* g_backend; -}; // eo class Backend - - - -/* -** tool functions: -*/ - - -void get_current_real_time(MilliTime& mt); -void get_current_monotonic_time(MilliTime& mt); - - -} // eo namespace SimpleIo -} // eo namespace I2n - -#endif diff --git a/asnycio/simplepipe.cpp b/asnycio/simplepipe.cpp deleted file mode 100644 index 4eba4bc..0000000 --- a/asnycio/simplepipe.cpp +++ /dev/null @@ -1,98 +0,0 @@ -/** @file - * - * (c) Copyright 2007 by Intra2net AG - * - * info@intra2net.com - */ - -#include "simplepipe.hpp" - -#include -#include - -#include -#include -#include -#include - - -namespace I2n -{ -namespace SimpleIo -{ - - -/* - * Implementation of SimplePipe - */ - -SimplePipe::SimplePipe() -{ - m_signal_read.connect(boost::bind(&SimplePipe::slotReceived,this)); -} // eo SimplePipe::SimplePipe() - - -SimplePipe::~SimplePipe() -{ -} // eo SimplePipe::~SimplePipe() - - -/** - * makes a pipe. - * This method connects itself with a newly created peer object with a bidirectional pipe. - * @return the peer pipe object. - */ -bool SimplePipe::makePipe(SimplePipe& peer) -{ - close(); // just in case... - - int fds[2]; - - int res= ::socketpair( AF_UNIX, SOCK_STREAM, 0, fds); - - if (res) - { - m_errno= errno; - return false; - } - else - { - m_errno= 0; - } - - peer.close(); // just in case - - setWriteFd(fds[0]); - setReadFd(fds[0]); - - peer.setWriteFd(fds[1]); - peer.setReadFd(fds[1]); - - return true; -} // eo SimplePipe.:makePipe() - - -/** - * sends a string through the pipe. - * @param data the data which should be sent to the other side. - */ -void SimplePipe::sendString(const std::string& data) -{ - lowSend(data); -} // eo SimplePipe::sendString(const std::string&) - - -/** - * emits the signal signalReceived with the received data. - * This slot is connected to IOImplementation::m_signal_read. - */ -void SimplePipe::slotReceived() -{ - std::string data; - data.swap(m_input_buffer); - signal_received_string(data); -} // eo SimplePipe::slotReceived() - - -} // eo namespace SimpleIo -} // eo namespace I2n diff --git a/asnycio/simplepipe.hpp b/asnycio/simplepipe.hpp deleted file mode 100644 index 80b7cc6..0000000 --- a/asnycio/simplepipe.hpp +++ /dev/null @@ -1,44 +0,0 @@ -/** @file - * - * (c) Copyright 2007 by Intra2net AG - * - * info@intra2net.com - */ - -#ifndef _SIMPLEIO_SIMPLEPIPE_HPP_ -#define _SIMPLEIO_SIMPLEPIPE_HPP_ - -#include "simpleio.hpp" - -namespace I2n -{ -namespace SimpleIo -{ - - -class SimplePipe : public IOImplementation -{ - public: - SimplePipe(); - virtual ~SimplePipe(); - - bool makePipe(SimplePipe& peer); - - void sendString(const std::string& data); - - boost::signal signal_received_string; - - protected: - - private: - - void slotReceived(); - -}; // eo SimplePipe - - -} // eo namespace SimpleIo -} // eo namespace I2n - - -#endif diff --git a/asnycio/simpleprocess.cpp b/asnycio/simpleprocess.cpp deleted file mode 100644 index 8e3754d..0000000 --- a/asnycio/simpleprocess.cpp +++ /dev/null @@ -1,817 +0,0 @@ -/** @file - * - * - * (c) Copyright 2007-2008 by Intra2net AG - * - * info@intra2net.com - */ - -//#define NOISEDEBUG - -#include "simpleprocess.hpp" - -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -#include - - -#ifdef NOISEDEBUG -#include -#include -#define DOUT(msg) std::cout << msg << std::endl -#define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl -#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl -#else -#define DOUT(msg) do {} while (0) -#define FODOUT(obj,msg) do {} while (0) -#define ODOUT(msg) do {} while (0) -#endif - - -namespace -{ - -using namespace I2n::SimpleIo; - -/** - * local configuration values - */ -namespace config -{ - - /// the capacity of the child status list (/ vector) - const unsigned int pid_pool_capacity= 512; - -} // eo namespace config - - - -/// the previous handler for the child signal (SIGCHLD) -void (*oldChildHandler)(int) = NULL; - -/// method pointer for activating process manager -void (ProcessManager::*_activate_manager)(); - -PidStateList pending_pid_states; - - -/** - * signal handler for child signal (SIGCHLD) - * @param sig the signal number as provided by the OS - */ -void handleSigChild(int sig) -{ - int status; - pid_t pid; - while ( (pid = waitpid(-1,&status,WNOHANG)) > 0) - { - pending_pid_states.push_back( PidStatePair(pid,status) ); - } - if (_activate_manager) - { - // tricky way to access a protected method without being a (official) friend: - ( ProcessManager::getInstance()->*_activate_manager)(); - } - //TODO: ? - signal(sig,handleSigChild); -} // eo handleSigChild - - -namespace process -{ - -typedef std::pair PidProcPair; -typedef std::list< PidProcPair > PidProcList; - - -template< typename F, typename S > -struct CmpFirst -{ - F _f; - CmpFirst ( F f ) : _f(f) {} - bool operator () ( const std::pair& v ) const { return v.first == _f; } -}; // eo struct CmpFirst - - -std::list g_process_list; -PidProcList g_pid_list; - - -void addProcessInstance( ProcessImplementation* obj ) -{ - g_process_list.push_back(obj); -} // eo addProcessInstance(ProcessImplementation*) - - -void removeProcessInstance( ProcessImplementation* obj ) -{ - // remove obj from list - g_process_list.remove(obj); - // clear pointers in pid list - for(PidProcList::iterator it= g_pid_list.begin(); - it != g_pid_list.end(); - ++it) - { - if (it->second == obj) - { - it->second= NULL; - } - } -} // eo removeProcessInstance(ProcessImplementation*) - - -void addChildProcess( pid_t pid, ProcessImplementation* obj) -{ - g_pid_list.push_back ( PidProcPair(pid,obj) ); -} // eo addChildProcess(pid_t,ProcessImplementation*) - - -void removeChildProcess ( pid_t pid, ProcessImplementation* obj) -{ - PidProcList::iterator it= std::find( - g_pid_list.begin(), g_pid_list.end(), - PidProcPair(pid,obj)); - if (it != g_pid_list.end()) - { - g_pid_list.erase(it); - } -} // eo removeChildProcess(pid_t,ProcessImplementation*) - - -bool findChildProcess ( pid_t pid, ProcessImplementation* & obj ) -{ - PidProcList::iterator it = std::find_if( - g_pid_list.begin(), g_pid_list.end(), - CmpFirst(pid) ); - if (it == g_pid_list.end()) - { - return false; - } - obj = it->second; - return true; -} // eo findChildProcess(pid_t,ProcessImplementation*&) - - -} // eo namespace process - - - - - -/* -** misc tools -*/ - - -/** - * convenience tool for closing file descriptors... - */ -struct FdCloser -{ - int m_fd; - - FdCloser(int fd=-1) : m_fd(fd) {} - - ~FdCloser() - { - if (m_fd >= 0) ::close(m_fd); - } - - void release() { m_fd= -1; } - -}; // eo struct FdCloser - - - -} // eo namespace - - -namespace I2n -{ -namespace SimpleIo -{ - - -/* - * global functions - */ - -/** - * installs the handler for the child signal (SIGCHLD). - * Installing this handler is mandatory for the process subsystem to work correctly. - * @return @a true iff the child handler is successfully installed. - */ -bool installChildHandler() -{ - if (oldChildHandler) - { - // already installed - return true; - } - if (! ProcessManager::getInstance() ) - { - // we need an instance of the process manager - return false; - } - pending_pid_states.reserve( config::pid_pool_capacity ); - oldChildHandler = signal( Signal::CHLD, handleSigChild ); - if (oldChildHandler == SIG_ERR) - { - oldChildHandler= NULL; - return false; - } - return true; -} // eo installChildHandler - - -/** - * uninstalls the child handler. - * @return @a true iff the old child handler is reestablished. - */ -bool restoreChildHandler() -{ - if (!oldChildHandler) - { - return false; - } - void(*res)(int) = signal( Signal::CHLD, oldChildHandler); - - if (res == SIG_ERR) - { - return false; - } - oldChildHandler= NULL; - return true; -} // eo restoreChildHandler - - - - -/* - * Implementation of ProcessImplementation - */ - -IOImplementation2* ProcessImplementation::_StderrOnStdout = ((IOImplementation2*) 1); -IOImplementation2* ProcessImplementation::_UseParentsStderr = ((IOImplementation2*) 0); - - -/** - * @brief constructor for the process implementation. - * - * the constructor takes the path to the executable and (initial) cli arguments. - * - * @param path path to the executable. - * @param args initial command line arguments. - */ -ProcessImplementation::ProcessImplementation( - const std::string& path, - const std::vector& args - ) -: IOImplementation(-1,-1) -, m_path(path) -, m_nice_inc(0) -, m_create_new_session(false) -, m_pid(0) -, m_state(ProcessState::stopped) -, m_exit_code(0) -{ - m_args.push_back(path); - std::copy( args.begin(), args.end(), std::back_inserter(m_args) ); - process::addProcessInstance(this); -} // eo ProcessImplementation::ProcessImplementation(const std::string&) - - -ProcessImplementation::~ProcessImplementation() -{ - if (m_pid > 0 && m_state!=ProcessState::stopped) - { - stopProcess(true); - } - process::removeProcessInstance(this); -} // eo ProcessImplementation::~ProcessImplementation() - - -void ProcessImplementation::close(Direction direction) -{ - inherited::close(direction); - if (!inherited::opened() && (m_state != ProcessState::stopped) ) - { - stopProcess(false); - } -} // eo ProcessImplementation::close(Direction) - - -/** - * returns an object for adding new arguments to the argument list. - * @return the adder object. - */ -PushBackFiller ProcessImplementation::getArgAdder() -{ - return PushBackFiller(m_args); -} // eo ProcessImplementation::getArgAdder() - - -/** - * @brief set if the process should create a new session when started. - * @param enable determine if the process should start a new session. - * @return @a true iff the value of enable was accepted. - * - * If the process is already running, a new value is not accepted. - */ -bool ProcessImplementation::setCreateNewSession( bool enable ) -{ - if (m_state != ProcessState::stopped and enable != m_create_new_session) - { - return false; - } - m_create_new_session= enable; - return true; -} // eo ProcessImplementation::setCreateNewSession(bool); - - -/** - * @brief sets a new nice increment. - * @param nice the desired nice increment. - * @return @a true if the value was accepted and - in case the process was already started - - * the nice value was successfully changed. - */ -bool ProcessImplementation::setNice(int nice) -{ - errno= 0; - if (m_state != ProcessState::stopped) - { - int delta= m_nice_inc + nice; - m_nice_inc= nice; - int res= ::nice(delta); - if (res == -1 and errno !=0 ) - { - return false; - } - } - else - { - m_nice_inc = nice; - } - return true; -} // eo ProcessImplementation::setNice(int) - - -/** - * @brief sets the work dir the process should be started with. - * @param workdir the workdir - * @return @a true if the new workdir was accepted. - * - * The method will return @a false if the process is already started. - * The workdir can only be set before the process is started. - */ -bool ProcessImplementation::setWorkDir(const std::string& workdir) -{ - if ( m_state != ProcessState::stopped and workdir != m_workdir) - { - return false; - } - if (not workdir.empty()) - { - I2n::Stat stat(workdir); - if (not stat or not stat.is_directory()) - { - return false; - } - } - m_workdir= workdir; - return true; -} // eo ProcessImplementation::setWorkDir(const std::string&) - - -/** - * @brief sets new arguments for the process (the path to the binary is kept). - * - * @param args the new cli arguments for the subprocess (replacing the old ones). - */ -void ProcessImplementation::resetArgs( const std::vector< std::string >& args ) -{ - if (m_args.size() > 1) - { - m_args.erase( ++m_args.begin(), m_args.end()); - } - std::copy( args.begin(), args.end(), std::back_inserter(m_args) ); -} // eo ProcessImplementation::resetArgs(const std::vectors< std::string >&) - - -/** - * starts the new process. - * provides pipes for sending data to/ receiving data from the new process. - * Basically forks and execs the new process. - * - * @param stderr if not NULL the given object will be connected to stderr of the new process. - * The object can then be used for reading the data from the process' stderr; but cannot be written to. - * (The object will be closed if it was open). - * If the constant @a ProcessImplementation::StderrOnStdout is passed then stderr of the new process will - * be written to the same channel as stdout (i.e. can be read from the process class instance like the - * normal output). - * If NULL then the stderr channel from the parent process will also be used by the child. - * @return @a true iff the new subprocess started. - */ -bool ProcessImplementation::startProcess( IOImplementation2 *stderr ) -{ - bool stderr2stdout= false; - m_errno = 0; - m_input_buffer.clear(); - if (m_pid > 0 && m_state != ProcessState::stopped) - { - // process still/already running... - return false; - } - m_exit_code= 0; - - if (stderr == _StderrOnStdout) - { - stderr2stdout= true; - stderr= NULL; - } - - int to_process_pipe[2]; - int from_process_pipe[2]; - int from_process_stderr_pipe[2]= { -1, -1 }; - - if ( ::pipe(to_process_pipe) ) - { - m_errno= errno; - return false; - } - FdCloser closeTo0( to_process_pipe[0] ); - FdCloser closeTo1( to_process_pipe[1] ); - if ( ::pipe (from_process_pipe) ) - { - m_errno= errno; - return false; - } - FdCloser closeFrom0( from_process_pipe[0] ); - FdCloser closeFrom1( from_process_pipe[1] ); - if (stderr) - { - if (stderr->opened()) stderr->close(); - if ( ::pipe (from_process_stderr_pipe) ) - { - m_errno= errno; - return false; - } - } - FdCloser closeFromErr0( from_process_stderr_pipe[0] ); - FdCloser closeFromErr1( from_process_stderr_pipe[1] ); - - m_pid = ::fork(); - - if ( m_pid == (pid_t)-1 ) - { - m_errno= errno; - m_pid= 0; - // error; something went wrong - return false; - } - else if (m_pid > 0) - { - // we are in the parent part - - // keep the fd's we need and (later) close the other ones: - closeTo1.release(); // don't close this fd! - setWriteFd(to_process_pipe[1]); - closeFrom0.release(); // don't close this fd! - setReadFd(from_process_pipe[0]); - - if (stderr) - { - closeFromErr0.release(); // don't close this fd! - stderr->setReadFd(from_process_stderr_pipe[0]); - } - - m_state= ProcessState::running; - process::addChildProcess(m_pid,this); - DOUT(" started child with pid " << m_pid); - return true; - } - else // pid > 0 - { - // we are in the child part - - // dup the fd's for stdin/-out/-err into place: - ::dup2(to_process_pipe[0],0); - ::dup2(from_process_pipe[1],1); - if (stderr) - { - ::dup2(from_process_stderr_pipe[1],2); - ::close(from_process_stderr_pipe[0]); ::close(from_process_stderr_pipe[1]); - } - else if (stderr2stdout) - { - ::dup2(from_process_pipe[1],2); - } - // close what we don't need: - ::close(to_process_pipe[0]); ::close(to_process_pipe[1]); - ::close(from_process_pipe[0]); ::close(from_process_pipe[1]); - - // set workdir if requested: - if (not m_workdir.empty()) - { - int r= ::chdir( m_workdir.c_str() ); - if (r !=0 ) - { - //TODO? - exit(255); - } - } - - // - // collect args: - char **argv= new char*[m_args.size()+1]; - int i=0; - for(std::vector::iterator it= m_args.begin(); - it != m_args.end(); - ++it,++i) - { - argv[i]= strdup( it->c_str() ); - } - argv[i]= NULL; - // update nice level: - if (m_nice_inc) - { - nice(m_nice_inc); - } - // create a new session id if requested: - if (m_create_new_session) - { - setsid(); - } - // execute: - execv(m_path.c_str(), argv); - // exit if exec failed - exit(255); - //cleanup! ... just joking; we exec or we exit, in either case the system cleans - // everything which needs to be cleaned up. - } - return false; // keep the compiler happy... -} // eo ProcessImplementation::startProcess() - - -/** - * convenience method for starting the child process. - * This method uses predefined enum values for the stderr handling mode. - * - * @param stderr_mode the desired stderr mode. - * @return @a true iff the child process was created. - */ -bool ProcessImplementation::startProcess( ProcessImplementation::StderrMode stderr_mode ) -{ - switch (stderr_mode) - { - case UseParentsStderr: - return startProcess( _UseParentsStderr ); - - case StderrOnStdout: - return startProcess( _StderrOnStdout ); - } - return false; -}; // eo ProcessImplementation::startProcess(ProcessImplementation::StderrMode) - - -/** - * stops the process. - * - * @todo think about a more intelligent handling... - */ -void ProcessImplementation::stopProcess(bool force) -{ - // TODO: do it somewhat more intelligent?! - if (force) - { - kill(Signal::KILL); - //TODO: set running state? - } - else - { - kill(Signal::TERM); - } -} // eo ProcessImplementation::stop(bool) - - - -/** - * sends a signal to the child process. - * @param signal the Signal which should be send. - * @return @a true if the signal was sent; @a false if an error occured. - */ -bool ProcessImplementation::kill(Signal signal) -{ - m_errno = 0; - if (m_pid == 0 || m_pid == (pid_t)-1) - { - m_errno= ESRCH; - return false; - } - int res = ::kill(m_pid, signal); - if (res < 0) - { - m_errno= errno; - return false; - } - if (signal == Signal::CONT && m_state == ProcessState::suspended) - { - m_state = ProcessState::running; - } - return true; -} // eo ProcessImplementation::kill(Signal) - - - -/** - * set a new child state with information gobbled by the child signal handler. - * - * @note This method should only be called by the process manager! - * - * @param pid the pid of the child process. - * @param status the new status value (as delivered by waitpid()) - */ -void ProcessImplementation::setChildState(pid_t pid, int status) -{ - DOUT("setChildState("<first; - int status = it->second; - ODOUT(" pid=" << pid << ", status=" << status); - ProcessImplementation *process_obj; - if (process::findChildProcess(pid,process_obj)) - { - ODOUT(" local managed child, process_obj="<< process_obj); - // pid found in list: - if (!WIFSTOPPED(status) -#ifdef WIFCONTINUED - && !WIFCONTINUED(status) -#endif - ) - { - // take it from list if the child exited: - process::removeChildProcess(pid,process_obj); - } - if (process_obj) - { - // give the process object a chance to handle the state change: - process_obj->setChildState(pid, status); - } - } - else - { - ODOUT("foreign child"); - // pid not found in list: - /* NOTE: in a non threaded environment this pid must be from a child process which is not - managed by this process classes; since this method is called after all setup of a child process - is done (; especially entering the new child pid into our internal lists). - */ - m_foreign_pid_states.push_back(*it); - } - } - - // handle the foreign childs: - { - /* idea: - * fetch a (pid,status) from the list, erase it (to avoid reentrance problems) - * and fire the signal. If someone forks childs outside this module then he can - * connect to the signal and receive all necessary status information gobbled by - * our child handler. - */ - while (! m_foreign_pid_states.empty()) - { - PidStateList::iterator it= m_foreign_pid_states.begin(); - pid_t pid = it->first; - int status = it->second; - m_foreign_pid_states.erase(it); - m_foreign_child_state_changed_signal(pid,status); - } - } - -} // eo ProcessManager::execute - - -} // eo namespace SimpleIo -} // eo namespace I2n diff --git a/asnycio/simpleprocess.hpp b/asnycio/simpleprocess.hpp deleted file mode 100644 index d42c755..0000000 --- a/asnycio/simpleprocess.hpp +++ /dev/null @@ -1,198 +0,0 @@ -/** @file - * - * simple process handling based on simple io classes. - * - * (c) Copyright 2007-2008 by Intra2net AG - * - * info@intra2net.com - */ - -#ifndef _CONND_SIMPLEPROCESS_HPP_ -#define _CONND_SIMPLEPROCESS_HPP_ - -#include -#include - -#include - -#include -#include -#include "simpleio.hpp" - - -namespace I2n -{ -namespace SimpleIo -{ - -using SystemTools::Signal; -using SystemTools::ScopedSignalBlocker; - - -class ProcessManager; - - -typedef std::pair< pid_t, int > PidStatePair; -typedef std::vector< PidStatePair > PidStateList; - - -/** - * represents process states. - */ -struct ProcessState -{ - enum _ProcessState - { - stopped = 0, - running, - suspended - }; // eo enum _ProcessState - - _ProcessState m_state; - - ProcessState(_ProcessState _state = stopped) : m_state(_state) {} - - operator _ProcessState() const { return m_state; } -}; // eo struct ProcessState - - -/** - * specialisation of the io implementation class which fork/exec's a subprocess and - * connects with the new child's stdin/stdout. - * - * @note the signal @a IOImplementation::m_signal_eof of the base class can be used to detect when the - * new child closes it's stdout (which usually means that the child ended). - */ -class ProcessImplementation : public IOImplementation -{ - typedef IOImplementation inherited; - - friend class ProcessManager; - - public: - - enum StderrMode - { - /// "magic" constant to pass to start() when childs stderr should be the same as parents stderr. - UseParentsStderr= 0, - /// "magic" constant to pass to start() when stderr should be the same as stdout for the new process. - StderrOnStdout - }; // eo enum StderrMode - - public: - ProcessImplementation( - const std::string& path, - const std::vector& args = std::vector()); - virtual ~ProcessImplementation(); - - virtual void close(Direction direction = Direction::both); - - virtual bool startProcess( IOImplementation2* stderr ); - bool startProcess( StderrMode stderr_mode = UseParentsStderr ); - - virtual void stopProcess(bool force=false); - - PushBackFiller getArgAdder(); - - bool setCreateNewSession( bool enable= true); - - bool setNice(int nice); - - bool setWorkDir(const std::string& workdir); - - void resetArgs( const std::vector< std::string >& args = std::vector< std::string >() ); - - /// returns the current process state - ProcessState processState() const { return m_state; } - - ///returns the exit code of the process (if in stopped state) - int exitCode() const { return m_exit_code; } - - - protected: - - bool kill(const Signal signal); - - void setChildState(pid_t pid, int status); - - protected: - /// the path to the binary - std::string m_path; - /// argument list (starting with argv0, usually the name of the binary) - std::vector m_args; - /// increment of the nice level when the new child is started - int m_nice_inc; - /// determines if the child should start a new session. - bool m_create_new_session; - /// determines the workdir where the child process should be started with. - std::string m_workdir; - - /// the pid of the child process - pid_t m_pid; - /// the state of the child process - ProcessState m_state; - /// the exit code of the child (-1 if not available yet) - int m_exit_code; - - /// signal which is fired when the child terminated - SignalType m_signal_terminated; - - - /// "magic" constant to pass to start() when childs stderr should be the same as parents stderr. - static IOImplementation2* _UseParentsStderr; - /// "magic" constant to pass to start() when stderr should be the same as stdout for the new process. - static IOImplementation2* _StderrOnStdout; - - private: - -}; // eo class ProcessImplementation - - -/** - * manages overall process related stuff. - * - * @note this class is implemented as a singleton. - * @note this class uses the io timer interface to be called within the backend loops when necessary. - */ -class ProcessManager : public TimerBase -{ - public: - - static ProcessManager* getInstance(); - - protected: - ProcessManager(); - ProcessManager(const ProcessManager&); - - virtual void execute(); - - void activateMe(); - - public: - - /** - * the signal which is fired when waitpid() returns a status for a child process - * which is not managed by this process subsystem. - * Another module which forks child processes can connect to this signal to receive - * the information when these child processes are terminated. - */ - boost::signal m_foreign_child_state_changed_signal; - - protected: - - static ProcessManager *the_instance; - - PidStateList m_foreign_pid_states; - - private: -}; - - -bool installChildHandler(); -bool restoreChildHandler(); - - -} // eo namespace SimpleIo -} // eo I2n - -#endif diff --git a/asnycio/simplesocket.cpp b/asnycio/simplesocket.cpp deleted file mode 100644 index def93c0..0000000 --- a/asnycio/simplesocket.cpp +++ /dev/null @@ -1,397 +0,0 @@ -/** @file - * - * (c) Copyright 2008 by Intra2net AG - * - * info@intra2net.com - * - * @todo unlink unix server socket on close. - */ - -#include "simplesocket.hpp" -#include -#include -#include -#include -#include -#include -#include -#include - - -namespace I2n -{ -namespace SimpleIo -{ - - -namespace -{ - -struct sockaddr_un dummy_un; - -union MegaAddr { - struct sockaddr m_addr; - struct sockaddr_in m_addr_in; - struct sockaddr_un m_addr_un; // NOTE (historically) too small... - // storage is large enough to hold all sockaddr_* variants with the (historical) exception of _un ! - struct sockaddr_storage m_addr_store; - // a char array large enough to hold _un (with an path up to the maximum allowed size!) - // (the +1 is added for a later 0-termination of the path) - char m_buffer[ sizeof(dummy_un) - sizeof(dummy_un.sun_path) + PATH_MAX + 1 ]; -}; - - -} // eo namespace - - - -/* -** implementation of ServerSocketBaseImplementation -*/ - - -ServerSocketBaseImplementation::ServerSocketBaseImplementation() -: IOImplementation() -{ -} // eo ServerSocketBaseImplementation::ServerSocketBaseImplementation() - - -/** - * @brief handled incoming connections on the server port. - * - * accepts the new connection, stores the peer address in an internal buffer - * and calls a (derived) acceptNewConnection method to create an apropriate - * IO class instance. - * If no io instance is created the connection is closed. - */ -void ServerSocketBaseImplementation::doRead() -{ - MegaAddr addr; - socklen_t addrlen = sizeof(addr); - - // reset errno: - m_errno= 0; - - // reset the mark: - resetReadMark(); - - int fd = ::accept( readFd(), &addr.m_addr, &addrlen); - if (fd < 0) - { - // handle errors. - m_errno= errno; - switch (m_errno) - { - case EBADF: - case EINVAL: - case ENOTSOCK: - // should not happen... - close(); - break; - default:; - } - return; - } - - if (addrlen < sizeof(addr)) - { - // in case of unix domain socket: terminate the path! - // NOTE we are doing this here since we don't pass the length info. - addr.m_buffer[addrlen]= 0; - } - else - { - //something went terribly wrong!! - // the resulting address structure is larger than it ever could be... - ::close(fd); - return; - } - - IOImplementationPtr connection= acceptNewConnection(fd, &addr.m_addr); - if(!connection) - { - ::close(fd); - return; - } - if (m_new_connection_base_callback) - { - m_new_connection_base_callback(connection); - } -} // eo ServerSocketBaseImplementation::doRead() - - -/** - * @brief handles write events. - * - * since server sockets never ever get a write mark, something must - * be wrong and the connection is closed! - */ -void ServerSocketBaseImplementation::doWrite() -{ - // if this method is called, something went wrong... - close(); - //TODO throw something?! -} // eo ServerSocketBaseImplementation::doWrite() - - - -/** - * @brief sets a function which is called when a new connection was established. - * - * The function gets a (shared) pointer to the new connetion as parameter and is - * expected to store it when it accepts the connection. - * (Else the conenction gets deleted after the function was called.) - * - * @param func the function which hsould be called on new conenctions. - */ -void ServerSocketBaseImplementation::setNewConnectionBaseCallback( - const NewConnectionBaseCallbackFunc& func) -{ - m_new_connection_base_callback= func; -} // eo ServerSocketBaseImplementation::setNewConnectionBaseCallback(NewConnectionBaseCallbackFunc&) - - - -/** - * @brief callback for new connections. - * - * The base method always returns an empty pointer. - * - * Derived classes must override this method and do something useful with - * the passed file descriptor. - * - * @param fd the file descriptor of the new connection. - * @param addr pointer to the address structure filled from ::accept() - * @return shared pointer to the new IO class instance (; empty if not accepted) - */ -IOImplementationPtr ServerSocketBaseImplementation::acceptNewConnection( - int fd, boost::any addr) -{ - // needs to be defined in derived class! - return IOImplementationPtr(); -} // eo ServerSocketBaseImplementation::acceptNewConnection(int,boost::any) - - - -/* -** implementation of UnixIOSocket -*/ - - -UnixIOSocket::UnixIOSocket() -{ -} // eo UnixIOSocket::UnixIOSocket() - - - -UnixIOSocket::UnixIOSocket(const std::string& path) -: m_peer_pid(0) -, m_peer_uid(0) -, m_peer_gid(0) -{ - open(path); -} // eo UnixIOSocket::UnixIOSocket(const std::string&) - - -UnixIOSocket::UnixIOSocket( - int fd, const std::string& path, - unsigned int peer_pid, unsigned int peer_uid, unsigned int peer_gid) -: IOImplementation(fd,fd) -, m_path(path) -, m_peer_pid(peer_pid) -, m_peer_uid(peer_uid) -, m_peer_gid(peer_gid) -{ -} // eo UnixIOSocket::UnixIOSocket(int,const std::string&,unsigned,unsigned,unsigned) - - -/** - * @brief opens a (client) connection to an unix domain socket. - * - * @param path the path the server is listening on. - * @return @a true iff the connection was successfully opened. - */ -bool UnixIOSocket::open(const std::string& path) -{ - if (opened()) close(); - - m_errno= 0; - m_path.clear(); - - if (path.empty() || path.size() >= PATH_MAX) - { - return false; - } - - int fd= ::socket(PF_UNIX, SOCK_STREAM, 0); - if (fd<0) - { - m_errno= errno; - return false; - } - - { - MegaAddr addr; - addr.m_addr_un.sun_family= AF_UNIX; - strncpy(addr.m_addr_un.sun_path, path.c_str(), PATH_MAX); - if (::connect(fd,(sockaddr*)&addr.m_addr_un, SUN_LEN(&addr.m_addr_un)) < 0) - { - m_errno= errno; - ::close(fd); - return false; - } - } - m_path= path; - setReadFd(fd); - setWriteFd(fd); - return true; -} // eo UnixIOSocket::open(const std::string&,int) - - -/* -** implementation of UnixServerSocketBase -*/ - - -UnixServerSocketBase::UnixServerSocketBase() -{ -} // eo UnixServerSocketBase::UnixServerSocketBase - - -UnixServerSocketBase::UnixServerSocketBase(const std::string& path, int mode) -{ - open(path,mode); -} // eo UnixServerSocketBase::UnixServerSocketBase(const std::string&,int) - - -/** - * @brief opens the server part of an unix domain socket. - * - * @param path the path the new port should listen on. - * @param mode the mode for the path. - * @return @a true iff the port was successfully opened. - */ -bool UnixServerSocketBase::open(const std::string& path, int mode) -{ - if (opened()) close(); - m_errno= 0; - if (path.empty() || path.size() >= PATH_MAX) - { - return false; - } - - int fd= ::socket(PF_UNIX, SOCK_STREAM, 0); - if (fd<0) - { - m_errno= errno; - return false; - } - - { - MegaAddr addr; - addr.m_addr_un.sun_family= AF_UNIX; - strncpy(addr.m_addr_un.sun_path, path.c_str(), PATH_MAX); - ::I2n::unlink(path); // just in case... - mode_t old_mask= ::umask( (mode & 0777) ^ 0777); - if (::bind(fd,(sockaddr*)&addr.m_addr_un, SUN_LEN(&addr.m_addr_un)) < 0) - { - m_errno= errno; - ::umask(old_mask); - ::close(fd); - return false; - } - ::umask(old_mask); - } - m_path= path; - - { - int res= ::listen(fd,8); - if (res < 0) - { - m_errno= errno; - ::close(fd); - return false; - } - } - setReadFd(fd); - return true; -} // eo UnixServerSocketBase::open(const std::string&,int) - - -/** - * @brief called from base class to create a new connection instance. - * - * This method accepts only connections to unix domain sockets. - * It also tries to determine the peer pid, uid and gid. - * - * @param fd the file descriptor of a freshly accepted connection. - * @param addr conatins "pointer to struct sockaddr" - * @return @a a (shared) pointer to the new connection isntance; empty if none was - * created. - */ -IOImplementationPtr UnixServerSocketBase::acceptNewConnection(int fd, boost::any addr) -{ - struct sockaddr *addr_ptr= NULL; - try { - addr_ptr = boost::any_cast(addr); - } - catch (boost::bad_any_cast&) - { - return IOImplementationPtr(); - } - // check for the right family: - if (addr_ptr->sa_family != AF_UNIX) - { - return IOImplementationPtr(); - } - struct sockaddr_un *un_ptr = reinterpret_cast(addr_ptr); - std::string peer_path( un_ptr->sun_path ); - unsigned peer_pid=0; - unsigned peer_gid=0; - unsigned peer_uid=0; -#ifdef __linux__ - { // the linux way to get peer info (pid,gid,uid): - struct ucred cred; - socklen_t cred_len = sizeof(cred); - if (getsockopt(fd,SOL_SOCKET,SO_PEERCRED,&cred,&cred_len) == 0) - { - peer_pid= cred.pid; - peer_uid= cred.uid; - peer_gid= cred.gid; - } - } -#else -#error dont know how to determine peer info. -#endif - - UnixIOSocketPtr ptr( createIOSocket(fd, peer_path, peer_pid, peer_uid, peer_gid) ); - return ptr; -} // eo UnixServerSocketBase::acceptNewConnection(int,boost::any); - - -/** - * @brief "real" creator of the connection instance. - * - * called by UnixServerSocketBase::acceptNewConnection to create the new io instance. - * - * @param fd file descriptor for the socket - * @param path path as delivered by peer. - * @param peer_pid peer pid. - * @param peer_uid peer uid. - * @param peer_gid peer gid. - * @return (shared) pointer to the new io instance. - */ -UnixIOSocketPtr UnixServerSocketBase::createIOSocket( - int fd, const std::string& path, - unsigned int peer_pid, - unsigned int peer_uid, unsigned int peer_gid) -{ - return UnixIOSocketPtr( - new UnixIOSocket(fd, path, peer_pid, peer_uid, peer_gid) - ); -} // eo UnixServerSocketBase::createIOSocket(int,const std::string&,unsigned,unsigned,unsigned) - - - -}// eo namespace SimpleIo -}// eo namespace I2n diff --git a/asnycio/simplesocket.hpp b/asnycio/simplesocket.hpp deleted file mode 100644 index 745c787..0000000 --- a/asnycio/simplesocket.hpp +++ /dev/null @@ -1,215 +0,0 @@ -/** @file - * @brief socket classes for the SimpleIo framework. - * - * - * (c) Copyright 2008 by Intra2net AG - * - * info@intra2net.com - */ - -#ifndef __SIMPLEIO__SIMPLESOCKET_HPP__ -#define __SIMPLEIO__SIMPLESOCKET_HPP__ - -#include "simpleio.hpp" - -#include -#include -#include -#include -#include -#include -#include - - -namespace I2n -{ -namespace SimpleIo -{ - - -typedef boost::shared_ptr< IOImplementation > IOImplementationPtr; - - -/** - * @brief base class for server sockets. - * - * Contains all the stuff which is common for different types of server sockets. - */ -class ServerSocketBaseImplementation -: public IOImplementation -{ - public: - typedef boost::function< void(IOImplementationPtr) > NewConnectionBaseCallbackFunc; - - public: - - void setNewConnectionBaseCallback( const NewConnectionBaseCallbackFunc& func); - - protected: - ServerSocketBaseImplementation(); - - - virtual void doRead(); - virtual void doWrite(); - - virtual IOImplementationPtr acceptNewConnection(int fd, boost::any addr); - - - protected: - - NewConnectionBaseCallbackFunc m_new_connection_base_callback; - -}; // eo class ServerSocketBaseImplementation - - -typedef boost::shared_ptr< ServerSocketBaseImplementation > ServerSocketBaseImplementationPtr; - - -/* -** unix domain sockets -*/ - - -template< - class IOClass -> -class UnixServerSocket; - - -/** - * @brief spezialized IO class for unix domain sockets. - * - */ -class UnixIOSocket -: public IOImplementation -{ - public: - UnixIOSocket(); - UnixIOSocket(const std::string& path); - - bool open(const std::string& path); - - protected: - friend class UnixServerSocketBase; - friend class UnixServerSocket; - - UnixIOSocket( - int fd, const std::string& path, - unsigned int peer_pid, unsigned int peer_uid, unsigned int peer_gid); - - protected: - - std::string m_path; - unsigned int m_peer_pid; - unsigned int m_peer_uid; - unsigned int m_peer_gid; - -}; // eo class UnixIOSocket - -typedef boost::shared_ptr< UnixIOSocket > UnixIOSocketPtr; - - - -/** - * @brief spezialized server socket class for unix domain sockets. - * - */ -class UnixServerSocketBase -: public ServerSocketBaseImplementation -{ - public: - UnixServerSocketBase(); - UnixServerSocketBase(const std::string& path, int mode=0600); - - bool open(const std::string& path, int mode= 0600); - - protected: - - virtual IOImplementationPtr acceptNewConnection(int fd, boost::any addr); - - virtual UnixIOSocketPtr createIOSocket( - int fd, const std::string& path, - unsigned int peer_pid, - unsigned int peer_uid, unsigned int peer_gid); - - protected: - - std::string m_path; - -}; // eo class UnixServerSocketBase - - -/** - * @brief unix server socket class which "produces" connections of a determined type. - * - + @param IOClass the type of the connections. - */ -template< - class IOClass -> -class UnixServerSocket -: public UnixServerSocketBase -{ - BOOST_STATIC_ASSERT(( boost::is_base_of::value )); - - public: - typedef boost::shared_ptr< IOClass > IOClassPtr; - typedef boost::function< void(IOClassPtr) > NewConnectionCallbackFunc; - - public: - - UnixServerSocket() - : UnixServerSocketBase() - {} - - UnixServerSocket(const std::string& path, int mode=0600) - : UnixServerSocketBase(path,mode) - {} - - void setNewConnectionCallback( const NewConnectionCallbackFunc& func) - { - if (func) - { - UnixServerSocketBase::setNewConnectionBaseCallback( - boost::bind( - func, - boost::bind( - &UnixServerSocket::my_ptr_cast, - _1 - ) - ) - ); - } - else - { - UnixServerSocketBase::setNewConnectionBaseCallback( - NewConnectionBaseCallbackFunc() - ); - } - } - - protected: - - virtual UnixIOSocketPtr createIOSocket( - int fd, const std::string& path, - unsigned int peer_pid, - unsigned int peer_uid, unsigned int peer_gid) - { - return UnixIOSocketPtr( - new IOClass(fd, path, peer_pid, peer_uid, peer_gid) - ); - } - - static IOClassPtr my_ptr_cast(IOImplementationPtr ptr) - { - return boost::dynamic_pointer_cast(ptr); - } - -}; // eo class UnixServerSocket - - -}// eo namespace SimpleIo -}// eo namespace I2n - - -#endif diff --git a/asnycio/simpletimer.cpp b/asnycio/simpletimer.cpp deleted file mode 100644 index f716981..0000000 --- a/asnycio/simpletimer.cpp +++ /dev/null @@ -1,107 +0,0 @@ -/** @file - * - * (c) Copyright 2007 by Intra2net AG - * - * info@intra2net.com - */ - -//#define NOISEDEBUG - - -#include "simpletimer.hpp" - - -#ifdef NOISEDEBUG -#include -#include -#define DOUT(msg) std::cout << msg << std::endl -#define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl -#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl -#else -#define DOUT(msg) do {} while (0) -#define FODOUT(obj,msg) do {} while (0) -#define ODOUT(msg) do {} while (0) -#endif - - -namespace I2n -{ -namespace SimpleIo -{ - - -/* - * Implementation of SimpleTimer - */ - -SimpleTimer::SimpleTimer() -{ -} // eo SimpleTimer::SimpleTimer() - - -SimpleTimer::SimpleTimer(const MilliTime& delta, const TimerSignal::slot_function_type& action) -{ - addAction(action); - startTimer(delta); -} // eo SimpleTimer::SimpleTimer(const MilliTime&, const TimerSignal::slot_type&) - - -SimpleTimer::SimpleTimer(long milli_seonds_delta, const TimerSignal::slot_function_type& action) -{ - addAction(action); - startTimerMS(milli_seonds_delta); -} // eo SimpleTimer::SimpleTimer(long, const TimerSignal::slot_type&) - - -void SimpleTimer::execute() -{ - ODOUT("execute()!"); - m_timer_signal(); - m_timer_signal_p(this); -} // eo SimpleTimer::execute - - -void SimpleTimer::addAction( const TimerSignal::slot_function_type& action ) -{ - m_timer_signal.connect(action); -} // eo SimpleTimer::addAction(const TimerSignal::slot_type&) - - -void SimpleTimer::addActionP( const TimerSignalP::slot_function_type& action ) -{ - m_timer_signal_p.connect(action); -} // eo SimpleTimer::addAction(const TimerSignalP::slot_type&) - - -void SimpleTimer::startTimer( const MilliTime& delta ) -{ - m_delta= delta; - setDeltaWhenTime( delta ); - activate(true); -#ifdef NOISEDEBUG - MilliTime now, t; - get_current_time(now); - t= getWhenTime(); - MilliTime dt= t-now; - ODOUT("startTimer"); - ODOUT(" now: sec="<< now.mt_sec << ", msec="< - -namespace I2n -{ -namespace SimpleIo -{ - - -class SimpleTimer : public TimerBase -{ - public: - - typedef boost::signal TimerSignal; - typedef boost::signal TimerSignalP; - - public: - - SimpleTimer(); - - // convenience constructors for simple timeouts: - SimpleTimer(const MilliTime& delta, const TimerSignal::slot_function_type& action); - SimpleTimer(long milli_seonds_delta, const TimerSignal::slot_function_type& action); - - // add actions: - void addAction( const TimerSignal::slot_function_type& action ); - void addActionP( const TimerSignalP::slot_function_type& action ); - - // start the timer: - void startTimer( const MilliTime& delta ); - void startTimerMS( long milli_seconds ); - - // stop the timer: - void stopTimer(); - - protected: - MilliTime m_delta; - - TimerSignal m_timer_signal; - TimerSignalP m_timer_signal_p; - - virtual void execute(); -}; // eo class SimpleTimer - - -} // eo namespace SimpleIo -} // eo namespace I2n - -#endif diff --git a/asyncio/Makefile.am b/asyncio/Makefile.am new file mode 100644 index 0000000..2694ef2 --- /dev/null +++ b/asyncio/Makefile.am @@ -0,0 +1,13 @@ +INCLUDES = @LIBI2NCOMMON_CFLAGS@ @BOOST_CPPFLAGS@ +METASOURCES = AUTO +lib_LTLIBRARIES = libsimpleio.la +libsimpleio_la_SOURCES = simplecallout.cpp simpleio.cpp simplepipe.cpp \ + simpleprocess.cpp simplesocket.cpp simpletimer.cpp +include_HEADERS = simplecallout.hpp simpleio.hpp simplepipe.hpp \ + simpleprocess.hpp simplesocket.hpp simpletimer.hpp +libsimpleio_la_LIBADD = @LIBI2NCOMMON_LIBS@ @BOOST_LDFLAGS@ @BOOST_SIGNALS_LIB@ + +libsimpleio_la_LDFLAGS = -version-info @LIBSIMPLEIO_LIB_VERSION@ + +pkgconfigdir=$(libdir)/pkgconfig +pkgconfig_DATA= libsimpleio.pc diff --git a/asyncio/libsimpleio.pc.in b/asyncio/libsimpleio.pc.in new file mode 100644 index 0000000..a671d47 --- /dev/null +++ b/asyncio/libsimpleio.pc.in @@ -0,0 +1,11 @@ +prefix=@prefix@ +exec_prefix=@exec_prefix@ +libdir=@libdir@ +includedir=@includedir@ + +Name: libsimpleio +Description: asynchrounous io lib +Version: @VERSION@ +Requires: libi2ncommon +Libs: -L${libdir} -lsimpleio +Cflags: -I${includedir} diff --git a/asyncio/simplecallout.cpp b/asyncio/simplecallout.cpp new file mode 100644 index 0000000..13f5c63 --- /dev/null +++ b/asyncio/simplecallout.cpp @@ -0,0 +1,336 @@ +/** + * @file + * + * @copyright © Copyright 2008 by Intra2net AG + * @license commercial + * + * info@intra2net.com + */ + +#include "simplecallout.hpp" + +#include +#include + +#include + + +namespace I2n +{ +namespace SimpleIo +{ + +namespace +{ + +typedef boost::shared_ptr< Detail::Caller > CallerPtr; + +typedef std::map< unsigned long, CallerPtr > CallMap; + + +unsigned long l_last_id=0; + +CallMap l_call_map; + + +/** + * @brief creates a new id value for a call out. + * @return the new id. + * + * The id value is basically just a counter. + * It is implemented in a way that it can not be 0 and can deal with wrap arounds. + */ +unsigned long create_call_out_id_value() +{ + while ( l_call_map.find(++l_last_id) != l_call_map.end() and l_last_id != 0); + return l_last_id; +} // eo create_call_out_id_value + + +void add_call( CallerPtr caller ) +{ + if (caller->joinId()) + { + l_call_map[ caller->getCallOutId().getValue() ] = caller; + } +} // eo add_call + + +bool remove_call( unsigned long id_value ) +{ + CallMap::iterator it= l_call_map.find(id_value); + if (it != l_call_map.end()) + { + l_call_map.erase(it); + return true; + } + return false; +} // eo remove_call(unsigned long) + + +CallerPtr get_call(unsigned long id_value) +{ + CallMap::iterator it= l_call_map.find(id_value); + if (it != l_call_map.end()) + { + return it->second; + } + return CallerPtr(); +} // eo get_call(unsigned long) + + +bool has_call( unsigned long id_value ) +{ + CallMap::iterator it= l_call_map.find(id_value); + return (it != l_call_map.end() ); +} // eo has_call(unsigned long) + + + +} // eo namespace + + + +/* +** implementation of class CallOutId +*/ + +CallOutId::CallOutId() +: m_value(0) +{ +} // eo CallOutId::CallOutId() + + +CallOutId::CallOutId(unsigned long value) +: m_value(value) +{ +} // eo CallOutId::CallOutId(unsigned long) + + +bool CallOutId::thaw() const +{ + if (m_caller_weak_ptr.expired()) + { + return false; + } + CallerPtr call_ptr= get_call(m_value); + if (call_ptr) + { + return call_ptr->thaw(); + } + return false; +} // eo CallOutId::thaw() const + + +bool CallOutId::remove() +{ + if (m_caller_weak_ptr.expired()) + { + return false; + } + unsigned long value= m_value; + m_value= 0; + return remove_call(value); +} // eo CallOutId::remove() + + +bool CallOutId::active() const +{ + return m_value!=0 and not m_caller_weak_ptr.expired() and has_call(m_value); +} // eo CallOutId::active() const + + +/** + * @brief retruns if the call is frozen. + * @return @a true iff the call is frozen. + */ +bool CallOutId::frozen() const +{ + CallerPtr caller= m_caller_weak_ptr.lock(); + if (not caller) + { + return false; + } + return caller->frozen(); +} // eo CallOutId::frozen() const + + +/** + * @brief returns the remaining time until the call is done or thrown away (on frozen calls). + * @return the remaining time. + * + * The result only makes sense if the call is still active. + */ +MilliTime CallOutId::remaining_time() +{ + CallerPtr caller= m_caller_weak_ptr.lock(); + if ( not active() or not caller ) + { + return MilliTime(); + } + MilliTime t; + get_current_monotonic_time(t); + MilliTime result= caller->getWhenTime(); + result-= t; + MilliTime t_null; + return (result < t_null ? t_null : result); +} // eo CallOutId::remaining_time() + + +namespace Detail +{ + +/* +** implementation of class Caller +*/ + +Caller::Caller( boost::function< void() > f, long delta_sec, long delta_msec, bool frozen) +: TimerBase() +, m_call_out_id( create_call_out_id_value() ) +, m_func(f) +, m_waiting(frozen) +{ + SCOPETRACKER(); + setDeltaWhenTime( delta_sec, delta_msec); +} // eo Caller::Caller(boost::function< void() >,long) + + +Caller::~Caller() +{ + SCOPETRACKER(); +} // eo Caller::~Caller() + + +void Caller::execute() +{ + SCOPETRACKER(); + // NOTE: since the func may throw an exception, we first get a shared pointer + // (to prevent early deletion) and then we remove us from the call map. + CallerPtr ptr= shared_from_this(); + m_call_out_id.remove(); + + if (m_func and not m_waiting) + { + m_func(); // may throw..., but at this point it doesn't harm. + //( it may harm at other places,...) + } +} // eo Caller::execute() + + +bool Caller::thaw() +{ + if (m_waiting) + { + m_waiting= false; + setDeltaWhenTime( 0, 0 ); + return true; + } + return false; +} // eo Caller::thaw() + + +bool Caller::joinId() +{ + if (m_call_out_id.m_caller_weak_ptr.expired()) + { + m_call_out_id.m_caller_weak_ptr= shared_from_this(); + activate(); + return true; + } + return false; +} // eo Caller::joinId() + + +bool Caller::frozen() const +{ + return m_waiting; +} // eo Caller::frozen() const + + +} // eo namespace Detail + + + +/** + * @brief remove a pending call by id. + * + * @param id the call id which should be removed. + * @return @a true iff the call was removed, @a false if no call with the given id was found. + */ +bool removeCallOut( CallOutId& id ) +{ + return id.remove(); +} // eo removeCallOut(CallOutId&) + + + +template<> +CallOutId callOut( boost::function< void() > f, long delta_sec) +{ + CallerPtr caller( new Detail::Caller(f,delta_sec) ); + add_call(caller); + return caller->getCallOutId(); +} // eo callOut(boost::function< void() >,long) + +template<> +CallOutId callOut( boost::function< void() > f, int delta_sec) +{ + return callOut(f,delta_sec); +} // eo callOut(boost::function< void() >,int) + + +template<> +CallOutId callOut( boost::function< void() > f, double delta_sec ) +{ + long delta_sec_i = (long)delta_sec; + long delta_sec_m = (long)((delta_sec - (double)delta_sec_i)*1000.0); + CallerPtr caller( new Detail::Caller(f,delta_sec_i, delta_sec_m) ); + add_call(caller); + return caller->getCallOutId(); +} // eo callOut(boost::function< void() >,double) + + +template<> +CallOutId callOut( boost::function< void() > f, float delta_sec ) +{ + return callOut(f,delta_sec); +} // eo callOut(boost::function< void() >,float) + + + + +template<> +CallOutId frozenCall( boost::function< void() > f, long delta_sec) +{ + CallerPtr caller( new Detail::Caller(f,delta_sec, 0, true) ); + add_call(caller); + return caller->getCallOutId(); +} // eo frozenCall(boost::function< void() >,long) + +template<> +CallOutId frozenCall( boost::function< void() > f, int delta_sec) +{ + return frozenCall(f,delta_sec); +} // eo frozenCall(boost::function< void() >,int) + + +template<> +CallOutId frozenCall( boost::function< void() > f, double delta_sec ) +{ + long delta_sec_i = (long)delta_sec; + long delta_sec_m = (long)((delta_sec - (double)delta_sec_i)*1000.0); + CallerPtr caller( new Detail::Caller(f,delta_sec_i, delta_sec_m, true) ); + add_call(caller); + return caller->getCallOutId(); +} // eo frozenCall(boost::function< void() >,double) + + +template<> +CallOutId frozenCall( boost::function< void() > f, float delta_sec ) +{ + return frozenCall(f,delta_sec); +} // eo frozenCall(boost::function< void() >,float) + + +} // eo namespace SimpleIo +} // eo namespace I2n diff --git a/asyncio/simplecallout.hpp b/asyncio/simplecallout.hpp new file mode 100644 index 0000000..c895bbc --- /dev/null +++ b/asyncio/simplecallout.hpp @@ -0,0 +1,158 @@ +/** + * @file + * @brief provides a method for delayed execution of functions. + * + * + * @copyright © Copyright 2008 by Intra2net AG + * @license commercial + * + * info@intra2net.com + */ + +#ifndef __SIMPLEIO_SIMPLECALLOUT_HPP_ +#define __SIMPLEIO_SIMPLECALLOUT_HPP_ + +#include "simpleio.hpp" + +#include +#include +#include + + +namespace I2n +{ +namespace SimpleIo +{ + +// forward declarations: +namespace Detail { + +class Caller; + +typedef boost::shared_ptr< Caller > CallerPtr; +typedef boost::weak_ptr< Caller > CallerWeakPtr; + +} // eo namespace Detail + + +/** + * @brief represents an id for a deferred call. + * + * Also provides methods for modifying the call + * (like thaw or delete it). + */ +class CallOutId +{ + friend class Detail::Caller; + + public: + CallOutId(); + + unsigned long getValue() const {return m_value;} + + bool thaw() const; + bool remove(); + + bool active() const; + bool frozen() const; + + MilliTime remaining_time(); + + private: + + CallOutId(unsigned long value); + + private: + + unsigned long m_value; + + Detail::CallerWeakPtr m_caller_weak_ptr; + +}; // eo class CallOutId + + + +/* +** +*/ + +namespace Detail { + +/** + * @brief tool class for holding and executing a deferred call. + * + */ +class Caller +: public TimerBase +, public boost::enable_shared_from_this< Caller > +{ + public: + Caller( boost::function< void() > f, long delta_sec, long delta_msec=0, bool frozen=false ); + virtual ~Caller(); + + CallOutId getCallOutId() const { return m_call_out_id; } + + bool thaw(); + + bool joinId(); + + bool frozen() const; + + protected: + + virtual void execute(); + + private: + + CallOutId m_call_out_id; + boost::function< void() > m_func; + bool m_waiting; +}; // eo class Caller + + +} // eo namespace Detail + +/* +** +*/ + +/** + * @brief initiates a deferred call of a function. + * + * @param f the function which should be called. + * @param delta_sec the delta time (in seconds) when the function should be called. + * @return an id to identify the call (may be used for preliminary removal of the call) + */ +template< typename F > +CallOutId callOut( boost::function< void() > f, F delta_sec ); + +template<> CallOutId callOut( boost::function< void() > f, long delta_sec ); +template<> CallOutId callOut( boost::function< void() > f, double delta_sec ); +template<> CallOutId callOut( boost::function< void() > f, float delta_sec ); +template<> CallOutId callOut( boost::function< void() > f, int delta_sec ); + + +/** + * @brief initiates a frozen call of a function. + * + * @param f the function which should be called. + * @param delta_sec the delta time (in seconds) when the call will be (silently) removed. + * @return an id to identify the call; neccessary for thaw the call. + */ +template< typename F > +CallOutId frozenCall( boost::function< void() > f, F delta_sec); + +template<> CallOutId frozenCall( boost::function< void() > f, long delta_sec ); +template<> CallOutId frozenCall( boost::function< void() > f, double delta_sec ); +template<> CallOutId frozenCall( boost::function< void() > f, float delta_sec ); +template<> CallOutId frozenCall( boost::function< void() > f, int delta_sec ); + + + +bool removeCallOut( CallOutId& id ); + + +} // eo namespace SimpleIo +} // eo namespace I2n + +#endif diff --git a/asyncio/simpleio.cpp b/asyncio/simpleio.cpp new file mode 100644 index 0000000..9bf0cad --- /dev/null +++ b/asyncio/simpleio.cpp @@ -0,0 +1,1695 @@ +/** @file + * + * + * @copyright Copyright © 2007-2008 by Intra2net AG + * @license commercial + * @contact info@intra2net.com + */ + +//#define NOISEDEBUG + +#include "simpleio.hpp" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +#include +#include + +#include + +#ifdef NOISEDEBUG +#include +#include +#define DOUT(msg) std::cout << msg << std::endl +#define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl +//#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl +#define ODOUT(msg) std::cout << __PRETTY_FUNCTION__ << "[" << this << "]:" << msg << std::endl +#else +#define DOUT(msg) do {} while (0) +#define FODOUT(obj,msg) do {} while (0) +#define ODOUT(msg) do {} while (0) +#endif + +namespace +{ + + +/* + * configuration: + */ + + +const int c_max_poll_wait= 10*60*1000; // maximal poll wait (while in backend loop): 10 min + + +/** + * contains internal helper structs and functions for io handling. + */ +namespace internal_io +{ + +/** + * extends struct pollfd with some convenience functions + */ +struct PollFd : public ::pollfd +{ + PollFd() + { + fd= 0; + events= revents= 0; + } // eo PollFd + + + /** + * initializes the struct with a given file descriptor and clears the event mask(s). + * @param _fd + */ + PollFd(int _fd) + { + fd= _fd; + events= revents= 0; + } // eo PollFd + + + /** + * set that we want to be notified about new incoming data + */ + void setPOLLIN() { events |= POLLIN; } + + /** + * set that we want to be notified if we can send (more) data. + */ + void setPOLLOUT() { events |= POLLOUT; } + +}; // eo struct PollFd + + +typedef std::vector PollVector; +typedef std::map FdPollMap; +typedef std::map FdIOMap; + + +/** + * struct for interfacing our local structures with poll() + */ +struct PollDataCluster +{ + PollVector m_poll_vector; + FdPollMap m_fd_poll_map; + FdIOMap m_read_fd_io_map; + FdIOMap m_write_fd_io_map; + + void add_read_fd( int fd, I2n::SimpleIo::IOImplementation* io); + void add_write_fd( int fd, I2n::SimpleIo::IOImplementation* io); + + pollfd* get_pollfd_ptr(); + unsigned int get_num_pollfds() const; + +}; // eo struct PollDataCluster + + +template +class PtrList : public std::list +{ + typedef std::list inherited; + public: + bool dirty; + + static int Instances; + + public: + + PtrList() + : dirty(false) + { + ++Instances; + } // eo PtrList + + + ~PtrList() + { + ODOUT(""); + --Instances; + } + + + /** + * add a new item pointer to the list. + * + * @param item item pointer which should be added + */ + void add_item(T* item) + { + typename inherited::iterator it= std::find(inherited::begin(), inherited::end(), item); + if (it != inherited::end()) + { + // nothing to do since item is already in the list + return; + } + push_back(item); + } // eo add + + + /** + * remove an item pointer from the list by setting NULL at the current position of the item and mark + * the list as dirty. + * + * @param item the io object which should be removed from the list. + */ + void remove_item(T* item) + { + typename inherited::iterator it= std::find(inherited::begin(), inherited::end(), item); + if (it == inherited::end()) + { + // nothing to do: + return; + } + *it = NULL; // forget the pointer + dirty= true; // ..and mark the list as dirty (i.e. has NULL elements) + } // eo remove + + + /** + * cleans the list of objects by removing the NULL elements (if any). + * + * @note this function should only be called when it is ensured that no other functions using iterators of this list. + */ + void clean_list() + { + if (!dirty) + { + // nothing to do + return; + } + // remove the NULL elements now: + erase( + std::remove( inherited::begin(), inherited::end(), (T*)NULL), + inherited::end() ); + dirty= false; + } // eo clean_list + + +}; // eo class PtrList + + +typedef PtrList IOList; +typedef PtrList TimerList; + +template<> int IOList::Instances= 0; +template<> int TimerList::Instances= 0; + + +/** + * the (internal) global list of io objects (object pointers) + */ +IOList& g_io_list() +{ + static IOList _the_io_list; + return _the_io_list; +}; + + +/** + * the (internal) global list of timer objects (object pointers) + */ +TimerList& g_timer_list() +{ + static TimerList _the_timer_list; + return _the_timer_list; +} + +/* + * implementation of PollDataCluster + */ + + +/** + * add a new file descriptor to the read list. + * + * @param fd the file descriptor. + * @param io the io object which uses the fd for reading. + */ +void PollDataCluster::add_read_fd( int fd, I2n::SimpleIo::IOImplementation* io) +{ + FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd); + if (itPollMap != m_fd_poll_map.end()) + { + m_poll_vector[ itPollMap->second ].setPOLLIN(); + } + else + { + PollFd item(fd); + item.setPOLLIN(); + m_fd_poll_map[fd] = m_poll_vector.size(); + m_poll_vector.push_back( item ); + } + m_read_fd_io_map[fd]= io; +} // eo PollDataCluster::add_read_fd + + +/** + * add a new file descriptor to the write list. + * + * @param fd the file descriptor. + * @param io the io object which uses the fd for writing. + */ +void PollDataCluster::add_write_fd( int fd, I2n::SimpleIo::IOImplementation* io) +{ + FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd); + if (itPollMap != m_fd_poll_map.end()) + { + m_poll_vector[ itPollMap->second ].setPOLLOUT(); + } + else + { + PollFd item(fd); + item.setPOLLOUT(); + m_fd_poll_map[fd] = m_poll_vector.size(); + m_poll_vector.push_back( item ); + } + m_write_fd_io_map[fd]= io; +} // eo PollDataCluster::add_write_fd + + +/** + * returns a pointer to a pollfd array; suitable for passing to poll() + * + * @return pointer to pollfd array + */ +pollfd* PollDataCluster::get_pollfd_ptr() +{ + return m_poll_vector.empty() ? NULL : &m_poll_vector.front(); +} // eo get_pollfd_ptr + + +/** + * returns the number of entries in the pollfd array; suitable for passing to poll() + * + * @return the number of entries in the pollfd array + */ +unsigned int PollDataCluster::get_num_pollfds() const +{ + return m_poll_vector.size(); +} // eo get_num_pollfds + + + +} // eo namespace internal_io + + +/* + * some internal tool functions and structures + */ + +struct FilterMatch { + I2n::SimpleIo::FilterBasePtr m_filter; + + FilterMatch(I2n::SimpleIo::FilterBasePtr filter) + : m_filter(filter) + {} + + bool operator () (const I2n::SimpleIo::FilterBasePtr& item) + { + return item && item == m_filter; + } + +}; // eo struct FilterMatch + + +void get_current_real_time(long& current_sec, long& current_msec) +{ + struct timeval tv; + gettimeofday(&tv,NULL); + current_sec= tv.tv_sec; + current_msec= (tv.tv_usec / 1000); + if (current_msec >= 1000) + { + current_sec += (current_msec / 1000); + current_msec%= 1000; + } +} // eo get_current_real_time + + +void get_current_monotonic_time(long& current_sec, long& current_msec) +{ + long nsec; + if (monotonic_clock_gettime(current_sec,nsec)) + { + current_msec= nsec / 1000000L; + } + else + { + //fallback... + get_current_real_time(current_sec,current_msec); + } +} // eo get_current_monotonic_time + + + +} // eo anonymous namespace + + + + +namespace I2n +{ +namespace SimpleIo +{ + + +/** + * @brief gets the current time as MilliTime structure. + * @param[out] mt reference to the MilliTime strcucture which is filled with the result. + */ +void get_current_real_time(MilliTime& mt) +{ + long sec, msec; + ::get_current_real_time(sec,msec); + mt.set(sec,msec); +} // eo get_current_real_time + + +/** + * @brief gets the current time as MilliTime structure. + * @param[out] mt reference to the MilliTime strcucture which is filled with the result. + */ +void get_current_monotonic_time(MilliTime& mt) +{ + long sec, msec; + ::get_current_monotonic_time(sec,msec); + mt.set(sec,msec); +} // eo get_current_monotonic_time + + +/* + * implementation of MilliTime + */ + +MilliTime::MilliTime(long sec, long msec) +: mt_sec(sec), mt_msec(msec) +{ + normalize(); +} // eo MilliTime::MilliTime + + +void MilliTime::set(long sec, long msec) +{ + mt_sec= sec; + mt_msec= msec; + normalize(); +} // eo MilliTime::set + + +/** + * normalizes the values, so that mt_msec has a value between 0 and 999. + */ +void MilliTime::normalize() +{ + if (mt_msec < 0) + { + mt_sec += (mt_msec / 1000) - 1; + mt_msec = (mt_msec % 1000) + 1000; + } + else if (mt_msec>=1000) + { + mt_sec+= (mt_msec / 1000); + mt_msec %= 1000; + } +} // eo MilliTime::normalize + + +/** + * determine if the represented point in time is before another one. + * @param other the other point in time. + * @return true if the own point in time is before the other one. + */ +bool MilliTime::operator < (MilliTime& other) +{ + normalize(); + other.normalize(); + return + (mt_sec < other.mt_sec) + || (( mt_sec == other.mt_sec) && (mt_msec < other.mt_msec)); +} // eo MilliTime::operator < + + +/** + * determine if two point in times are equal. + * @param other the point in time to compare with. + * @return true if the represented times are equal. + */ +bool MilliTime::operator == (MilliTime& other) +{ + normalize(); + other.normalize(); + return (( mt_sec == other.mt_sec) && (mt_msec == other.mt_msec)); +} // eo MilliTime::operator < + +/** + * @brief subtracts a time delta from the object. + * @param lhs the time delta to subtract. + * @return reference to the object itself. + */ +MilliTime& MilliTime::operator -= (const MilliTime& lhs) +{ + mt_sec -= lhs.mt_sec; + mt_msec -= lhs.mt_msec; +} // eo operator -= + + +/** + * @brief adds a time delta from the object. + * @param lhs the time delta to add. + * @return reference to the object itself. + */ +MilliTime& MilliTime::operator += (const MilliTime& lhs) +{ + mt_sec += lhs.mt_sec; + mt_msec += lhs.mt_msec; +} // eo operator += + + +/* + * implementation of TimerBase + */ + +/** + * constructor. Adds the object to the internal timer list. + */ +TimerBase::TimerBase() +: m_active(false) +, m_marked(false) +{ + internal_io::g_timer_list().add_item(this); +} // eo TimerBase::TimerBase + + +/** + * destructor. Removes the object from the internal timer list. + */ +TimerBase::~TimerBase() +{ + ODOUT("enter"); + if (internal_io::TimerList::Instances) + { + ODOUT("remove from list"); + internal_io::g_timer_list().remove_item(this); + } +} // eo TimerBase::~TimerBase + + +/** + * @brief returns the point in time when the time is executed in real time. + * @return the point in time when the timer is to be executed. + */ +MilliTime TimerBase::getRealWhenTime() const +{ + MilliTime mono_time; + MilliTime real_time; + get_current_monotonic_time(mono_time); + get_current_real_time(real_time); + MilliTime result= m_when - mono_time + real_time; + return result; +} // eo TimerBase::getRealWhenTime() const + + +/** + * sets the time when the event should be executed. + * @param sec the seconds part of the point in time. + * @param msec the milliseconds part of the point in time. + */ +void TimerBase::setWhenTime(long sec, long msec) +{ + m_when.set(sec,msec); + m_marked= false; +} // eo TimerBase::setWhenTime + + +/** + * sets the time when the event should be executed. + * @param mt the point in time. + */ +void TimerBase::setWhenTime(const MilliTime& mt) +{ + m_when= mt; + m_marked= false; +} // eo TimerBase::setWhenTime + + +/** + * sets the time delta measured from current time when the event should be executed. + * @param sec the seconds of the time delta + * @param msec the milli seconds of the time delta + */ +void TimerBase::setDeltaWhenTime(long sec, long msec) +{ + setDeltaWhenTime( MilliTime(sec,msec) ); +} // eo TimerBase::setWhenTime + + + +/** + * sets the time delta measured from current time when the event should be executed. + * @param mt the time delta + */ +void TimerBase::setDeltaWhenTime(const MilliTime& mt) +{ + get_current_monotonic_time(m_when); + m_when+= mt; + m_marked= false; +} // eo TimerBase::setWhenTime + + +/** + * set the active state of the timer event. + * @param active determines if the object should be active (default: yes). + */ +void TimerBase::activate(bool active) +{ + m_active = active; + if (!active) + { + // clear the mark if we are not active. + m_marked= false; + } +} // eo TimerBase::activate + + +/** @fn void TimerBase::deactivate() + * deactivates the event by clearing the active state. + */ + + +/** + * called when the timer event occured. + */ +void TimerBase::execute() +{ +} // eo TimerBase::execute + + +/* + * implementation of FilterBase class + */ + + +FilterBase::FilterBase() +: m_io(NULL) +{ +} // eo FilterBase::FilterBase() + + +/** + * injects incoming data. + * @param data the new data + */ +void FilterBase::injectIncomingData(const std::string& data) +{ + if (m_io) + { + FilterBasePtr ptr= get_ptr_as< FilterBase >(); + if (ptr) + { + m_io->injectIncomingData(ptr,data); + } + } +} // FilterBase::injectIncomingData(const std::string&) + + +/** + * injects outgoing data. + * @param data the new data + */ +void FilterBase::injectOutgoingData(const std::string& data) +{ + if (m_io) + { + FilterBasePtr ptr= get_ptr_as< FilterBase >(); + if (ptr) + { + m_io->injectOutgoingData(ptr,data); + } + } +} // eo FilterBase::injectOutgoingData(const std::string&) + + + +/** + * called when EOF detected on incoming channel (or incoming channel closed) + */ +void FilterBase::endOfIncomingData() +{ +} // eo FilterBase::endOfIncomingData() + +/** + * called when the filter should reset. + * This is used when a new channel is opened or when the filter is taken out of a filter chain. + */ +void FilterBase::reset() +{ +} // eo FilterBase::reset() + + +/* + * implementation of IOImplementation class + */ + + +/** + * constructor for the base io class. + * + * Also adds the object to internal list of io objects (which is used by the backend). + * + * @param read_fd the file descriptor which should be used for reading (default -1 for no value) + * @param write_fd the file descriptor which should be used for writing (default -1 for no value) + */ +IOImplementation::IOImplementation(int read_fd, int write_fd) +: m_read_fd(-1) +, m_write_fd(-1) +, m_eof(false) +, m_not_writable(false) +, m_input_buffer() +, m_output_buffer() +, m_marked_for_reading(false) +, m_marked_for_writing(false) +{ + internal_io::g_io_list().add_item(this); + if (read_fd >= 0) + { + setReadFd( read_fd ); + } + if (write_fd >= 0) + { + setWriteFd( write_fd ); + } +} // eo IOImplementation::IOImplementation + + +/** + * destructor of the base io class. + * + * Removes the object from the interal list of io objects. + */ +IOImplementation::~IOImplementation() +{ + close(); + if (internal_io::IOList::Instances) + { + internal_io::g_io_list().remove_item(this); + } + // now clear the filters: + while (! m_filter_chain.empty() ) + { + FilterChain::iterator it = m_filter_chain.begin(); + (*it)->reset(); + (*it)->m_io= NULL; + //TODO: signal the filter that it is removed ?! + m_filter_chain.erase(it); + } +} // eo IOImplementation::~IOImplementation + + +/** + * adds another filter to the filter chain. + * @param filter pointer to the new filter. + */ +void IOImplementation::addFilter +( + FilterBasePtr filter +) +{ + if (!filter) + { + return; // nothing to do + } + if (filter->m_io) + { + filter->m_io->removeFilter(filter); + } + m_filter_chain.push_back( filter ); +} // eo IOImplementation::addFilter + + +/** + * removes a filter from the filter chain. + * @param filter the pointer to the filter which is removed. + * @note if the filter is removed the class gives away the ownership; i.,e. the caller is responsible for + * deleting the filter if it was dynamically allocated. + */ +void IOImplementation::removeFilter +( + FilterBasePtr filter +) +{ + FilterChain::iterator it = + std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(filter) ); + if (it != m_filter_chain.end()) + { + filter->reset(); + filter->m_io= NULL; + //TODO: signal the filter that it is removed ?! + m_filter_chain.erase(it); + } +} // eo IOImplementation::removeFilter + + +/** + * closes the file descriptors (/ the connection). + * + * @param direction the direction which should be closed (default: @a Direction::both for all). + */ +void IOImplementation::close(Direction direction) +{ + bool had_read_fd= (m_read_fd >= 0); + m_errno= 0; + if (direction == Direction::unspecified) direction= Direction::both; + if (direction != Direction::both && m_read_fd==m_write_fd && m_read_fd>=0) + { // special case: half closed (socket) connections... + // NOTE: for file descriptors m_errno will set to ENOTSOCK, but since we "forget" the desired part + // (read_fd or write_fd) this class works as desired. + switch(direction) + { + case Direction::in: + { + int res= ::shutdown(m_read_fd, SHUT_RD); + if (res<0) + { + m_errno= errno; + } + m_read_fd= -1; + if (!m_eof) + { + for(FilterChain::iterator it= m_filter_chain.begin(); + it != m_filter_chain.end(); + ++it) + { + (*it)->endOfIncomingData(); + } + } + } + return; + + case Direction::out: + { + int res= ::shutdown(m_write_fd, SHUT_WR); + if (res<0) + { + m_errno= errno; + } + m_write_fd= -1; + m_output_buffer.clear(); + } + return; + } + } + if (m_write_fd >= 0 && (direction & Direction::out) ) + { + int res1 = ::close(m_write_fd); + if (m_write_fd == m_read_fd) + { + m_read_fd= -1; + } + m_write_fd= -1; + m_output_buffer.clear(); + if (res1<0) m_errno= errno; + } + if (m_read_fd >=0 && (direction & Direction::in) ) + { + int res1 = ::close(m_read_fd); + m_read_fd= -1; + if (res1<0) m_errno= errno; + } + if (had_read_fd && !m_eof && (m_read_fd<0)) + { + for(FilterChain::iterator it= m_filter_chain.begin(); + it != m_filter_chain.end(); + ++it) + { + (*it)->endOfIncomingData(); + } + } +} // eo IOImplementation::close + + +/** + * determines if the io class wants to read data. + * Default implementation checks only for a valid file descriptor value. + * + * @return @a true if the objects wants to read data + */ +bool IOImplementation::wantRead() +{ + return (m_read_fd >= 0) && ! m_eof; +} // eo IOImplementation::wantRead + + +/** + * determines if the io class wants to write data. + * Default implementation checks for a valid file descriptor value and if the object + * cannot write data immediately. + * + * @return @a true if the objects wants to write data + */ +bool IOImplementation::wantWrite() +{ + return (m_write_fd >= 0) && ! m_marked_for_writing && ! m_not_writable; +} // eo IOImplementation::wantWrite + + +/** + * delivers if opened. + * The default returns @a true if at least one file descriptor (read or write) is valid. + * @return @a true if opened. + */ +bool IOImplementation::opened() const +{ + return (m_read_fd>=0) || (m_write_fd>=0); +} // eo IOImplementation::opened() const + + +/** + * returns if the read side detected an end of file (EOF). + * @return @a true if end of file was detected on read file descriptor (or read file descriptor isn't valid). + */ +bool IOImplementation::eof() const +{ + return (m_read_fd < 0) || m_eof; +} // eo IOImplementatio::eof() const + + +/** + * @brief returns of the write side didn't detect that it cannot write. + * @return @a true if we can write. + */ +bool IOImplementation::writable() const +{ + return (m_write_fd >=0 ) and not m_not_writable; +} // eo IOImplementation::writable() const + + +/** + * returns if the output buffer is empty. + * @return + */ +bool IOImplementation::empty() const +{ + return m_output_buffer.empty(); +} // eo IOImplementation::empty + +/** + * puts data into the output buffer and sends it immediately if possible, + * + * The data is passed through the filter chain before it's stored in the output buffer + * (i.e. the output buffer contains data as it should be send directly to the descriptor). + * @param _data the data which should be send. + */ +void IOImplementation::lowSend(const std::string& _data) +{ + std::string data(_data); + + for(FilterChain::reverse_iterator it_filter= m_filter_chain.rbegin(); + it_filter!= m_filter_chain.rend(); + ++it_filter) + { + data= (*it_filter)->filterOutgoingData(data); + } + m_output_buffer+= data; + + // if we can send immediately, do it: + if (! m_output_buffer.empty() && m_marked_for_writing) + { + doWrite(); + } +} // eo IOImplementation::lowSend + + +/** + * called by the backend when there is data to read for this object. + * + * Reads the data from the connection (read file descriptor) and passes the data through the filter chain. + * The final data is appended to the input buffer and the signal @a m_signal_read() is called. + * + * If EOF is detected (i,e, no data was received) then the signal @a m_signal_eof() is called. + * + * @note overload this method only when You know what You are doing! + * (overloading is necessary when handling server sockets.) + */ +void IOImplementation::doRead() +{ + // static read buffer; should be ok as long as we don't use threads + static char buffer[8*1024]; // 8 KiB + + m_errno = 0; + if (m_read_fd<0 || !m_marked_for_reading) + { + ODOUT("exit0; read_fd="< " << count); + + // interpret what we got: + if (count < 0) // error + { + m_errno = errno; + int fd= m_read_fd; + + switch(m_errno) + { + case EINVAL: + case EBADF: + case ECONNRESET: + case ENETRESET: + if (fd == m_read_fd) + { + close( (m_read_fd == m_write_fd) ? Direction::both : Direction::in ); + } + break; + } + } + else if (count==0) // EOF + { + // remember the read fd: + int fd = m_read_fd; + // remember the EOF: + m_eof= true; + // signal EOF + m_signal_eof(); + // if the fd is still the same: close it. + if (fd == m_read_fd) + { + close( Direction::in ); + } + } + else // we have valid data + { + std::string data(buffer,count); + ODOUT(" got \"" << data << "\""); + for(FilterChain::iterator it_filter= m_filter_chain.begin(); + it_filter != m_filter_chain.end(); + ++it_filter) + { + data= (*it_filter)->filterIncomingData(data); + } + m_input_buffer+= data; + m_signal_read(); + } +} // eo IOImplementation::doRead + + +/** + * interface for filter classes to inject data into the filter chain (emulating new incoming data). + * @param from_filter the filter which injects the new data. + * @param _data the new data. + */ +void IOImplementation::injectIncomingData(FilterBasePtr from_filter, const std::string& _data) +{ + FilterChain::iterator it_filter = + std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(from_filter) ); + if (it_filter == m_filter_chain.end()) + { + // dont accept data inject from a unknown filter + return; + } + // well: pass the data through the remaining filters: + // NOTE: processing is (nearly) the same as in IOImplementation::doRead() + std::string data(_data); + for(++it_filter; + it_filter != m_filter_chain.end(); + ++it_filter) + { + data= (*it_filter)->filterIncomingData(data); + } + m_input_buffer+= data; + m_signal_read(); +} // eo IOImplementation::injectIncomingData(FilterBase*,const std::string&) + + +/** + * interface for filter classes to inject data into the filter chain (emulating new outgoing data). + * @param from_filter the filter which injects the new data. + * @param _data the new data. + */ +void IOImplementation::injectOutgoingData(FilterBasePtr from_filter, const std::string& _data) +{ + FilterChain::reverse_iterator it_filter = + std::find_if( m_filter_chain.rbegin(), m_filter_chain.rend(), FilterMatch(from_filter) ); + if (it_filter == m_filter_chain.rend()) + { + // dont accept data inject from a unknown filter + return; + } + // well: pass the data through the remaining filters: + // NOTE: processing is (nearly) the same as in IOImplementation::lowSend() + std::string data(_data); + for(++it_filter; + it_filter!= m_filter_chain.rend(); + ++it_filter) + { + data= (*it_filter)->filterOutgoingData(data); + } + m_output_buffer+= data; + + // if we can send immediately, do it: + if (! m_output_buffer.empty() && m_marked_for_writing) + { + doWrite(); + } +} // eo IOImplementation::injectOutgoingData(FilterBase*,const std::string&) + + +/** + * set the read file descriptor. + * Although a derived class can also set the read fd directly; this method should be used + * for this task since it updates some flags on the fd for async operation. + * @param fd the new read file descriptor. + */ +void IOImplementation::setReadFd(int fd) +{ + // test if we already have a valid descriptor (and may have to close it): + if (m_read_fd >=0 ) + { + if (m_read_fd == fd) + { + // fd was already right; consider it to be ok. + return; + } + close(Direction::in); + } + // reset our errno: + m_errno= 0; + // if the new descriptor looks valid, set some flags: + if (fd >= 0) + { + long flags= ::fcntl(fd, F_GETFL); + if (flags != -1) + { + // set the flags for non blocking, async operation + flags |= O_NONBLOCK|O_ASYNC; + ::fcntl(fd,F_SETFL, flags); + } + else if ( errno == EBADF ) + { + // well, we seemed to be fed with an invalid descriptor...: + m_errno = errno; + fd= -1; + } + } + if (fd >= 0) // if still valid: + { + // set the close-on-exec flag + ::fcntl(fd,F_SETFD, FD_CLOEXEC); + } + m_read_fd= fd; + m_marked_for_reading= false; + m_eof= false; +} // eo IOImplementation::setReadFd(int) + + + +/** + * set the write file descriptor. + * Although a derived class can also set the write fd directly; this method should be used + * for this task since it updates some flags on the fd for async operation. + * @param fd the new write file descriptor. + */ +void IOImplementation::setWriteFd(int fd) +{ + if (m_write_fd >=0 ) + { + if (m_write_fd == fd) + { + // fd was already right; consider it to be ok. + return; + } + close(Direction::out); + } + // reset our errno: + m_errno= 0; + // if the new descriptor looks valid, set some flags: + if (fd >= 0) + { + long flags= ::fcntl(fd, F_GETFL); + if (flags != -1) + { + // set the flags for non blocking, async operation + flags |= O_NONBLOCK|O_ASYNC; + ::fcntl(fd,F_SETFL, flags); + } + else if (errno == EBADF) + { + // well, we seemed to be fed with an invalid descriptor...: + m_errno = errno; + fd= -1; + } + } + if (fd >= 0) // if still valid: + { + // set the close-on-exec flag + ::fcntl(fd,F_SETFD, FD_CLOEXEC); + } + m_write_fd = fd; + m_marked_for_writing= false; + m_not_writable= false; +} // eo IOImplementation::setWriteFd(int) + + + +/** + * called by the backend when this object can write data. + * + * If some data was sended, the signal @a m_signal_write is called. + * + * @internal tries to write all buffered data to output; if this succeeds, + * the connection is assumed to be still able to accept more data. + * (i.e. the internal write mark is kept!) + * + * @note overload this method only when You know what You are doing! +*/ +void IOImplementation::doWrite() +{ + m_errno = 0; + if ( m_write_fd<0 || !m_marked_for_writing || m_output_buffer.empty()) + { + return; + } + + ODOUT("doWrite, d=\"" << m_output_buffer << "\""); + + //reset mark: + m_marked_for_writing= false; + + // now write the data + ssize_t count= ::write( m_write_fd, m_output_buffer.data(), m_output_buffer.size()); + + ODOUT("::write -> " << count); + + if (count < 0) // error + { + m_errno= errno; + int fd= m_write_fd; + + switch(m_errno) + { + case EPIPE: + m_not_writable= true; + // emit a signal + m_signal_not_writable(); + // fall through + case EINVAL: + case EBADF: + case ECONNRESET: + case ENETRESET: + if (fd == m_write_fd) + { + close( (m_write_fd == m_read_fd) ? Direction::both : Direction::out ); + } + break; + } + } + else + { + m_output_buffer.erase(0, count); + if (m_output_buffer.empty()) + { + // special case: if we were able to send all the data, we keep the write mark: + m_marked_for_writing= true; + } + } + if (count > 0) + { + m_signal_write(); + } +} // eo IOImplementation::doWrite + + +/* + * implementation of SimpleIO + */ + + +SimpleIO::SimpleIO(int read_fd, int write_fd) +: inherited(read_fd, write_fd) +{ + m_signal_read.connect(boost::bind(&SimpleIO::slotReceived,this)); +} // eo SimpleIO::SimpleIO() + + +SimpleIO::~SimpleIO() +{ +} // eo SimpleIO::~SimpleIO() + + +/** + * sends a string. + * @param data the string. + */ +void SimpleIO::sendString(const std::string& data) +{ + lowSend(data); +} // eo SimpleIO::sendString(const std::string&) + + +/** + * emits the signal signalReceived with the received data. + * This slot is connected to IOImplementation::m_signal_read. + */ +void SimpleIO::slotReceived() +{ + std::string data; + data.swap(m_input_buffer); + signal_received_string(data); +} // eo SimpleIO::slotReceived() + + + +/* + * implementation of SimpleIO2 + */ + + +SimpleIO2::SimpleIO2(int read_fd, int write_fd) +: inherited(read_fd, write_fd) +{ + m_signal_read.connect(boost::bind(&SimpleIO2::slotReceived,this)); +} // eo SimpleIO2::SimpleIO2() + + +SimpleIO2::~SimpleIO2() +{ +} // eo SimpleIO2::~SimpleIO2() + + +/** + * sends a string. + * @param data the string. + */ +void SimpleIO2::sendString(const std::string& data) +{ + lowSend(data); +} // eo SimpleIO2::sendString(const std::string&) + + +/** + * emits the signal signalReceived with the received data. + * This slot is connected to IOImplementation::m_signal_read. + */ +void SimpleIO2::slotReceived() +{ + std::string data; + data.swap(m_input_buffer); + signal_received_string(data); +} // eo SimpleIO2::slotReceived() + + + +/* + * implementation of class Backend (singleton) + */ + +Backend* Backend::g_backend= NULL; + +int Backend::m_count_active_steps=0; + + +Backend::Backend() +: m_count_active_loops(0) +, m_count_stop_requests(0) +{ + SystemTools::ignore_signal( SystemTools::Signal::PIPE ); +} // eo Backend::Backend + + +Backend::~Backend() +{ + SystemTools::restore_signal_handler( SystemTools::Signal::PIPE ); +} // eo Backend::~Backend() + +/** + * delivers pointer to the current backend, instantiating a new backend if there was no current one. + * + * This should be the only way to access the backend which should be a singleton. + * + * @return the pointer to the current backend. + */ +Backend* Backend::getBackend() +{ + if (!g_backend) + { + g_backend = new Backend(); + } + return g_backend; +} // eo Backend::getBackend + + + + +/** + * performs one backend cycle. + * + * Collects all file descriptors from the active io objects which should be selected for reading and/or writing. + * Also determines the timer events which become due and adjusts the timeout. + * Constructs the necessary structures and calls poll(). + * Finally interprets the results from poll() (i.e. performs the reading/writing/timer events) + * + * @param timeout maximal wait value in milliseconds; negative value waits until at least one event occured. + * @return @a true if there was at least one active object; otherwise @a false + * + * @note this method is a little beast. + * + * @internal + * The cycle is divided into four steps: collecting; poll; mark and execute. + * The "mark" step is necessary to avoid some bad side effects when method calls in the execution stage + * are calling @a Backup::doOneStep or open their own local backend loop. + * + * @todo handle some more error cases. + * @todo provide a plugin interface for external handler. + * (currently inclusion of external handler is possible by (ab)using timer classes) + */ +bool Backend::doOneStep(int timeout) +{ + ODOUT( "timeout=" << timeout ); + internal_io::PollDataCluster poll_data; + bool had_active_object = false; + + ++m_count_active_steps; + + try { + + FdSetType local_read_fds; + FdSetType local_write_fds; + + // step 1 ; collect + + { // step 1.1: collect fds for read/write operations + for(internal_io::IOList::iterator itIOList = internal_io::g_io_list().begin(); + itIOList != internal_io::g_io_list().end(); + ++itIOList) + { + if (! *itIOList) continue; // skip NULL entries + int read_fd = (*itIOList)->m_read_fd; + int write_fd = (*itIOList)->m_write_fd; + bool want_read = (read_fd >= 0) and (*itIOList)->wantRead(); + bool want_write = (write_fd >= 0) and (*itIOList)->wantWrite(); + if (read_fd >= 0 ) + { + local_read_fds.insert( read_fd ); + } + if (write_fd >= 0) + { + local_write_fds.insert( write_fd ); + } + if (!want_read && !want_write) continue; + if (want_read) + { + FODOUT( (*itIOList), "wants to read (fd=" << read_fd << ")"); + poll_data.add_read_fd(read_fd, *itIOList); + } + if (want_write) + { + FODOUT( (*itIOList), "wants to write (fd=" << write_fd << ")"); + poll_data.add_write_fd(write_fd, *itIOList); + } + had_active_object= true; + } + } + + { // step 1.2: collect timer events + MilliTime current_time; + MilliTime min_event_time; + + get_current_monotonic_time(current_time); + bool min_event_time_set; + + if (timeout >= 0) + { + min_event_time = current_time + MilliTime(0,timeout); + min_event_time_set= true; + } + else + { + min_event_time = current_time + MilliTime(86400,0); + min_event_time_set= false; + } + // TODO + + for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin(); + it_timer != internal_io::g_timer_list().end() + && (!had_active_object || !min_event_time_set || current_time < min_event_time); + ++ it_timer) + { + if (! *it_timer) continue; // skip NULL entries + if (! (*it_timer)->m_active) continue; // skip if not enabled + if ( !min_event_time_set || (*it_timer)->m_when < min_event_time) + { + min_event_time = (*it_timer)->m_when; + min_event_time_set= true; + } + had_active_object= true; + } + + if (min_event_time_set) + { // we have at a minimal event time, so (re)compute the timeout value: + MilliTime delta= (min_event_time - current_time); + long long delta_ms = std::min( delta.get_milliseconds(), 21600000LL); // max 6h + if (delta_ms <= 0L) + { + timeout= 0L; + } + else + { + timeout= delta_ms + (delta_ms<5 ? 1 : 3); + } + } + } + + // step 2 : poll + ODOUT(" poll timeout is " << timeout); + { + MilliTime current_time; + get_current_monotonic_time(current_time); + ODOUT(" current time is sec="< " << poll_result); + { + MilliTime current_time; + get_current_monotonic_time(current_time); + ODOUT(" current time is sec="<m_marked_for_reading= true; + } + } + if ( 0!= (itPollItem->revents & POLLOUT)) + { + IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ]; + if (io && io->m_write_fd==itPollItem->fd) + { + io->m_marked_for_writing= true; + } + } + if ( 0!= (itPollItem->revents & POLLERR)) + { + IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ]; + if (0!= (itPollItem->events & POLLOUT)) + { + if (io && io->m_write_fd==itPollItem->fd) + { + io->m_marked_for_writing= false; + //io->close( Direction::out ); + } + } + } + // TODO error handling (POLLERR, POLLHUP, POLLNVAL) + } + } + + //Step 3.2: mark timer objects + { + MilliTime current_time; + + get_current_monotonic_time(current_time); + ODOUT(" current time is sec="<m_active) continue; // skip if not enabled + if ( (*it_timer)->m_when <= current_time) + { + ODOUT(" ==> MARK"); + (*it_timer)->m_marked = true; + } + } + } + + + // step 4 : execute + + // step 4.1: execute io + ODOUT("execute stage"); + for(internal_io::IOList::iterator it_io = internal_io::g_io_list().begin(); + it_io != internal_io::g_io_list().end(); + ++ it_io) + { + ODOUT(" check obj " << *it_io); + if (NULL == *it_io) continue; + if ((*it_io)->m_marked_for_writing) + { + FODOUT((*it_io),"exec doWrite"); + (*it_io)->doWrite(); + if ((*it_io) == NULL) continue; // skip remaining if we lost the object + if ((*it_io)->m_errno) + { + continue; + } + } + if ((*it_io)->m_marked_for_reading) + { + FODOUT((*it_io),"exec doRead"); + (*it_io)->doRead(); + if ((*it_io) == NULL) continue; // skip remaining if we lost the object + if ((*it_io)->m_errno) + { + continue; + } + } + } + + // step 4.2: execute timer events + { + for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin(); + it_timer != internal_io::g_timer_list().end(); + ++ it_timer) + { + if (! *it_timer) continue; // skip NULL entries + if (! (*it_timer)->m_active) continue; // skip if not enabled + if (! (*it_timer)->m_marked) continue; // skip if not marked + + // reset the mark and deactivate object now since the execute() method might activate it again + (*it_timer)->m_marked= false; + (*it_timer)->m_active= false; + + // now execute the event: + (*it_timer)->execute(); + } + } + + } // eo try + catch(...) + { + // clean up our counter + --m_count_active_steps; + // and forward the exception + throw; + } + + if ( 0 == --m_count_active_steps) + { + internal_io::g_io_list().clean_list(); + internal_io::g_timer_list().clean_list(); + } + + return had_active_object; +} // eo Backend::doOneStep + + +/** + * enters a backend loop. + * + * Calls @a Backend::doOneStep within a loop until @a Backend::stop was called or there are no more + * active objects (io objects or timer objects). + */ +void Backend::run() +{ + ++m_count_active_loops; + do + { + try + { + if (!doOneStep(c_max_poll_wait)) + { + // stop if there are no more active objects. + stop(); + } + } + catch(...) + { + // clean up our counter + --m_count_active_loops; + // and forward the exception + throw; + } + } + while (0 == m_count_stop_requests); + --m_count_active_loops; + --m_count_stop_requests; +} // eo Backend::run + + +/** + * @brief stops the latest loop currently run by Backend::run(). + * @see Backend::run() + */ +void Backend::stop() +{ + if (m_count_active_loops) + { + ++m_count_stop_requests; + } +} // eo Backend::stop() + + + +} // eo namespace SimpleIo +} // eo namespace I2n diff --git a/asyncio/simpleio.hpp b/asyncio/simpleio.hpp new file mode 100644 index 0000000..d3727be --- /dev/null +++ b/asyncio/simpleio.hpp @@ -0,0 +1,463 @@ +/** + * @file + * @brief simple basic IO handling. + * + * @copyright © Copyright 2007-2008 by Intra2net AG + * @license commercial + * @contact info@intra2net.com + * + * Deals with POSIX file descriptors; provides an additional abstraction + * level above select() or poll() interface. + * Also provides basic functionality for dealing with timer events. + */ + +#ifndef __I2N_SIMPLEIO_HPP__ +#define __I2N_SIMPLEIO_HPP__ + +#include +#include +#include + +#include + +#include +#include + + +namespace I2n +{ +namespace SimpleIo +{ + + +/* + * forward declarations + */ +class Backend; +class IOImplementation; + +/* + * end of forward declarations + */ + + +/** + * direction for io operations. + */ +struct Direction +{ + enum _Direction + { + unspecified= 0, + in = 1, + out = 2, + both= 3 + } m_direction; + + Direction( _Direction direction = unspecified) : m_direction(direction) {} + + operator _Direction () const { return m_direction; } +}; // eo struct IODirection; + + +/** + * structure for storing (a point in time as) seconds and milliseconds. + */ +struct MilliTime +{ + long mt_sec; + long mt_msec; + + MilliTime(long sec=0, long msec=0); + + void set(long sec, long msec=0); + + inline long long get_milliseconds() const + { + return ((long long)mt_sec * 1000L + mt_msec); + } // eo get_milliseconds + + void normalize(); + + bool operator < (MilliTime& other); + bool operator == (MilliTime& other); + + MilliTime& operator -= (const MilliTime& lhs); + MilliTime& operator += (const MilliTime& lhs); + +}; // eo struct MilliTime + + +inline MilliTime operator + (const MilliTime& rhs, const MilliTime& lhs) +{ + MilliTime t(rhs); + return t+= lhs; +} // eo operator + (const MilliTime& rhs, const MilliTime lhs) + +inline MilliTime operator - (const MilliTime& rhs, const MilliTime& lhs) +{ + MilliTime t(rhs); + return t-= lhs; +} // eo operator - (const MilliTime& rhs, const MilliTime lhs) + + +inline bool operator <= (MilliTime& rhs, MilliTime& lhs) +{ + return (rhs PtrType; + public: + FilterBase(); + virtual ~FilterBase() {}; + + protected: + + virtual std::string filterIncomingData(const std::string& data)= 0; + virtual std::string filterOutgoingData(const std::string& data)= 0; + + virtual void endOfIncomingData(); + virtual void reset(); + + protected: + + void injectIncomingData(const std::string& data); + void injectOutgoingData(const std::string& data); + + private: + /// pointer to the io object which uses this filter: + IOImplementation *m_io; +}; // eo class FilterBase + +typedef FilterBase::PtrType FilterBasePtr; + + +/** + * identity filter; does nothing with the data (in other words: it's useless ;-) ). + */ +class FilterIdentity : public FilterBase +{ + protected: + virtual std::string filterIncomingData(const std::string& data) { return data; } + virtual std::string filterOutgoingData(const std::string& data) { return data; } +}; // eo class FilterIdentity + + +/** + * @brief null filter; deletes everything it receives. + * + * usefull when output from a subprocess (like stderr) should be ignored... + */ +class FilterNull : public FilterBase +{ + protected: + virtual std::string filterIncomingData(const std::string& data) { return std::string(); } + virtual std::string filterOutgoingData(const std::string& data) { return std::string(); } +}; // eo FilterNull + + +/** + * the base class of the IO classes. + * + * provides the functionality to read from a file desriptor and write to a file descriptor (which can be + * identical (like for socket io), but also can be different (like pipes from/to a process)). + * The data stream can be filtered through plugged filter objects which are building a filter chain. + * Incoming data is filtered forward through the chain; outgoing data is filtered backward through the chain. + * (So the first filter is the one which modifies the data closest to the connections). + * + * @note the functionality is provided in conjunction with the @a Backend class which handles parts of + * the low level IO. + * + * @note this is a base class; it provides most "interesting" functionality in the protected section only. + * This way, derived classes can decide which of that functionality they want to export in their own public + * interfaces and which to keep hidden. + */ +class IOImplementation +: public boost::signals::trackable +, virtual public SharedBase +{ + friend class Backend; + friend class FilterBase; + + public: + + typedef std::list< FilterBasePtr > FilterChain; + + typedef boost::signal< void() > SignalType; + + typedef boost::shared_ptr< IOImplementation > PtrType; + + public: + IOImplementation(int read_fd=-1, int write_fd=-1); + virtual ~IOImplementation(); + + virtual void close(Direction direction = Direction::both); + + virtual bool wantRead(); + virtual bool wantWrite(); + + virtual bool opened() const; + virtual bool eof() const; + virtual bool writable() const; + virtual bool empty() const; + + protected: + + void addFilter(FilterBasePtr filter); + void removeFilter(FilterBasePtr); + + + void lowSend(const std::string& data); + + std::string::size_type getOutputBufferSize() const { return m_output_buffer.size(); } + + void setWriteFd(int fd); + void setReadFd(int fd); + + inline int readFd() const + { + return m_read_fd; + } + + inline int writeFd() const + { + return m_write_fd; + } + + inline bool isMarkedForReading() const { return m_marked_for_reading; } + inline bool isMarkedForWriting() const { return m_marked_for_writing; } + + + protected: + virtual void doRead(); + virtual void doWrite(); + + void resetReadMark() { m_marked_for_reading= false; } + void resetWriteMark() { m_marked_for_writing= false; } + + void injectIncomingData(FilterBasePtr from_filter, const std::string& _data); + void injectOutgoingData(FilterBasePtr from_filter, const std::string& _data); + + protected: + /// last error number + int m_errno; + /// the input buffer (i.e. the data read from @a m_read_fd) + std::string m_input_buffer; + + /// the chain of filter which are applied to the data received are the data which should be send. + FilterChain m_filter_chain; + + /// signal which is fired when end of file is detected + SignalType m_signal_eof; + /// signal which is fired when write is no longer possible + SignalType m_signal_not_writable; + /// signal which is fired when new data was read + SignalType m_signal_read; + /// signal which is fired when data was written + SignalType m_signal_write; + + /// end of file (on @a m_read_fd) detected (used additionally when m_read_fd is valid) + bool m_eof; + + /// unable-to-write (on @a m_write_fd) detected (used additionally when m_write_fd is valid) + bool m_not_writable; + + private: + /// the file descriptor to read from (-1 if none is given) + int m_read_fd; + /// the file descriptor to write to (-1 if none is given) + int m_write_fd; + /// output buffer; contains the data which needs to be written. + std::string m_output_buffer; + + private: + /// @a true when data is available to be read + bool m_marked_for_reading; + /// @a true when data can be written + bool m_marked_for_writing; + +}; // eo class IOImplementation + + + +/** + * same as IOImplementation, but makes fd access functions public. + */ +class IOImplementation2 : public IOImplementation +{ + typedef IOImplementation inherited; + public: + IOImplementation2(int read_fd=-1, int write_fd=-1) : inherited(read_fd, write_fd) {} + + void setWriteFd(int fd) { inherited::setWriteFd(fd); } + void setReadFd(int fd) { inherited::setReadFd(fd); } + + int readFd() const { return inherited::readFd(); } + int writeFd() const { return inherited::writeFd(); } + +}; // eo class IOImplementation2 + + +/** + * provides sending data and receiving data via a signal. + * + * @note the received data is passed as parameter to the signal and no longer stored in the received buffer. + */ +class SimpleIO : public IOImplementation +{ + typedef IOImplementation inherited; + public: + SimpleIO(int read_fd=-1, int write_fd=-1); + virtual ~SimpleIO(); + + void sendString(const std::string& data); + + boost::signal signal_received_string; + + private: + + void slotReceived(); +}; // eo class SimpleIO + + +/** + * provides sending data and receiving data via a signal. + * + * @note the received data is passed as parameter to the signal and no longer stored in the received buffer. + */ +class SimpleIO2 : public IOImplementation2 +{ + typedef IOImplementation2 inherited; + public: + SimpleIO2(int read_fd=-1, int write_fd=-1); + virtual ~SimpleIO2(); + + void sendString(const std::string& data); + + boost::signal signal_received_string; + + private: + + void slotReceived(); +}; // eo class SimpleIO2 + + +/** + * provides the backend for io handling. + * + * This (singleton) object provides the management of io events. It collects all wishes for reading/writing + * from the io objects and information from the timer events. + * It poll()s for the events and distributes them to the objects, + * + * The class provides the possibility for executing one io cycle (/step) or to run a backend loop. + * + * @note the Backend class needs to be a friend of IOImplementation since it accesses private members + * of IOImplementation while performing the io cycles. + */ +class Backend +{ + public: + typedef std::set< int > FdSetType; + + public: + + bool doOneStep(int ms_timeout= -1); + void run(); + void stop(); + + protected: + Backend(); + Backend(const Backend& other); + ~Backend(); + + protected: + + /// the number of currently active backend loops + int m_count_active_loops; + /// the number of pending stop requests (where each one should exit one active backend loop) + int m_count_stop_requests; + + /// the number of currently active backend cycles(/ steps) + static int m_count_active_steps; + + public: + static Backend* getBackend(); + + protected: + /// pointer to the active backend (which is delivered by Backend::getBackend) + static Backend* g_backend; +}; // eo class Backend + + + +/* +** tool functions: +*/ + + +void get_current_real_time(MilliTime& mt); +void get_current_monotonic_time(MilliTime& mt); + + +} // eo namespace SimpleIo +} // eo namespace I2n + +#endif diff --git a/asyncio/simplepipe.cpp b/asyncio/simplepipe.cpp new file mode 100644 index 0000000..4eba4bc --- /dev/null +++ b/asyncio/simplepipe.cpp @@ -0,0 +1,98 @@ +/** @file + * + * (c) Copyright 2007 by Intra2net AG + * + * info@intra2net.com + */ + +#include "simplepipe.hpp" + +#include +#include + +#include +#include +#include +#include + + +namespace I2n +{ +namespace SimpleIo +{ + + +/* + * Implementation of SimplePipe + */ + +SimplePipe::SimplePipe() +{ + m_signal_read.connect(boost::bind(&SimplePipe::slotReceived,this)); +} // eo SimplePipe::SimplePipe() + + +SimplePipe::~SimplePipe() +{ +} // eo SimplePipe::~SimplePipe() + + +/** + * makes a pipe. + * This method connects itself with a newly created peer object with a bidirectional pipe. + * @return the peer pipe object. + */ +bool SimplePipe::makePipe(SimplePipe& peer) +{ + close(); // just in case... + + int fds[2]; + + int res= ::socketpair( AF_UNIX, SOCK_STREAM, 0, fds); + + if (res) + { + m_errno= errno; + return false; + } + else + { + m_errno= 0; + } + + peer.close(); // just in case + + setWriteFd(fds[0]); + setReadFd(fds[0]); + + peer.setWriteFd(fds[1]); + peer.setReadFd(fds[1]); + + return true; +} // eo SimplePipe.:makePipe() + + +/** + * sends a string through the pipe. + * @param data the data which should be sent to the other side. + */ +void SimplePipe::sendString(const std::string& data) +{ + lowSend(data); +} // eo SimplePipe::sendString(const std::string&) + + +/** + * emits the signal signalReceived with the received data. + * This slot is connected to IOImplementation::m_signal_read. + */ +void SimplePipe::slotReceived() +{ + std::string data; + data.swap(m_input_buffer); + signal_received_string(data); +} // eo SimplePipe::slotReceived() + + +} // eo namespace SimpleIo +} // eo namespace I2n diff --git a/asyncio/simplepipe.hpp b/asyncio/simplepipe.hpp new file mode 100644 index 0000000..80b7cc6 --- /dev/null +++ b/asyncio/simplepipe.hpp @@ -0,0 +1,44 @@ +/** @file + * + * (c) Copyright 2007 by Intra2net AG + * + * info@intra2net.com + */ + +#ifndef _SIMPLEIO_SIMPLEPIPE_HPP_ +#define _SIMPLEIO_SIMPLEPIPE_HPP_ + +#include "simpleio.hpp" + +namespace I2n +{ +namespace SimpleIo +{ + + +class SimplePipe : public IOImplementation +{ + public: + SimplePipe(); + virtual ~SimplePipe(); + + bool makePipe(SimplePipe& peer); + + void sendString(const std::string& data); + + boost::signal signal_received_string; + + protected: + + private: + + void slotReceived(); + +}; // eo SimplePipe + + +} // eo namespace SimpleIo +} // eo namespace I2n + + +#endif diff --git a/asyncio/simpleprocess.cpp b/asyncio/simpleprocess.cpp new file mode 100644 index 0000000..8e3754d --- /dev/null +++ b/asyncio/simpleprocess.cpp @@ -0,0 +1,817 @@ +/** @file + * + * + * (c) Copyright 2007-2008 by Intra2net AG + * + * info@intra2net.com + */ + +//#define NOISEDEBUG + +#include "simpleprocess.hpp" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + + +#ifdef NOISEDEBUG +#include +#include +#define DOUT(msg) std::cout << msg << std::endl +#define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl +#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl +#else +#define DOUT(msg) do {} while (0) +#define FODOUT(obj,msg) do {} while (0) +#define ODOUT(msg) do {} while (0) +#endif + + +namespace +{ + +using namespace I2n::SimpleIo; + +/** + * local configuration values + */ +namespace config +{ + + /// the capacity of the child status list (/ vector) + const unsigned int pid_pool_capacity= 512; + +} // eo namespace config + + + +/// the previous handler for the child signal (SIGCHLD) +void (*oldChildHandler)(int) = NULL; + +/// method pointer for activating process manager +void (ProcessManager::*_activate_manager)(); + +PidStateList pending_pid_states; + + +/** + * signal handler for child signal (SIGCHLD) + * @param sig the signal number as provided by the OS + */ +void handleSigChild(int sig) +{ + int status; + pid_t pid; + while ( (pid = waitpid(-1,&status,WNOHANG)) > 0) + { + pending_pid_states.push_back( PidStatePair(pid,status) ); + } + if (_activate_manager) + { + // tricky way to access a protected method without being a (official) friend: + ( ProcessManager::getInstance()->*_activate_manager)(); + } + //TODO: ? + signal(sig,handleSigChild); +} // eo handleSigChild + + +namespace process +{ + +typedef std::pair PidProcPair; +typedef std::list< PidProcPair > PidProcList; + + +template< typename F, typename S > +struct CmpFirst +{ + F _f; + CmpFirst ( F f ) : _f(f) {} + bool operator () ( const std::pair& v ) const { return v.first == _f; } +}; // eo struct CmpFirst + + +std::list g_process_list; +PidProcList g_pid_list; + + +void addProcessInstance( ProcessImplementation* obj ) +{ + g_process_list.push_back(obj); +} // eo addProcessInstance(ProcessImplementation*) + + +void removeProcessInstance( ProcessImplementation* obj ) +{ + // remove obj from list + g_process_list.remove(obj); + // clear pointers in pid list + for(PidProcList::iterator it= g_pid_list.begin(); + it != g_pid_list.end(); + ++it) + { + if (it->second == obj) + { + it->second= NULL; + } + } +} // eo removeProcessInstance(ProcessImplementation*) + + +void addChildProcess( pid_t pid, ProcessImplementation* obj) +{ + g_pid_list.push_back ( PidProcPair(pid,obj) ); +} // eo addChildProcess(pid_t,ProcessImplementation*) + + +void removeChildProcess ( pid_t pid, ProcessImplementation* obj) +{ + PidProcList::iterator it= std::find( + g_pid_list.begin(), g_pid_list.end(), + PidProcPair(pid,obj)); + if (it != g_pid_list.end()) + { + g_pid_list.erase(it); + } +} // eo removeChildProcess(pid_t,ProcessImplementation*) + + +bool findChildProcess ( pid_t pid, ProcessImplementation* & obj ) +{ + PidProcList::iterator it = std::find_if( + g_pid_list.begin(), g_pid_list.end(), + CmpFirst(pid) ); + if (it == g_pid_list.end()) + { + return false; + } + obj = it->second; + return true; +} // eo findChildProcess(pid_t,ProcessImplementation*&) + + +} // eo namespace process + + + + + +/* +** misc tools +*/ + + +/** + * convenience tool for closing file descriptors... + */ +struct FdCloser +{ + int m_fd; + + FdCloser(int fd=-1) : m_fd(fd) {} + + ~FdCloser() + { + if (m_fd >= 0) ::close(m_fd); + } + + void release() { m_fd= -1; } + +}; // eo struct FdCloser + + + +} // eo namespace + + +namespace I2n +{ +namespace SimpleIo +{ + + +/* + * global functions + */ + +/** + * installs the handler for the child signal (SIGCHLD). + * Installing this handler is mandatory for the process subsystem to work correctly. + * @return @a true iff the child handler is successfully installed. + */ +bool installChildHandler() +{ + if (oldChildHandler) + { + // already installed + return true; + } + if (! ProcessManager::getInstance() ) + { + // we need an instance of the process manager + return false; + } + pending_pid_states.reserve( config::pid_pool_capacity ); + oldChildHandler = signal( Signal::CHLD, handleSigChild ); + if (oldChildHandler == SIG_ERR) + { + oldChildHandler= NULL; + return false; + } + return true; +} // eo installChildHandler + + +/** + * uninstalls the child handler. + * @return @a true iff the old child handler is reestablished. + */ +bool restoreChildHandler() +{ + if (!oldChildHandler) + { + return false; + } + void(*res)(int) = signal( Signal::CHLD, oldChildHandler); + + if (res == SIG_ERR) + { + return false; + } + oldChildHandler= NULL; + return true; +} // eo restoreChildHandler + + + + +/* + * Implementation of ProcessImplementation + */ + +IOImplementation2* ProcessImplementation::_StderrOnStdout = ((IOImplementation2*) 1); +IOImplementation2* ProcessImplementation::_UseParentsStderr = ((IOImplementation2*) 0); + + +/** + * @brief constructor for the process implementation. + * + * the constructor takes the path to the executable and (initial) cli arguments. + * + * @param path path to the executable. + * @param args initial command line arguments. + */ +ProcessImplementation::ProcessImplementation( + const std::string& path, + const std::vector& args + ) +: IOImplementation(-1,-1) +, m_path(path) +, m_nice_inc(0) +, m_create_new_session(false) +, m_pid(0) +, m_state(ProcessState::stopped) +, m_exit_code(0) +{ + m_args.push_back(path); + std::copy( args.begin(), args.end(), std::back_inserter(m_args) ); + process::addProcessInstance(this); +} // eo ProcessImplementation::ProcessImplementation(const std::string&) + + +ProcessImplementation::~ProcessImplementation() +{ + if (m_pid > 0 && m_state!=ProcessState::stopped) + { + stopProcess(true); + } + process::removeProcessInstance(this); +} // eo ProcessImplementation::~ProcessImplementation() + + +void ProcessImplementation::close(Direction direction) +{ + inherited::close(direction); + if (!inherited::opened() && (m_state != ProcessState::stopped) ) + { + stopProcess(false); + } +} // eo ProcessImplementation::close(Direction) + + +/** + * returns an object for adding new arguments to the argument list. + * @return the adder object. + */ +PushBackFiller ProcessImplementation::getArgAdder() +{ + return PushBackFiller(m_args); +} // eo ProcessImplementation::getArgAdder() + + +/** + * @brief set if the process should create a new session when started. + * @param enable determine if the process should start a new session. + * @return @a true iff the value of enable was accepted. + * + * If the process is already running, a new value is not accepted. + */ +bool ProcessImplementation::setCreateNewSession( bool enable ) +{ + if (m_state != ProcessState::stopped and enable != m_create_new_session) + { + return false; + } + m_create_new_session= enable; + return true; +} // eo ProcessImplementation::setCreateNewSession(bool); + + +/** + * @brief sets a new nice increment. + * @param nice the desired nice increment. + * @return @a true if the value was accepted and - in case the process was already started - + * the nice value was successfully changed. + */ +bool ProcessImplementation::setNice(int nice) +{ + errno= 0; + if (m_state != ProcessState::stopped) + { + int delta= m_nice_inc + nice; + m_nice_inc= nice; + int res= ::nice(delta); + if (res == -1 and errno !=0 ) + { + return false; + } + } + else + { + m_nice_inc = nice; + } + return true; +} // eo ProcessImplementation::setNice(int) + + +/** + * @brief sets the work dir the process should be started with. + * @param workdir the workdir + * @return @a true if the new workdir was accepted. + * + * The method will return @a false if the process is already started. + * The workdir can only be set before the process is started. + */ +bool ProcessImplementation::setWorkDir(const std::string& workdir) +{ + if ( m_state != ProcessState::stopped and workdir != m_workdir) + { + return false; + } + if (not workdir.empty()) + { + I2n::Stat stat(workdir); + if (not stat or not stat.is_directory()) + { + return false; + } + } + m_workdir= workdir; + return true; +} // eo ProcessImplementation::setWorkDir(const std::string&) + + +/** + * @brief sets new arguments for the process (the path to the binary is kept). + * + * @param args the new cli arguments for the subprocess (replacing the old ones). + */ +void ProcessImplementation::resetArgs( const std::vector< std::string >& args ) +{ + if (m_args.size() > 1) + { + m_args.erase( ++m_args.begin(), m_args.end()); + } + std::copy( args.begin(), args.end(), std::back_inserter(m_args) ); +} // eo ProcessImplementation::resetArgs(const std::vectors< std::string >&) + + +/** + * starts the new process. + * provides pipes for sending data to/ receiving data from the new process. + * Basically forks and execs the new process. + * + * @param stderr if not NULL the given object will be connected to stderr of the new process. + * The object can then be used for reading the data from the process' stderr; but cannot be written to. + * (The object will be closed if it was open). + * If the constant @a ProcessImplementation::StderrOnStdout is passed then stderr of the new process will + * be written to the same channel as stdout (i.e. can be read from the process class instance like the + * normal output). + * If NULL then the stderr channel from the parent process will also be used by the child. + * @return @a true iff the new subprocess started. + */ +bool ProcessImplementation::startProcess( IOImplementation2 *stderr ) +{ + bool stderr2stdout= false; + m_errno = 0; + m_input_buffer.clear(); + if (m_pid > 0 && m_state != ProcessState::stopped) + { + // process still/already running... + return false; + } + m_exit_code= 0; + + if (stderr == _StderrOnStdout) + { + stderr2stdout= true; + stderr= NULL; + } + + int to_process_pipe[2]; + int from_process_pipe[2]; + int from_process_stderr_pipe[2]= { -1, -1 }; + + if ( ::pipe(to_process_pipe) ) + { + m_errno= errno; + return false; + } + FdCloser closeTo0( to_process_pipe[0] ); + FdCloser closeTo1( to_process_pipe[1] ); + if ( ::pipe (from_process_pipe) ) + { + m_errno= errno; + return false; + } + FdCloser closeFrom0( from_process_pipe[0] ); + FdCloser closeFrom1( from_process_pipe[1] ); + if (stderr) + { + if (stderr->opened()) stderr->close(); + if ( ::pipe (from_process_stderr_pipe) ) + { + m_errno= errno; + return false; + } + } + FdCloser closeFromErr0( from_process_stderr_pipe[0] ); + FdCloser closeFromErr1( from_process_stderr_pipe[1] ); + + m_pid = ::fork(); + + if ( m_pid == (pid_t)-1 ) + { + m_errno= errno; + m_pid= 0; + // error; something went wrong + return false; + } + else if (m_pid > 0) + { + // we are in the parent part + + // keep the fd's we need and (later) close the other ones: + closeTo1.release(); // don't close this fd! + setWriteFd(to_process_pipe[1]); + closeFrom0.release(); // don't close this fd! + setReadFd(from_process_pipe[0]); + + if (stderr) + { + closeFromErr0.release(); // don't close this fd! + stderr->setReadFd(from_process_stderr_pipe[0]); + } + + m_state= ProcessState::running; + process::addChildProcess(m_pid,this); + DOUT(" started child with pid " << m_pid); + return true; + } + else // pid > 0 + { + // we are in the child part + + // dup the fd's for stdin/-out/-err into place: + ::dup2(to_process_pipe[0],0); + ::dup2(from_process_pipe[1],1); + if (stderr) + { + ::dup2(from_process_stderr_pipe[1],2); + ::close(from_process_stderr_pipe[0]); ::close(from_process_stderr_pipe[1]); + } + else if (stderr2stdout) + { + ::dup2(from_process_pipe[1],2); + } + // close what we don't need: + ::close(to_process_pipe[0]); ::close(to_process_pipe[1]); + ::close(from_process_pipe[0]); ::close(from_process_pipe[1]); + + // set workdir if requested: + if (not m_workdir.empty()) + { + int r= ::chdir( m_workdir.c_str() ); + if (r !=0 ) + { + //TODO? + exit(255); + } + } + + // + // collect args: + char **argv= new char*[m_args.size()+1]; + int i=0; + for(std::vector::iterator it= m_args.begin(); + it != m_args.end(); + ++it,++i) + { + argv[i]= strdup( it->c_str() ); + } + argv[i]= NULL; + // update nice level: + if (m_nice_inc) + { + nice(m_nice_inc); + } + // create a new session id if requested: + if (m_create_new_session) + { + setsid(); + } + // execute: + execv(m_path.c_str(), argv); + // exit if exec failed + exit(255); + //cleanup! ... just joking; we exec or we exit, in either case the system cleans + // everything which needs to be cleaned up. + } + return false; // keep the compiler happy... +} // eo ProcessImplementation::startProcess() + + +/** + * convenience method for starting the child process. + * This method uses predefined enum values for the stderr handling mode. + * + * @param stderr_mode the desired stderr mode. + * @return @a true iff the child process was created. + */ +bool ProcessImplementation::startProcess( ProcessImplementation::StderrMode stderr_mode ) +{ + switch (stderr_mode) + { + case UseParentsStderr: + return startProcess( _UseParentsStderr ); + + case StderrOnStdout: + return startProcess( _StderrOnStdout ); + } + return false; +}; // eo ProcessImplementation::startProcess(ProcessImplementation::StderrMode) + + +/** + * stops the process. + * + * @todo think about a more intelligent handling... + */ +void ProcessImplementation::stopProcess(bool force) +{ + // TODO: do it somewhat more intelligent?! + if (force) + { + kill(Signal::KILL); + //TODO: set running state? + } + else + { + kill(Signal::TERM); + } +} // eo ProcessImplementation::stop(bool) + + + +/** + * sends a signal to the child process. + * @param signal the Signal which should be send. + * @return @a true if the signal was sent; @a false if an error occured. + */ +bool ProcessImplementation::kill(Signal signal) +{ + m_errno = 0; + if (m_pid == 0 || m_pid == (pid_t)-1) + { + m_errno= ESRCH; + return false; + } + int res = ::kill(m_pid, signal); + if (res < 0) + { + m_errno= errno; + return false; + } + if (signal == Signal::CONT && m_state == ProcessState::suspended) + { + m_state = ProcessState::running; + } + return true; +} // eo ProcessImplementation::kill(Signal) + + + +/** + * set a new child state with information gobbled by the child signal handler. + * + * @note This method should only be called by the process manager! + * + * @param pid the pid of the child process. + * @param status the new status value (as delivered by waitpid()) + */ +void ProcessImplementation::setChildState(pid_t pid, int status) +{ + DOUT("setChildState("<first; + int status = it->second; + ODOUT(" pid=" << pid << ", status=" << status); + ProcessImplementation *process_obj; + if (process::findChildProcess(pid,process_obj)) + { + ODOUT(" local managed child, process_obj="<< process_obj); + // pid found in list: + if (!WIFSTOPPED(status) +#ifdef WIFCONTINUED + && !WIFCONTINUED(status) +#endif + ) + { + // take it from list if the child exited: + process::removeChildProcess(pid,process_obj); + } + if (process_obj) + { + // give the process object a chance to handle the state change: + process_obj->setChildState(pid, status); + } + } + else + { + ODOUT("foreign child"); + // pid not found in list: + /* NOTE: in a non threaded environment this pid must be from a child process which is not + managed by this process classes; since this method is called after all setup of a child process + is done (; especially entering the new child pid into our internal lists). + */ + m_foreign_pid_states.push_back(*it); + } + } + + // handle the foreign childs: + { + /* idea: + * fetch a (pid,status) from the list, erase it (to avoid reentrance problems) + * and fire the signal. If someone forks childs outside this module then he can + * connect to the signal and receive all necessary status information gobbled by + * our child handler. + */ + while (! m_foreign_pid_states.empty()) + { + PidStateList::iterator it= m_foreign_pid_states.begin(); + pid_t pid = it->first; + int status = it->second; + m_foreign_pid_states.erase(it); + m_foreign_child_state_changed_signal(pid,status); + } + } + +} // eo ProcessManager::execute + + +} // eo namespace SimpleIo +} // eo namespace I2n diff --git a/asyncio/simpleprocess.hpp b/asyncio/simpleprocess.hpp new file mode 100644 index 0000000..d42c755 --- /dev/null +++ b/asyncio/simpleprocess.hpp @@ -0,0 +1,198 @@ +/** @file + * + * simple process handling based on simple io classes. + * + * (c) Copyright 2007-2008 by Intra2net AG + * + * info@intra2net.com + */ + +#ifndef _CONND_SIMPLEPROCESS_HPP_ +#define _CONND_SIMPLEPROCESS_HPP_ + +#include +#include + +#include + +#include +#include +#include "simpleio.hpp" + + +namespace I2n +{ +namespace SimpleIo +{ + +using SystemTools::Signal; +using SystemTools::ScopedSignalBlocker; + + +class ProcessManager; + + +typedef std::pair< pid_t, int > PidStatePair; +typedef std::vector< PidStatePair > PidStateList; + + +/** + * represents process states. + */ +struct ProcessState +{ + enum _ProcessState + { + stopped = 0, + running, + suspended + }; // eo enum _ProcessState + + _ProcessState m_state; + + ProcessState(_ProcessState _state = stopped) : m_state(_state) {} + + operator _ProcessState() const { return m_state; } +}; // eo struct ProcessState + + +/** + * specialisation of the io implementation class which fork/exec's a subprocess and + * connects with the new child's stdin/stdout. + * + * @note the signal @a IOImplementation::m_signal_eof of the base class can be used to detect when the + * new child closes it's stdout (which usually means that the child ended). + */ +class ProcessImplementation : public IOImplementation +{ + typedef IOImplementation inherited; + + friend class ProcessManager; + + public: + + enum StderrMode + { + /// "magic" constant to pass to start() when childs stderr should be the same as parents stderr. + UseParentsStderr= 0, + /// "magic" constant to pass to start() when stderr should be the same as stdout for the new process. + StderrOnStdout + }; // eo enum StderrMode + + public: + ProcessImplementation( + const std::string& path, + const std::vector& args = std::vector()); + virtual ~ProcessImplementation(); + + virtual void close(Direction direction = Direction::both); + + virtual bool startProcess( IOImplementation2* stderr ); + bool startProcess( StderrMode stderr_mode = UseParentsStderr ); + + virtual void stopProcess(bool force=false); + + PushBackFiller getArgAdder(); + + bool setCreateNewSession( bool enable= true); + + bool setNice(int nice); + + bool setWorkDir(const std::string& workdir); + + void resetArgs( const std::vector< std::string >& args = std::vector< std::string >() ); + + /// returns the current process state + ProcessState processState() const { return m_state; } + + ///returns the exit code of the process (if in stopped state) + int exitCode() const { return m_exit_code; } + + + protected: + + bool kill(const Signal signal); + + void setChildState(pid_t pid, int status); + + protected: + /// the path to the binary + std::string m_path; + /// argument list (starting with argv0, usually the name of the binary) + std::vector m_args; + /// increment of the nice level when the new child is started + int m_nice_inc; + /// determines if the child should start a new session. + bool m_create_new_session; + /// determines the workdir where the child process should be started with. + std::string m_workdir; + + /// the pid of the child process + pid_t m_pid; + /// the state of the child process + ProcessState m_state; + /// the exit code of the child (-1 if not available yet) + int m_exit_code; + + /// signal which is fired when the child terminated + SignalType m_signal_terminated; + + + /// "magic" constant to pass to start() when childs stderr should be the same as parents stderr. + static IOImplementation2* _UseParentsStderr; + /// "magic" constant to pass to start() when stderr should be the same as stdout for the new process. + static IOImplementation2* _StderrOnStdout; + + private: + +}; // eo class ProcessImplementation + + +/** + * manages overall process related stuff. + * + * @note this class is implemented as a singleton. + * @note this class uses the io timer interface to be called within the backend loops when necessary. + */ +class ProcessManager : public TimerBase +{ + public: + + static ProcessManager* getInstance(); + + protected: + ProcessManager(); + ProcessManager(const ProcessManager&); + + virtual void execute(); + + void activateMe(); + + public: + + /** + * the signal which is fired when waitpid() returns a status for a child process + * which is not managed by this process subsystem. + * Another module which forks child processes can connect to this signal to receive + * the information when these child processes are terminated. + */ + boost::signal m_foreign_child_state_changed_signal; + + protected: + + static ProcessManager *the_instance; + + PidStateList m_foreign_pid_states; + + private: +}; + + +bool installChildHandler(); +bool restoreChildHandler(); + + +} // eo namespace SimpleIo +} // eo I2n + +#endif diff --git a/asyncio/simplesocket.cpp b/asyncio/simplesocket.cpp new file mode 100644 index 0000000..def93c0 --- /dev/null +++ b/asyncio/simplesocket.cpp @@ -0,0 +1,397 @@ +/** @file + * + * (c) Copyright 2008 by Intra2net AG + * + * info@intra2net.com + * + * @todo unlink unix server socket on close. + */ + +#include "simplesocket.hpp" +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace I2n +{ +namespace SimpleIo +{ + + +namespace +{ + +struct sockaddr_un dummy_un; + +union MegaAddr { + struct sockaddr m_addr; + struct sockaddr_in m_addr_in; + struct sockaddr_un m_addr_un; // NOTE (historically) too small... + // storage is large enough to hold all sockaddr_* variants with the (historical) exception of _un ! + struct sockaddr_storage m_addr_store; + // a char array large enough to hold _un (with an path up to the maximum allowed size!) + // (the +1 is added for a later 0-termination of the path) + char m_buffer[ sizeof(dummy_un) - sizeof(dummy_un.sun_path) + PATH_MAX + 1 ]; +}; + + +} // eo namespace + + + +/* +** implementation of ServerSocketBaseImplementation +*/ + + +ServerSocketBaseImplementation::ServerSocketBaseImplementation() +: IOImplementation() +{ +} // eo ServerSocketBaseImplementation::ServerSocketBaseImplementation() + + +/** + * @brief handled incoming connections on the server port. + * + * accepts the new connection, stores the peer address in an internal buffer + * and calls a (derived) acceptNewConnection method to create an apropriate + * IO class instance. + * If no io instance is created the connection is closed. + */ +void ServerSocketBaseImplementation::doRead() +{ + MegaAddr addr; + socklen_t addrlen = sizeof(addr); + + // reset errno: + m_errno= 0; + + // reset the mark: + resetReadMark(); + + int fd = ::accept( readFd(), &addr.m_addr, &addrlen); + if (fd < 0) + { + // handle errors. + m_errno= errno; + switch (m_errno) + { + case EBADF: + case EINVAL: + case ENOTSOCK: + // should not happen... + close(); + break; + default:; + } + return; + } + + if (addrlen < sizeof(addr)) + { + // in case of unix domain socket: terminate the path! + // NOTE we are doing this here since we don't pass the length info. + addr.m_buffer[addrlen]= 0; + } + else + { + //something went terribly wrong!! + // the resulting address structure is larger than it ever could be... + ::close(fd); + return; + } + + IOImplementationPtr connection= acceptNewConnection(fd, &addr.m_addr); + if(!connection) + { + ::close(fd); + return; + } + if (m_new_connection_base_callback) + { + m_new_connection_base_callback(connection); + } +} // eo ServerSocketBaseImplementation::doRead() + + +/** + * @brief handles write events. + * + * since server sockets never ever get a write mark, something must + * be wrong and the connection is closed! + */ +void ServerSocketBaseImplementation::doWrite() +{ + // if this method is called, something went wrong... + close(); + //TODO throw something?! +} // eo ServerSocketBaseImplementation::doWrite() + + + +/** + * @brief sets a function which is called when a new connection was established. + * + * The function gets a (shared) pointer to the new connetion as parameter and is + * expected to store it when it accepts the connection. + * (Else the conenction gets deleted after the function was called.) + * + * @param func the function which hsould be called on new conenctions. + */ +void ServerSocketBaseImplementation::setNewConnectionBaseCallback( + const NewConnectionBaseCallbackFunc& func) +{ + m_new_connection_base_callback= func; +} // eo ServerSocketBaseImplementation::setNewConnectionBaseCallback(NewConnectionBaseCallbackFunc&) + + + +/** + * @brief callback for new connections. + * + * The base method always returns an empty pointer. + * + * Derived classes must override this method and do something useful with + * the passed file descriptor. + * + * @param fd the file descriptor of the new connection. + * @param addr pointer to the address structure filled from ::accept() + * @return shared pointer to the new IO class instance (; empty if not accepted) + */ +IOImplementationPtr ServerSocketBaseImplementation::acceptNewConnection( + int fd, boost::any addr) +{ + // needs to be defined in derived class! + return IOImplementationPtr(); +} // eo ServerSocketBaseImplementation::acceptNewConnection(int,boost::any) + + + +/* +** implementation of UnixIOSocket +*/ + + +UnixIOSocket::UnixIOSocket() +{ +} // eo UnixIOSocket::UnixIOSocket() + + + +UnixIOSocket::UnixIOSocket(const std::string& path) +: m_peer_pid(0) +, m_peer_uid(0) +, m_peer_gid(0) +{ + open(path); +} // eo UnixIOSocket::UnixIOSocket(const std::string&) + + +UnixIOSocket::UnixIOSocket( + int fd, const std::string& path, + unsigned int peer_pid, unsigned int peer_uid, unsigned int peer_gid) +: IOImplementation(fd,fd) +, m_path(path) +, m_peer_pid(peer_pid) +, m_peer_uid(peer_uid) +, m_peer_gid(peer_gid) +{ +} // eo UnixIOSocket::UnixIOSocket(int,const std::string&,unsigned,unsigned,unsigned) + + +/** + * @brief opens a (client) connection to an unix domain socket. + * + * @param path the path the server is listening on. + * @return @a true iff the connection was successfully opened. + */ +bool UnixIOSocket::open(const std::string& path) +{ + if (opened()) close(); + + m_errno= 0; + m_path.clear(); + + if (path.empty() || path.size() >= PATH_MAX) + { + return false; + } + + int fd= ::socket(PF_UNIX, SOCK_STREAM, 0); + if (fd<0) + { + m_errno= errno; + return false; + } + + { + MegaAddr addr; + addr.m_addr_un.sun_family= AF_UNIX; + strncpy(addr.m_addr_un.sun_path, path.c_str(), PATH_MAX); + if (::connect(fd,(sockaddr*)&addr.m_addr_un, SUN_LEN(&addr.m_addr_un)) < 0) + { + m_errno= errno; + ::close(fd); + return false; + } + } + m_path= path; + setReadFd(fd); + setWriteFd(fd); + return true; +} // eo UnixIOSocket::open(const std::string&,int) + + +/* +** implementation of UnixServerSocketBase +*/ + + +UnixServerSocketBase::UnixServerSocketBase() +{ +} // eo UnixServerSocketBase::UnixServerSocketBase + + +UnixServerSocketBase::UnixServerSocketBase(const std::string& path, int mode) +{ + open(path,mode); +} // eo UnixServerSocketBase::UnixServerSocketBase(const std::string&,int) + + +/** + * @brief opens the server part of an unix domain socket. + * + * @param path the path the new port should listen on. + * @param mode the mode for the path. + * @return @a true iff the port was successfully opened. + */ +bool UnixServerSocketBase::open(const std::string& path, int mode) +{ + if (opened()) close(); + m_errno= 0; + if (path.empty() || path.size() >= PATH_MAX) + { + return false; + } + + int fd= ::socket(PF_UNIX, SOCK_STREAM, 0); + if (fd<0) + { + m_errno= errno; + return false; + } + + { + MegaAddr addr; + addr.m_addr_un.sun_family= AF_UNIX; + strncpy(addr.m_addr_un.sun_path, path.c_str(), PATH_MAX); + ::I2n::unlink(path); // just in case... + mode_t old_mask= ::umask( (mode & 0777) ^ 0777); + if (::bind(fd,(sockaddr*)&addr.m_addr_un, SUN_LEN(&addr.m_addr_un)) < 0) + { + m_errno= errno; + ::umask(old_mask); + ::close(fd); + return false; + } + ::umask(old_mask); + } + m_path= path; + + { + int res= ::listen(fd,8); + if (res < 0) + { + m_errno= errno; + ::close(fd); + return false; + } + } + setReadFd(fd); + return true; +} // eo UnixServerSocketBase::open(const std::string&,int) + + +/** + * @brief called from base class to create a new connection instance. + * + * This method accepts only connections to unix domain sockets. + * It also tries to determine the peer pid, uid and gid. + * + * @param fd the file descriptor of a freshly accepted connection. + * @param addr conatins "pointer to struct sockaddr" + * @return @a a (shared) pointer to the new connection isntance; empty if none was + * created. + */ +IOImplementationPtr UnixServerSocketBase::acceptNewConnection(int fd, boost::any addr) +{ + struct sockaddr *addr_ptr= NULL; + try { + addr_ptr = boost::any_cast(addr); + } + catch (boost::bad_any_cast&) + { + return IOImplementationPtr(); + } + // check for the right family: + if (addr_ptr->sa_family != AF_UNIX) + { + return IOImplementationPtr(); + } + struct sockaddr_un *un_ptr = reinterpret_cast(addr_ptr); + std::string peer_path( un_ptr->sun_path ); + unsigned peer_pid=0; + unsigned peer_gid=0; + unsigned peer_uid=0; +#ifdef __linux__ + { // the linux way to get peer info (pid,gid,uid): + struct ucred cred; + socklen_t cred_len = sizeof(cred); + if (getsockopt(fd,SOL_SOCKET,SO_PEERCRED,&cred,&cred_len) == 0) + { + peer_pid= cred.pid; + peer_uid= cred.uid; + peer_gid= cred.gid; + } + } +#else +#error dont know how to determine peer info. +#endif + + UnixIOSocketPtr ptr( createIOSocket(fd, peer_path, peer_pid, peer_uid, peer_gid) ); + return ptr; +} // eo UnixServerSocketBase::acceptNewConnection(int,boost::any); + + +/** + * @brief "real" creator of the connection instance. + * + * called by UnixServerSocketBase::acceptNewConnection to create the new io instance. + * + * @param fd file descriptor for the socket + * @param path path as delivered by peer. + * @param peer_pid peer pid. + * @param peer_uid peer uid. + * @param peer_gid peer gid. + * @return (shared) pointer to the new io instance. + */ +UnixIOSocketPtr UnixServerSocketBase::createIOSocket( + int fd, const std::string& path, + unsigned int peer_pid, + unsigned int peer_uid, unsigned int peer_gid) +{ + return UnixIOSocketPtr( + new UnixIOSocket(fd, path, peer_pid, peer_uid, peer_gid) + ); +} // eo UnixServerSocketBase::createIOSocket(int,const std::string&,unsigned,unsigned,unsigned) + + + +}// eo namespace SimpleIo +}// eo namespace I2n diff --git a/asyncio/simplesocket.hpp b/asyncio/simplesocket.hpp new file mode 100644 index 0000000..745c787 --- /dev/null +++ b/asyncio/simplesocket.hpp @@ -0,0 +1,215 @@ +/** @file + * @brief socket classes for the SimpleIo framework. + * + * + * (c) Copyright 2008 by Intra2net AG + * + * info@intra2net.com + */ + +#ifndef __SIMPLEIO__SIMPLESOCKET_HPP__ +#define __SIMPLEIO__SIMPLESOCKET_HPP__ + +#include "simpleio.hpp" + +#include +#include +#include +#include +#include +#include +#include + + +namespace I2n +{ +namespace SimpleIo +{ + + +typedef boost::shared_ptr< IOImplementation > IOImplementationPtr; + + +/** + * @brief base class for server sockets. + * + * Contains all the stuff which is common for different types of server sockets. + */ +class ServerSocketBaseImplementation +: public IOImplementation +{ + public: + typedef boost::function< void(IOImplementationPtr) > NewConnectionBaseCallbackFunc; + + public: + + void setNewConnectionBaseCallback( const NewConnectionBaseCallbackFunc& func); + + protected: + ServerSocketBaseImplementation(); + + + virtual void doRead(); + virtual void doWrite(); + + virtual IOImplementationPtr acceptNewConnection(int fd, boost::any addr); + + + protected: + + NewConnectionBaseCallbackFunc m_new_connection_base_callback; + +}; // eo class ServerSocketBaseImplementation + + +typedef boost::shared_ptr< ServerSocketBaseImplementation > ServerSocketBaseImplementationPtr; + + +/* +** unix domain sockets +*/ + + +template< + class IOClass +> +class UnixServerSocket; + + +/** + * @brief spezialized IO class for unix domain sockets. + * + */ +class UnixIOSocket +: public IOImplementation +{ + public: + UnixIOSocket(); + UnixIOSocket(const std::string& path); + + bool open(const std::string& path); + + protected: + friend class UnixServerSocketBase; + friend class UnixServerSocket; + + UnixIOSocket( + int fd, const std::string& path, + unsigned int peer_pid, unsigned int peer_uid, unsigned int peer_gid); + + protected: + + std::string m_path; + unsigned int m_peer_pid; + unsigned int m_peer_uid; + unsigned int m_peer_gid; + +}; // eo class UnixIOSocket + +typedef boost::shared_ptr< UnixIOSocket > UnixIOSocketPtr; + + + +/** + * @brief spezialized server socket class for unix domain sockets. + * + */ +class UnixServerSocketBase +: public ServerSocketBaseImplementation +{ + public: + UnixServerSocketBase(); + UnixServerSocketBase(const std::string& path, int mode=0600); + + bool open(const std::string& path, int mode= 0600); + + protected: + + virtual IOImplementationPtr acceptNewConnection(int fd, boost::any addr); + + virtual UnixIOSocketPtr createIOSocket( + int fd, const std::string& path, + unsigned int peer_pid, + unsigned int peer_uid, unsigned int peer_gid); + + protected: + + std::string m_path; + +}; // eo class UnixServerSocketBase + + +/** + * @brief unix server socket class which "produces" connections of a determined type. + * + + @param IOClass the type of the connections. + */ +template< + class IOClass +> +class UnixServerSocket +: public UnixServerSocketBase +{ + BOOST_STATIC_ASSERT(( boost::is_base_of::value )); + + public: + typedef boost::shared_ptr< IOClass > IOClassPtr; + typedef boost::function< void(IOClassPtr) > NewConnectionCallbackFunc; + + public: + + UnixServerSocket() + : UnixServerSocketBase() + {} + + UnixServerSocket(const std::string& path, int mode=0600) + : UnixServerSocketBase(path,mode) + {} + + void setNewConnectionCallback( const NewConnectionCallbackFunc& func) + { + if (func) + { + UnixServerSocketBase::setNewConnectionBaseCallback( + boost::bind( + func, + boost::bind( + &UnixServerSocket::my_ptr_cast, + _1 + ) + ) + ); + } + else + { + UnixServerSocketBase::setNewConnectionBaseCallback( + NewConnectionBaseCallbackFunc() + ); + } + } + + protected: + + virtual UnixIOSocketPtr createIOSocket( + int fd, const std::string& path, + unsigned int peer_pid, + unsigned int peer_uid, unsigned int peer_gid) + { + return UnixIOSocketPtr( + new IOClass(fd, path, peer_pid, peer_uid, peer_gid) + ); + } + + static IOClassPtr my_ptr_cast(IOImplementationPtr ptr) + { + return boost::dynamic_pointer_cast(ptr); + } + +}; // eo class UnixServerSocket + + +}// eo namespace SimpleIo +}// eo namespace I2n + + +#endif diff --git a/asyncio/simpletimer.cpp b/asyncio/simpletimer.cpp new file mode 100644 index 0000000..f716981 --- /dev/null +++ b/asyncio/simpletimer.cpp @@ -0,0 +1,107 @@ +/** @file + * + * (c) Copyright 2007 by Intra2net AG + * + * info@intra2net.com + */ + +//#define NOISEDEBUG + + +#include "simpletimer.hpp" + + +#ifdef NOISEDEBUG +#include +#include +#define DOUT(msg) std::cout << msg << std::endl +#define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl +#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl +#else +#define DOUT(msg) do {} while (0) +#define FODOUT(obj,msg) do {} while (0) +#define ODOUT(msg) do {} while (0) +#endif + + +namespace I2n +{ +namespace SimpleIo +{ + + +/* + * Implementation of SimpleTimer + */ + +SimpleTimer::SimpleTimer() +{ +} // eo SimpleTimer::SimpleTimer() + + +SimpleTimer::SimpleTimer(const MilliTime& delta, const TimerSignal::slot_function_type& action) +{ + addAction(action); + startTimer(delta); +} // eo SimpleTimer::SimpleTimer(const MilliTime&, const TimerSignal::slot_type&) + + +SimpleTimer::SimpleTimer(long milli_seonds_delta, const TimerSignal::slot_function_type& action) +{ + addAction(action); + startTimerMS(milli_seonds_delta); +} // eo SimpleTimer::SimpleTimer(long, const TimerSignal::slot_type&) + + +void SimpleTimer::execute() +{ + ODOUT("execute()!"); + m_timer_signal(); + m_timer_signal_p(this); +} // eo SimpleTimer::execute + + +void SimpleTimer::addAction( const TimerSignal::slot_function_type& action ) +{ + m_timer_signal.connect(action); +} // eo SimpleTimer::addAction(const TimerSignal::slot_type&) + + +void SimpleTimer::addActionP( const TimerSignalP::slot_function_type& action ) +{ + m_timer_signal_p.connect(action); +} // eo SimpleTimer::addAction(const TimerSignalP::slot_type&) + + +void SimpleTimer::startTimer( const MilliTime& delta ) +{ + m_delta= delta; + setDeltaWhenTime( delta ); + activate(true); +#ifdef NOISEDEBUG + MilliTime now, t; + get_current_time(now); + t= getWhenTime(); + MilliTime dt= t-now; + ODOUT("startTimer"); + ODOUT(" now: sec="<< now.mt_sec << ", msec="< + +namespace I2n +{ +namespace SimpleIo +{ + + +class SimpleTimer : public TimerBase +{ + public: + + typedef boost::signal TimerSignal; + typedef boost::signal TimerSignalP; + + public: + + SimpleTimer(); + + // convenience constructors for simple timeouts: + SimpleTimer(const MilliTime& delta, const TimerSignal::slot_function_type& action); + SimpleTimer(long milli_seonds_delta, const TimerSignal::slot_function_type& action); + + // add actions: + void addAction( const TimerSignal::slot_function_type& action ); + void addActionP( const TimerSignalP::slot_function_type& action ); + + // start the timer: + void startTimer( const MilliTime& delta ); + void startTimerMS( long milli_seconds ); + + // stop the timer: + void stopTimer(); + + protected: + MilliTime m_delta; + + TimerSignal m_timer_signal; + TimerSignalP m_timer_signal_p; + + virtual void execute(); +}; // eo class SimpleTimer + + +} // eo namespace SimpleIo +} // eo namespace I2n + +#endif -- 1.7.1