From aba4c34d56e1c270663b4166c7ea7e8f23f06b74 Mon Sep 17 00:00:00 2001 From: Reinhard Pfau Date: Tue, 21 Oct 2008 12:46:58 +0000 Subject: [PATCH] libasyncio: (reinhard) moved some stuff into a separate utils module. Building RPMs now works. --- AUTHORS | 7 +- Makefile.am | 2 +- README | 38 ++ TODO | 6 + asyncio/Makefile.am | 11 +- asyncio/async_io.cpp | 239 +--------- asyncio/async_io.hpp | 60 +--- asyncio/libasyncio.pc.in | 6 +- configure.in | 7 +- doc/Makefile.am | 2 +- glue_t2n/Makefile.am | 8 +- glue_t2n/async_io_t2n.cpp | 958 ------------------------------------ glue_t2n/async_io_t2n.hpp | 206 -------- glue_t2n/asyncio_t2n.cpp | 959 +++++++++++++++++++++++++++++++++++++ glue_t2n/asyncio_t2n.hpp | 206 ++++++++ templates/cpp | 2 +- templates/hpp | 2 +- unittest/Makefile.am | 8 +- unittest/test_simpleio_basics.cpp | 2 +- utils/Makefile.am | 18 + utils/asyncio_ptr_list.hpp | 221 +++++++++ utils/asyncio_time_tools.cpp | 214 +++++++++ utils/asyncio_time_tools.hpp | 83 ++++ utils/asyncio_utils.cpp | 21 + utils/asyncio_utils.hpp | 15 + utils/libasyncio_utils.pc.in | 10 + 26 files changed, 1836 insertions(+), 1475 deletions(-) delete mode 100644 glue_t2n/async_io_t2n.cpp delete mode 100644 glue_t2n/async_io_t2n.hpp create mode 100644 glue_t2n/asyncio_t2n.cpp create mode 100644 glue_t2n/asyncio_t2n.hpp create mode 100644 utils/Makefile.am create mode 100644 utils/asyncio_ptr_list.hpp create mode 100644 utils/asyncio_time_tools.cpp create mode 100644 utils/asyncio_time_tools.hpp create mode 100644 utils/asyncio_utils.cpp create mode 100644 utils/asyncio_utils.hpp create mode 100644 utils/libasyncio_utils.pc.in diff --git a/AUTHORS b/AUTHORS index abc505d..4f722c1 100644 --- a/AUTHORS +++ b/AUTHORS @@ -1 +1,6 @@ -Reinhard Pfau +Authors of libasyncio: +====================== + +Reinhard Pfau + + diff --git a/Makefile.am b/Makefile.am index 5ac978e..112f367 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1,5 +1,5 @@ AUTOMAKE_OPTIONS = foreign -SUBDIRS = asyncio glue_t2n unittest doc +SUBDIRS = utils asyncio glue_t2n unittest doc EXTRA_DIST+= LICENSE diff --git a/README b/README index e69de29..aa28536 100644 --- a/README +++ b/README @@ -0,0 +1,38 @@ +libasyncio +=========== + +a library providing an infrastructure for dealing with asnychronous +io and support event driven programming. + +The basic idea for this lib is to provide classes which hide the details +of polling/selecting for events on file descriptors and provide a more +convenient interface which supports the idea of event driven programming. + + +The library was originally developed by Intra2net as part of the +Intranator connection daemon which is part of the Intranator (see http://www.intranator.com/). +During development it was extracted as a separate module. +And then we decide to publish it under The GNU Lesser Public License (LGPL). + + + +The libasyncio provides: + +- basic class for handling reading from and writing to fd's +- filter plugin support for incoming/outgoing data. +- timer events +- deferred function calls; "frozen" function calls +- start and handle subprocesses (connecting to their stdin/stdout/stderr) +- unix domain socket handling +- event signals (using boost-signal lib) +- ... and some more stuff. + + +It depend on the boost library (see http://www.boost.org/). + + +Links: +====== + +the boost library: http://www.boost.org/ +Intra2net: http://www.intra2net.com/ diff --git a/TODO b/TODO index e69de29..66aa10c 100644 --- a/TODO +++ b/TODO @@ -0,0 +1,6 @@ +TODO list +========== + +optimize, consolidate, beautify (a never ending task :-) ) + + diff --git a/asyncio/Makefile.am b/asyncio/Makefile.am index 4e25b19..eadb89d 100644 --- a/asyncio/Makefile.am +++ b/asyncio/Makefile.am @@ -1,4 +1,4 @@ -INCLUDES = @LIBI2NCOMMON_CFLAGS@ @BOOST_CPPFLAGS@ +INCLUDES = -I$(top_srcdir)/utils @BOOST_CPPFLAGS@ @LIBI2NCOMMON_CFLAGS@ METASOURCES = AUTO lib_LTLIBRARIES = libasyncio.la libasyncio_la_SOURCES = async_callout.cpp async_io.cpp async_pipe.cpp \ @@ -9,5 +9,12 @@ libasyncio_la_LIBADD = @LIBI2NCOMMON_LIBS@ @BOOST_LDFLAGS@ @BOOST_SIGNALS_LIB@ libasyncio_la_LDFLAGS = -version-info @LIBASYNCIO_LIB_VERSION@ -pkgconfigdir=$(libdir)/pkgconfig +pkgconfigdir = $(libdir)/pkgconfig pkgconfig_DATA= libasyncio.pc + +$(top_srcdir)/headerlist.asyncio: $(include_HEADERS) + list='$(include_HEADERS)'; \ + ( for n in $$list; do echo $(includedir)/$$n ; done) > $@ + +.PHONY: headerlist +headerlist: $(top_srcdir)/headerlist.asyncio diff --git a/asyncio/async_io.cpp b/asyncio/async_io.cpp index 28f7b77..95d916b 100644 --- a/asyncio/async_io.cpp +++ b/asyncio/async_io.cpp @@ -43,9 +43,11 @@ #define ODOUT(msg) do {} while (0) #endif + namespace { +using namespace AsyncIo::Utils; /* * configuration: @@ -121,95 +123,11 @@ struct PollDataCluster }; // eo struct PollDataCluster -template -class PtrList : public std::list -{ - typedef std::list inherited; - public: - bool dirty; - - static int Instances; - - public: - - PtrList() - : dirty(false) - { - ++Instances; - } // eo PtrList - - - ~PtrList() - { - ODOUT(""); - --Instances; - } - - - /** - * add a new item pointer to the list. - * - * @param item item pointer which should be added - */ - void add_item(T* item) - { - typename inherited::iterator it= std::find(inherited::begin(), inherited::end(), item); - if (it != inherited::end()) - { - // nothing to do since item is already in the list - return; - } - push_back(item); - } // eo add - - - /** - * remove an item pointer from the list by setting NULL at the current position of the item and mark - * the list as dirty. - * - * @param item the io object which should be removed from the list. - */ - void remove_item(T* item) - { - typename inherited::iterator it= std::find(inherited::begin(), inherited::end(), item); - if (it == inherited::end()) - { - // nothing to do: - return; - } - *it = NULL; // forget the pointer - dirty= true; // ..and mark the list as dirty (i.e. has NULL elements) - } // eo remove - - - /** - * cleans the list of objects by removing the NULL elements (if any). - * - * @note this function should only be called when it is ensured that no other functions using iterators of this list. - */ - void clean_list() - { - if (!dirty) - { - // nothing to do - return; - } - // remove the NULL elements now: - erase( - std::remove( inherited::begin(), inherited::end(), (T*)NULL), - inherited::end() ); - dirty= false; - } // eo clean_list - - -}; // eo class PtrList - - -typedef PtrList IOList; -typedef PtrList TimerList; +typedef PtrList< AsyncIo::IOImplementation, true > IOList; +typedef PtrList< AsyncIo::TimerBase, true > TimerList; -template<> int IOList::Instances= 0; -template<> int TimerList::Instances= 0; +template<> int IOList::GlobalCountType::InstanceCount= 0; +template<> int TimerList::GlobalCountType::InstanceCount= 0; /** @@ -329,35 +247,6 @@ struct FilterMatch { }; // eo struct FilterMatch -void get_current_real_time(long& current_sec, long& current_msec) -{ - struct timeval tv; - gettimeofday(&tv,NULL); - current_sec= tv.tv_sec; - current_msec= (tv.tv_usec / 1000); - if (current_msec >= 1000) - { - current_sec += (current_msec / 1000); - current_msec%= 1000; - } -} // eo get_current_real_time - - -void get_current_monotonic_time(long& current_sec, long& current_msec) -{ - long nsec; - if (monotonic_clock_gettime(current_sec,nsec)) - { - current_msec= nsec / 1000000L; - } - else - { - //fallback... - get_current_real_time(current_sec,current_msec); - } -} // eo get_current_monotonic_time - - } // eo anonymous namespace @@ -368,118 +257,6 @@ namespace AsyncIo { -/** - * @brief gets the current time as MilliTime structure. - * @param[out] mt reference to the MilliTime strcucture which is filled with the result. - */ -void get_current_real_time(MilliTime& mt) -{ - long sec, msec; - ::get_current_real_time(sec,msec); - mt.set(sec,msec); -} // eo get_current_real_time - - -/** - * @brief gets the current time as MilliTime structure. - * @param[out] mt reference to the MilliTime strcucture which is filled with the result. - */ -void get_current_monotonic_time(MilliTime& mt) -{ - long sec, msec; - ::get_current_monotonic_time(sec,msec); - mt.set(sec,msec); -} // eo get_current_monotonic_time - - -/* - * implementation of MilliTime - */ - -MilliTime::MilliTime(long sec, long msec) -: mt_sec(sec), mt_msec(msec) -{ - normalize(); -} // eo MilliTime::MilliTime - - -void MilliTime::set(long sec, long msec) -{ - mt_sec= sec; - mt_msec= msec; - normalize(); -} // eo MilliTime::set - - -/** - * normalizes the values, so that mt_msec has a value between 0 and 999. - */ -void MilliTime::normalize() -{ - if (mt_msec < 0) - { - mt_sec += (mt_msec / 1000) - 1; - mt_msec = (mt_msec % 1000) + 1000; - } - else if (mt_msec>=1000) - { - mt_sec+= (mt_msec / 1000); - mt_msec %= 1000; - } -} // eo MilliTime::normalize - - -/** - * determine if the represented point in time is before another one. - * @param other the other point in time. - * @return true if the own point in time is before the other one. - */ -bool MilliTime::operator < (MilliTime& other) -{ - normalize(); - other.normalize(); - return - (mt_sec < other.mt_sec) - || (( mt_sec == other.mt_sec) && (mt_msec < other.mt_msec)); -} // eo MilliTime::operator < - - -/** - * determine if two point in times are equal. - * @param other the point in time to compare with. - * @return true if the represented times are equal. - */ -bool MilliTime::operator == (MilliTime& other) -{ - normalize(); - other.normalize(); - return (( mt_sec == other.mt_sec) && (mt_msec == other.mt_msec)); -} // eo MilliTime::operator < - -/** - * @brief subtracts a time delta from the object. - * @param lhs the time delta to subtract. - * @return reference to the object itself. - */ -MilliTime& MilliTime::operator -= (const MilliTime& lhs) -{ - mt_sec -= lhs.mt_sec; - mt_msec -= lhs.mt_msec; -} // eo operator -= - - -/** - * @brief adds a time delta from the object. - * @param lhs the time delta to add. - * @return reference to the object itself. - */ -MilliTime& MilliTime::operator += (const MilliTime& lhs) -{ - mt_sec += lhs.mt_sec; - mt_msec += lhs.mt_msec; -} // eo operator += - - /* * implementation of TimerBase */ @@ -501,7 +278,7 @@ TimerBase::TimerBase() TimerBase::~TimerBase() { ODOUT("enter"); - if (internal_io::TimerList::Instances) + if (internal_io::TimerList::Instances()) { ODOUT("remove from list"); internal_io::g_timer_list().remove_item(this); @@ -704,7 +481,7 @@ IOImplementation::IOImplementation(int read_fd, int write_fd) IOImplementation::~IOImplementation() { close(); - if (internal_io::IOList::Instances) + if (internal_io::IOList::Instances()) { internal_io::g_io_list().remove_item(this); } diff --git a/asyncio/async_io.hpp b/asyncio/async_io.hpp index fb2278f..f86f7b5 100644 --- a/asyncio/async_io.hpp +++ b/asyncio/async_io.hpp @@ -18,6 +18,8 @@ #include #include +#include + #include #include @@ -28,7 +30,7 @@ namespace AsyncIo { using namespace I2n; - +using Utils::MilliTime; /* * forward declarations @@ -60,53 +62,6 @@ struct Direction }; // eo struct IODirection; -/** - * structure for storing (a point in time as) seconds and milliseconds. - */ -struct MilliTime -{ - long mt_sec; - long mt_msec; - - MilliTime(long sec=0, long msec=0); - - void set(long sec, long msec=0); - - inline long long get_milliseconds() const - { - return ((long long)mt_sec * 1000L + mt_msec); - } // eo get_milliseconds - - void normalize(); - - bool operator < (MilliTime& other); - bool operator == (MilliTime& other); - - MilliTime& operator -= (const MilliTime& lhs); - MilliTime& operator += (const MilliTime& lhs); - -}; // eo struct MilliTime - - -inline MilliTime operator + (const MilliTime& rhs, const MilliTime& lhs) -{ - MilliTime t(rhs); - return t+= lhs; -} // eo operator + (const MilliTime& rhs, const MilliTime lhs) - -inline MilliTime operator - (const MilliTime& rhs, const MilliTime& lhs) -{ - MilliTime t(rhs); - return t-= lhs; -} // eo operator - (const MilliTime& rhs, const MilliTime lhs) - - -inline bool operator <= (MilliTime& rhs, MilliTime& lhs) -{ - return (rhs - * - * @copyright © Copyright 2008 by Intra2net AG - * @license LGPL - * @contact info@intra2net.com - */ - -#include "async_io_t2n.hpp" - -#include -#include -#include -#include -#include -#include -#include -#include - - -namespace AsyncIo -{ - - -namespace -{ - - -Logger::PartLogger& module_logger() -{ - static Logger::PartLogger _module_logger(HERE); - return _module_logger; -} // eo module_logger(); - - - -/** - * @brief a class with some methods we like to have on our io classes. - * - * This class is to be used as second base in a wrapper class and needs it's methods to - * be redefined for "the real thing". - */ -class IOExportWrapperBase -{ - public: - - IOExportWrapperBase() - { - } - - virtual ~IOExportWrapperBase() - { - } - - virtual void sendData(const std::string& data) - { - } - - - virtual std::string receiveData() - { - return std::string(); - } - - virtual boost::signals::connection connectEof( const boost::function< void() >& func ) - { - return boost::signals::connection(); - } - - virtual boost::signals::connection connectRead( const boost::function< void() >& func ) - { - return boost::signals::connection(); - } - -}; // eo class IOExportWrapperBase - - -typedef boost::shared_ptr< IOExportWrapperBase > IOExportWrapperBasePtr; - - -/** - * @brief IO wrapper template. - * @tparam IOClass a type based on AsyncIo::IOImplementation - * - * The type is used as a public base for the resulting class; the second public base is our - * helper with the additional methods we need internally and which we (finally) define here. - */ -template< - class IOClass -> -class IOExportWrapper -: public IOClass -, public IOExportWrapperBase -{ - BOOST_STATIC_ASSERT(( boost::is_base_of< IOImplementation,IOClass >::value )); - - public: - IOExportWrapper() - { - } - - template< - typename Arg1 - > - IOExportWrapper(Arg1 arg1) - : IOClass(arg1) - {} - - - template< - typename Arg1, typename Arg2 - > - IOExportWrapper(Arg1 arg1, Arg2 arg2) - : IOClass(arg1,arg2) - {} - - - template< - typename Arg1, typename Arg2, typename Arg3 - > - IOExportWrapper(Arg1 arg1, Arg2 arg2, Arg3 arg3) - : IOClass(arg1,arg2,arg3) - {} - - - template< - typename Arg1, typename Arg2, typename Arg3, typename Arg4 - > - IOExportWrapper(Arg1 arg1, Arg2 arg2, Arg3 arg3, Arg4 arg4) - : IOClass(arg1,arg2,arg3,arg4) - {} - - - template< - typename Arg1, typename Arg2, typename Arg3, typename Arg4, typename Arg5 - > - IOExportWrapper(Arg1 arg1, Arg2 arg2, Arg3 arg3, Arg4 arg4, Arg5 arg5) - : IOClass(arg1,arg2,arg3,arg4,arg5) - {} - - - /** - * @brief exposed funtion for sending data. - * @param data the chunk to be send. - */ - virtual void sendData(const std::string& data) - { - IOClass::lowSend(data); - } - - /** - * @brief returns the new received data. - * @return the receievd data. - * - * Clears the receive buffer. - */ - virtual std::string receiveData() - { - std::string result; - result.swap(IOClass::m_input_buffer); - return result; - } - - /** - * @brief exposed connect to EOF signal. - * @param func the function which should be connected to the eof signal. - * @return signal connection handle. - */ - virtual boost::signals::connection connectEof( const boost::function< void() >& func ) - { - return IOClass::m_signal_eof.connect(func); - } - - /** - * @brief exposed connect to "read" signal. - * @param func the function which should be connected to the "read" signal. - * @return signal connection handle. - */ - virtual boost::signals::connection connectRead( const boost::function< void() >& func ) - { - return IOClass::m_signal_read.connect(func); - } - - protected: - -}; // eo class IOExportWrapper - - -/* -** specialized versions of io classes: -*/ - -/** - * @brief enhanced unix domain socket class with reconnect feature. - * - * Used for t2n client connections. - */ -class T2nUnixIOSocket -: public AsyncIo::UnixIOSocket -{ - typedef AsyncIo::UnixIOSocket inherited; - public: - T2nUnixIOSocket( const std::string& path ); - - virtual void close(AsyncIo::Direction direction = AsyncIo::Direction::both); - - bool reopen(bool force= false); - - protected: - - virtual void doRead(); - - - protected: - - bool m_in_do_read; - bool m_may_reconnect; - -}; // T2nUnixIOSocket - - -T2nUnixIOSocket::T2nUnixIOSocket(const std::string& path) -: inherited(path) -, m_in_do_read(false) -, m_may_reconnect(false) -{ -} // eo T2nUnixIOSocket::T2nUnixIOSocket(const std::string&) - - -void T2nUnixIOSocket::close(AsyncIo::Direction direction) -{ - bool was_open= opened(); - inherited::close(direction); - if (m_in_do_read and not opened()) - { - m_may_reconnect= was_open; - } -} // eo T2nUnixIOSocket::close(AsyncIo::Direction) - - -bool T2nUnixIOSocket::reopen(bool force) -{ - if (m_path.empty()) - { - return false; - } - if (m_may_reconnect || force) - { - return inherited::open( m_path ); - } - return false; -} // eo T2nUnixIOSocket::reopen() - - -void T2nUnixIOSocket::doRead() -{ - m_in_do_read= true; - try - { - inherited::doRead(); - } - catch (...) - { - m_in_do_read= false; - throw; - } - m_in_do_read= false; -} // eo T2nUnixIOSocket::doRead() - - -/** - * @brief server class for libt2n using unix domain sockets. - * - * well, it's enough to provide an appropriate constructor. - * (did i mention that templates are really cool stuff? :-) ) - */ -class T2NUnixServer -: public T2NServerBase -{ - public: - - T2NUnixServer(const std::string& path, int mode=0600) - : T2NServerBase( ServerSocketBaseImplementationPtr( - new UnixServerSocket< - IOExportWrapper< UnixIOSocket > - >(path, mode) - ) ) - { - } // eo T2NServerBase - -}; // eo T2NUnixServer - - - -class RealT2NClientConnection -: public T2NClientConnection -{ - public: - RealT2NClientConnection( AsyncIo::IOImplementationPtr connection ) - : T2NClientConnection(connection) - { - } -}; // eo class T2NClient - -} // eo namespace - - - -/* -** implementation of T2NClientConnection -*/ - - -T2NClientConnection::T2NClientConnection( - IOImplementationPtr connection -) -: libt2n::client_connection() -, m_real_connection(connection) -, m_got_new_data(false) -{ - SCOPETRACKER(); - IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(connection); - if (!ptr) - { - module_logger().error(HERE) << "illegal pointer passed"; - close(); - return; - } - if (!connection->opened()) - { - module_logger().warning(HERE) << "connection not open, either failed or already closed"; - close(); - return; - } - - ptr->connectRead( boost::bind(&T2NClientConnection::newDataSlot, this) ); - ptr->connectEof( boost::bind(&T2NClientConnection::eofSlot, this) ); - -} // eo T2NClientConnection::T2NClientConnection(IOImplementationPtr) - - -T2NClientConnection::~T2NClientConnection() -{ - SCOPETRACKER(); -} // eo T2NClientConnection::~T2NClientConnection - - -/** - * @brief returns if the connection is open. - * @return @a true if the connection is open. - */ -bool T2NClientConnection::isOpen() -{ - return m_real_connection and m_real_connection->opened(); -} // eo T2NClientConnection::isOpen() - - -/** - * @brief try to reopen a connection. - * @return @a true if the connection was reopened. - */ -bool T2NClientConnection::reopen(bool force) -{ - if (not m_real_connection) - { - return false; - } - boost::shared_ptr< T2nUnixIOSocket > t2n_socket= - boost::shared_dynamic_cast< T2nUnixIOSocket >(m_real_connection); - if (t2n_socket) - { - return t2n_socket->reopen(force); - } - return false; -} // eo T2NClientConnection::reopen() - - -/** - * @brief closes the connection. - * - * This closes the underlying IO connection and calls libt2n::server_connection::close() to - * mark the connection as closed for libt2n. - */ -void T2NClientConnection::close() -{ - SCOPETRACKER(); - if (m_real_connection) - { - m_real_connection->close(); - m_real_connection.reset(); - } - libt2n::client_connection::close(); -} // eo T2NClientConnection::close() - - -/** - * @brief sends a raw data chunk on the connection. - * - * @param data the (raw) data chunk which should be sended. - */ -void T2NClientConnection::real_write(const std::string& data) -{ - SCOPETRACKER(); - if (is_closed()) - { - module_logger().warning(HERE) << "attempt to write data on closed connection"; - return; - } - IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); - if (!ptr) - { - // should never happen... - module_logger().error(HERE)<< "illegal io pointer"; - close(); - //TODO: throw an error?! - NOT_REACHED(); - return; - } - ptr->sendData(data); -} // eo T2NClientConnection::real_write(const std::string) - - -/** - * @brief called to fill the connection buffer. - * - * Since this class uses the asnychronous AsyncIo framework, new data may already be read when - * this method is called. - * - * @param usec_timeout - * @param usec_timeout_remaining - * @return @a true if new data is available. - */ -bool T2NClientConnection::fill_buffer(long long usec_timeout,long long* usec_timeout_remaining) -{ - SCOPETRACKER(); - if (is_closed()) - { - module_logger().debug(HERE) << "fill_buffer() called on closed connection"; - return false; - } - AsyncIo::MilliTime t0,t1; - AsyncIo::get_current_monotonic_time(t0); - if (!m_got_new_data) - { - IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); - if (!ptr) - { - module_logger().error(HERE) << "illegal io pointer"; - close(); - return false; - } - // try to fetch data (call the backend) - int timeout= 0; - - if (usec_timeout<0) - { - timeout= -1; - } - else if (usec_timeout > 0) - { - long long msec_timeout= (usec_timeout + 500)/1000; - - if (msec_timeout >= INT_MAX) - { - timeout= INT_MAX; - } - else - { - timeout= (int)msec_timeout; - } - } - Backend::getBackend()->doOneStep( timeout ); - } - AsyncIo::get_current_monotonic_time(t1); - if (usec_timeout_remaining) - { - long long delta= ((long long)(t1 - t0).get_milliseconds())* 1000L; - *usec_timeout_remaining= (usec_timeout > delta ) ? (usec_timeout - delta) : 0L; - module_logger().debug() << "timeout: " << usec_timeout << " -> " << *usec_timeout_remaining; - } - if (m_got_new_data) - { - m_got_new_data= false; - return true; - } - return false; -} // eo T2NClientConnection::fill_buffer(long long,long long*) - - -/** - * @brief called when new data arrived on this connection. - * - * reads the new data from the underlying IO object and stores it in the connection buffer. - * Also remembers (in the bool member var @a m_got_new_data) that new data was received. - */ -void T2NClientConnection::newDataSlot() -{ - SCOPETRACKER(); - IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); - if (!ptr) - { - //TODO: throw an error?! - NOT_REACHED(); - return; - } - std::string new_data= ptr->receiveData(); - module_logger().debug() << "got " << new_data.size() << " bytes of new data"; - buffer+= new_data; - m_got_new_data= true; -} // eo T2NClientConnection::newDataSlot() - - -/** - * @brief called when an EOF was detected by the underlying IO object (i.e. the connection - * was closed by the peer side). - * - * Calls close(). - */ -void T2NClientConnection::eofSlot() -{ - SCOPETRACKER(); - close(); -} // eo T2NClientConnection::eofSlot() - - -/* -** implementation of T2NServerConnection -*/ - - -T2NServerConnection::T2NServerConnection( - T2NServerBasePtr server, - IOImplementationPtr connection, - int timeout -) -: libt2n::server_connection(timeout) -, m_real_connection(connection) -, m_server_weak_ptr(server) -, m_got_new_data(false) -{ - SCOPETRACKER(); - IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(connection); - if (!ptr) - { - module_logger().error(HERE) << "illegal pointer passed"; - close(); - return; - } - if (!connection->opened()) - { - module_logger().warning(HERE) << "connection not open, either failed or already closed"; - close(); - return; - } - - ptr->connectRead( boost::bind(&T2NServerConnection::newDataSlot, this) ); - ptr->connectEof( boost::bind(&T2NServerConnection::eofSlot, this) ); - -} // eo T2NServerConnection::T2NServerConnection(IOImplementationPtr) - - -T2NServerConnection::~T2NServerConnection() -{ - SCOPETRACKER(); -} // eo T2NServerConnection::~T2NServerConnection - - -/** - * @brief closes the connection. - * - * This closes the underlying IO connection and calls libt2n::server_connection::close() to - * mark the connection as closed for libt2n. - */ -void T2NServerConnection::close() -{ - SCOPETRACKER(); - if (m_real_connection) - { - m_real_connection->close(); - m_real_connection.reset(); - } - libt2n::server_connection::close(); -} // eo T2NServerConnection::close() - - -/** - * @brief sends a raw data chunk on the connection. - * - * @param data the (raw) data chunk which should be sended. - */ -void T2NServerConnection::real_write(const std::string& data) -{ - SCOPETRACKER(); - if (is_closed()) - { - module_logger().warning(HERE) << "attempt to write data on closed connection"; - return; - } - IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); - if (!ptr) - { - // should never happen... - module_logger().error(HERE)<< "illegal io pointer"; - close(); - //TODO: throw an error?! - NOT_REACHED(); - return; - } - module_logger().debug() << "send " << data.size() << " bytes of data"; - ptr->sendData(data); -} // eo T2NServerConnection::real_write(const std::string) - - -/** - * @brief called to fill the connection buffer. - * - * Since this class uses the asnychronous AsyncIo framework, new data may already be read when - * this method is called. - * - * @param wait determines if we need to wait; if @a false it is just checked if new data - * was received, but no backend cycle is executed. - * @param usec_timeout - * @param usec_timeout_remaining - * @return @a true if new data is available. - */ -bool T2NServerConnection::low_fill_buffer(bool wait, long long usec_timeout, long long* usec_timeout_remaining) -{ - SCOPETRACKER(); - if (is_closed()) - { - module_logger().debug(HERE) << "fill_buffer() called on closed connection"; - return false; - } - if (not m_got_new_data and wait) - { - AsyncIo::MilliTime t0,t1; - AsyncIo::get_current_monotonic_time(t0); - IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); - if (!ptr) - { - module_logger().error(HERE) << "illegal io pointer"; - close(); - return false; - } - // try to fetch data (call the backend) - int timeout= 0; - - if (usec_timeout<0) - { - timeout= -1; - } - else if (usec_timeout > 0) - { - long long msec_timeout= (usec_timeout + 500)/1000; - - if (msec_timeout >= INT_MAX) - { - timeout= INT_MAX; - } - else - { - timeout= (int)msec_timeout; - } - } - Backend::getBackend()->doOneStep( timeout ); - AsyncIo::get_current_monotonic_time(t1); - if (usec_timeout_remaining) - { - long long delta= ((long long)(t1 - t0).get_milliseconds())* 1000L; - *usec_timeout_remaining= (usec_timeout > delta ) ? (usec_timeout - delta) : 0L; - } - } - else - { - if (usec_timeout_remaining) - { - *usec_timeout_remaining= usec_timeout; - } - } - if (m_got_new_data) - { - m_got_new_data= false; - return true; - } - return false; -} // eo T2NServerConnection::low_fill_buffer(bool,long long,long long*) - - -/** - * @brief called to fill the connection buffer. - * - * Since this class uses the asnychronous AsyncIo framework, new data may already be read when - * this method is called. - * - * @param usec_timeout - * @param usec_timeout_remaining - * @return @a true if new data is available. - */ -bool T2NServerConnection::fill_buffer(long long usec_timeout,long long* usec_timeout_remaining) -{ - return low_fill_buffer(true, usec_timeout, usec_timeout_remaining); -} // eo T2NServerConnection::fill_buffer(long long,long long*) - - -/** - * @brief called when new data arrived on this connection. - * - * reads the new data from the underlying IO object and stores it in the connection buffer. - * Also remembers (in the bool member var @a m_got_new_data that new data was received. - */ -void T2NServerConnection::newDataSlot() -{ - SCOPETRACKER(); - IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); - if (!ptr) - { - //TODO:throw an error?! - NOT_REACHED(); - return; - } - std::string new_data= ptr->receiveData(); - buffer+= new_data; - module_logger().debug() << "got " << new_data.size() << " bytes of new data"; - m_got_new_data= true; - reset_timeout(); - - T2NServerBasePtr server =m_server_weak_ptr.lock(); - if (server) - { - server->m_signal_client_got_new_data(); - } -} // eo T2NServerConnection::newDataSlot() - - -/** - * @brief called when an EOF was detected by the underlying IO object (i.e. the connection - * was closed by the peer side). - * - * Calls close(). - */ -void T2NServerConnection::eofSlot() -{ - SCOPETRACKER(); - close(); -} // eo T2NServerConnection::eofSlot() - - - -/* -** implementation of T2NServerBase -*/ - - -/** - * @brief constructs a libt2n server object. - * - * @param server_port shared pointer to a (AsyncIo) port server object which - * is used as underlying port handler. - */ -T2NServerBase::T2NServerBase( ServerSocketBaseImplementationPtr server_port) -: m_server_port(server_port) -, m_new_data_available(false) -{ - SCOPETRACKER(); - // register our callback for new incoming conncetions. - server_port->setNewConnectionBaseCallback( - boost::bind(&T2NServerBase::newConnectionSlot, this, _1) - ); - m_signal_client_got_new_data.connect - ( - boost::bind(&T2NServerBase::clientGotNewDataSlot, this) - ); -} // eo T2NServerBase::T2NServerBase(ServerSocketBaseImplementationPtr) - - -/** - * @brief destructor. - * - */ -T2NServerBase::~T2NServerBase() -{ - SCOPETRACKER(); -} // eo T2NServerBase::~T2NServerBase() - - -/** - * @brief returns wether the server port is opened. - * - * @return @a true iff the server port is open. - */ -bool T2NServerBase::isOpen() -{ - return (m_server_port && m_server_port->opened()); -} // eo T2NServerBase - - - -/** - * @brief callback for the port server object when a new connection is established. - * - * @param io_ptr the (shared) pointer to the new connection. - */ -void T2NServerBase::newConnectionSlot(IOImplementationPtr io_ptr) -{ - SCOPETRACKER(); - add_connection( new T2NServerConnection( get_ptr_as< T2NServerBase >(), io_ptr, get_default_timeout() ) ); -} // eo T2NServerBase::newConnectionSlot(IOImplementationPtr) - - -/** - * @brief callback for "new data available" signal - */ -void T2NServerBase::clientGotNewDataSlot() -{ - m_new_data_available= true; -} // eo T2NServerBase::clientGotNewDataSlot() - - -/** - * @brief try to fill the buffers of the managed connections. - * - * will be called by T2NServerBase::fill_buffer(). - * - * @return @a true if at least one connection buffer has new data. - */ -bool T2NServerBase::fill_connection_buffers() -{ - SCOPETRACKER(); - Backend::getBackend()->doOneStep(0); - bool result= false; - for(std::map::iterator it=connections.begin(); - it != connections.end(); - ++it) - { - T2NServerConnection *conn = dynamic_cast(it->second); - if (!conn) - { - if (it->second) - { - // react somehow if (it->second) is not NULL... - module_logger().error(HERE) << "illegal connection pointer"; - it->second->close(); - } - continue; - } - if ( conn->low_fill_buffer(false, 0) ) - { - result= true; - } - } - return result; -} // eo T2NServerBase::fill_connection_buffers() - - -/** - * @brief fills the connection buffers. - * - * Uses the AsyncIo Backend to wait for new data. - * - * @param usec_timeout the maximum time period to wait for new data (in microseconds). - * 0 returns immediately, -1 waits until some event occurred. - * @param timeout_remaining ignored! - * @return @a true if new data for at least one connection arrived. - * - * @note since this method uses the AsyncIo backend, the timeout will have only milli second - * resolution. - */ -bool T2NServerBase::fill_buffer(long long usec_timeout, long long* timeout_remaining) -{ - SCOPETRACKER(); - - if (m_new_data_available) - { - // short cut if we already know that we have new data: - m_new_data_available= false; - return true; - } - - int timeout= 0; - - if (usec_timeout<0) - { - timeout= -1; - } - else if (usec_timeout > 0) - { - long long msec_timeout= (usec_timeout + 500)/1000; - - if (msec_timeout >= INT_MAX) - { - timeout= INT_MAX; - } - else - { - timeout= (int)msec_timeout; - } - } - // not really.. but it shouldn't be used either... - if (timeout_remaining) *timeout_remaining= 0L; - - if (! fill_connection_buffers() && timeout>0) - { - bool had_activity= Backend::getBackend()->doOneStep( timeout ); - return fill_connection_buffers(); - } - return true; -} // to T2NServerBase::fill_buffer(long long,long long*) - - - -/* -** creator functions: -*/ - - -/** - * @brief creates a server object with unix domain server socket. - * @param path path of the unix domain socket. - * @param mode mode for the socket. - * @return shared pointer with the new server object; empty if none could be created.. - */ -T2NServerBasePtr createT2NUnixServerPort(const std::string& path, int mode) -{ - SCOPETRACKER(); - boost::shared_ptr< T2NUnixServer > result( new T2NUnixServer(path,mode) ); - if (!result->isOpen()) - { - module_logger().error(HERE) - << "failed to open unix domain server socket on \"" << path << "\""; - } - return result; -} // eo createT2NUnixServerPort(const std::string&,int) - - -/** - * @brief creates a client object connected to a server via unix daomain socket. - * @param path path of cthe unix domain socket. - * @return shared pointer with the new client object; empty if none could be created.. - */ -T2NClientConnectionPtr createT2NUnixClientConnection(const std::string& path) -{ - typedef IOExportWrapper< AsyncIo::UnixIOSocket > MyIo; - typedef boost::shared_ptr< MyIo > MyIoPtr; - SCOPETRACKER(); - MyIoPtr connection( new MyIo(path) ); - boost::shared_ptr< RealT2NClientConnection > result( new RealT2NClientConnection( connection ) ); - if (not result->isOpen()) - { - module_logger().error(HERE) - << "failed to open unix domain client socket on \"" << path << "\""; - return T2NClientConnectionPtr(); - } - return result; -} // eo createT2NUnixClientConnection(const std::string&) - -} // eo namespace AsyncIo diff --git a/glue_t2n/async_io_t2n.hpp b/glue_t2n/async_io_t2n.hpp deleted file mode 100644 index 0a998ce..0000000 --- a/glue_t2n/async_io_t2n.hpp +++ /dev/null @@ -1,206 +0,0 @@ -/** - * @file - * @brief the "glue" between libt2n and AsyncIo framework. - * - * contains our own server and client class which should fit into the asnychronous way of "AsyncIo". - * We use our own classes since the libt2n socket classes are made for synchronous operation - * which can lead to problems even if we import the connection fd's into "AsyncIo"... - * - * @author Reinhard Pfau \ - * - * @copyright © Copyright 2008 by Intra2net AG - * @license LGPL - * @contact info@intra2net.com - * - * @todo support for TCP/IP connections. - */ - -#ifndef __CONND_GLUE_T2N_HPP__ -#define __CONND_GLUE_T2N_HPP__ - -#include -#include -#include -#include -#include -#include -#include -#include -#include - - -namespace AsyncIo -{ - -using namespace I2n; - -/** - * @brief specialized version of the libt2n client connection which fits into the AsyncIo framework. - * - */ -class T2NClientConnection -: public libt2n::client_connection -{ - public: - - T2NClientConnection(); - virtual ~T2NClientConnection(); - - bool isOpen(); - - bool reopen(bool force= false); - - - /* - ** overloaded methods from libt2n classes: - */ - - virtual void close(); - - protected: - - T2NClientConnection( AsyncIo::IOImplementationPtr connection ); - - void newDataSlot(); - - void eofSlot(); - - /* - ** overloaded methods from t2n classes: - */ - - virtual void real_write(const std::string& data); - - virtual bool fill_buffer(long long usec_timeout=-1,long long* usec_timeout_remaining=NULL); - - protected: - - AsyncIo::IOImplementationPtr m_real_connection; - - bool m_got_new_data; - -}; // eo class T2NClientConnection - - -typedef boost::shared_ptr< T2NClientConnection > T2NClientConnectionPtr; - - -class T2NServerBase; - -typedef boost::shared_ptr< T2NServerBase > T2NServerBasePtr; -typedef boost::weak_ptr< T2NServerBase > T2NServerBaseWeakPtr; - -/** - * @brief specialized version of the libt2n server connection which fits into the AsyncIo framework. - * - */ -class T2NServerConnection -: public libt2n::server_connection -{ - friend class T2NServerBase; - public: - - T2NServerConnection(); - virtual ~T2NServerConnection(); - - - /* - ** overloaded methods from libt2n classes: - */ - - virtual void close(); - - protected: - - T2NServerConnection( - T2NServerBasePtr server, - AsyncIo::IOImplementationPtr connection, - int timeout); - - void newDataSlot(); - - void eofSlot(); - - bool low_fill_buffer(bool wait, long long usec_timeout=-1, long long* usec_timeout_remaining=NULL); - - /* - ** overloaded methods from t2n classes: - */ - - virtual void real_write(const std::string& data); - - virtual bool fill_buffer(long long usec_timeout=-1,long long* usec_timeout_remaining=NULL); - - - protected: - - AsyncIo::IOImplementationPtr m_real_connection; - T2NServerBaseWeakPtr m_server_weak_ptr; - - bool m_got_new_data; - -}; // eo class T2NServerConnection - - - -/** - * @brief base server class for handling server ports for libt2n. - * - * Needs to be derived for the real type of connection (unix, IPv4, etc.). - * - * Does all necessary connection handling and realizes the abstract methods from - * the libt2n::server class. - */ -class T2NServerBase -: public libt2n::server -, virtual public SharedBase -{ - public: - virtual ~T2NServerBase(); - - bool isOpen(); - - /* - ** overloaded methods from t2n classes: - */ - - virtual bool fill_buffer(long long usec_timeout=-1, long long* timeout_remaining=NULL); - - public: - - boost::signal< void() > m_signal_client_got_new_data; - - protected: - - T2NServerBase( AsyncIo::ServerSocketBaseImplementationPtr server_port); - - void newConnectionSlot(AsyncIo::IOImplementationPtr io_ptr); - - void clientGotNewDataSlot(); - - /* - ** overloaded methods from t2n classes: - */ - - virtual bool fill_connection_buffers(void); - - - protected: - - AsyncIo::ServerSocketBaseImplementationPtr m_server_port; - bool m_new_data_available; - -}; // eo T2NServerBase - - -typedef boost::shared_ptr< T2NServerBase > T2NServerBasePtr; - - -T2NServerBasePtr createT2NUnixServerPort(const std::string& path, int mode= 0600); - -T2NClientConnectionPtr createT2NUnixClientConnection(const std::string& path); - - -} // eo namespace AsyncIo - -#endif diff --git a/glue_t2n/asyncio_t2n.cpp b/glue_t2n/asyncio_t2n.cpp new file mode 100644 index 0000000..196190c --- /dev/null +++ b/glue_t2n/asyncio_t2n.cpp @@ -0,0 +1,959 @@ +/** + * @file + * + * @author Reinhard Pfau \ + * + * @copyright © Copyright 2008 by Intra2net AG + * @license LGPL + * @contact info@intra2net.com + */ + +#include "asyncio_t2n.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace AsyncIo +{ + +using namespace Utils; + +namespace +{ + + +Logger::PartLogger& module_logger() +{ + static Logger::PartLogger _module_logger(HERE); + return _module_logger; +} // eo module_logger(); + + + +/** + * @brief a class with some methods we like to have on our io classes. + * + * This class is to be used as second base in a wrapper class and needs it's methods to + * be redefined for "the real thing". + */ +class IOExportWrapperBase +{ + public: + + IOExportWrapperBase() + { + } + + virtual ~IOExportWrapperBase() + { + } + + virtual void sendData(const std::string& data) + { + } + + + virtual std::string receiveData() + { + return std::string(); + } + + virtual boost::signals::connection connectEof( const boost::function< void() >& func ) + { + return boost::signals::connection(); + } + + virtual boost::signals::connection connectRead( const boost::function< void() >& func ) + { + return boost::signals::connection(); + } + +}; // eo class IOExportWrapperBase + + +typedef boost::shared_ptr< IOExportWrapperBase > IOExportWrapperBasePtr; + + +/** + * @brief IO wrapper template. + * @tparam IOClass a type based on AsyncIo::IOImplementation + * + * The type is used as a public base for the resulting class; the second public base is our + * helper with the additional methods we need internally and which we (finally) define here. + */ +template< + class IOClass +> +class IOExportWrapper +: public IOClass +, public IOExportWrapperBase +{ + BOOST_STATIC_ASSERT(( boost::is_base_of< IOImplementation,IOClass >::value )); + + public: + IOExportWrapper() + { + } + + template< + typename Arg1 + > + IOExportWrapper(Arg1 arg1) + : IOClass(arg1) + {} + + + template< + typename Arg1, typename Arg2 + > + IOExportWrapper(Arg1 arg1, Arg2 arg2) + : IOClass(arg1,arg2) + {} + + + template< + typename Arg1, typename Arg2, typename Arg3 + > + IOExportWrapper(Arg1 arg1, Arg2 arg2, Arg3 arg3) + : IOClass(arg1,arg2,arg3) + {} + + + template< + typename Arg1, typename Arg2, typename Arg3, typename Arg4 + > + IOExportWrapper(Arg1 arg1, Arg2 arg2, Arg3 arg3, Arg4 arg4) + : IOClass(arg1,arg2,arg3,arg4) + {} + + + template< + typename Arg1, typename Arg2, typename Arg3, typename Arg4, typename Arg5 + > + IOExportWrapper(Arg1 arg1, Arg2 arg2, Arg3 arg3, Arg4 arg4, Arg5 arg5) + : IOClass(arg1,arg2,arg3,arg4,arg5) + {} + + + /** + * @brief exposed funtion for sending data. + * @param data the chunk to be send. + */ + virtual void sendData(const std::string& data) + { + IOClass::lowSend(data); + } + + /** + * @brief returns the new received data. + * @return the receievd data. + * + * Clears the receive buffer. + */ + virtual std::string receiveData() + { + std::string result; + result.swap(IOClass::m_input_buffer); + return result; + } + + /** + * @brief exposed connect to EOF signal. + * @param func the function which should be connected to the eof signal. + * @return signal connection handle. + */ + virtual boost::signals::connection connectEof( const boost::function< void() >& func ) + { + return IOClass::m_signal_eof.connect(func); + } + + /** + * @brief exposed connect to "read" signal. + * @param func the function which should be connected to the "read" signal. + * @return signal connection handle. + */ + virtual boost::signals::connection connectRead( const boost::function< void() >& func ) + { + return IOClass::m_signal_read.connect(func); + } + + protected: + +}; // eo class IOExportWrapper + + +/* +** specialized versions of io classes: +*/ + +/** + * @brief enhanced unix domain socket class with reconnect feature. + * + * Used for t2n client connections. + */ +class T2nUnixIOSocket +: public AsyncIo::UnixIOSocket +{ + typedef AsyncIo::UnixIOSocket inherited; + public: + T2nUnixIOSocket( const std::string& path ); + + virtual void close(AsyncIo::Direction direction = AsyncIo::Direction::both); + + bool reopen(bool force= false); + + protected: + + virtual void doRead(); + + + protected: + + bool m_in_do_read; + bool m_may_reconnect; + +}; // T2nUnixIOSocket + + +T2nUnixIOSocket::T2nUnixIOSocket(const std::string& path) +: inherited(path) +, m_in_do_read(false) +, m_may_reconnect(false) +{ +} // eo T2nUnixIOSocket::T2nUnixIOSocket(const std::string&) + + +void T2nUnixIOSocket::close(AsyncIo::Direction direction) +{ + bool was_open= opened(); + inherited::close(direction); + if (m_in_do_read and not opened()) + { + m_may_reconnect= was_open; + } +} // eo T2nUnixIOSocket::close(AsyncIo::Direction) + + +bool T2nUnixIOSocket::reopen(bool force) +{ + if (m_path.empty()) + { + return false; + } + if (m_may_reconnect || force) + { + return inherited::open( m_path ); + } + return false; +} // eo T2nUnixIOSocket::reopen() + + +void T2nUnixIOSocket::doRead() +{ + m_in_do_read= true; + try + { + inherited::doRead(); + } + catch (...) + { + m_in_do_read= false; + throw; + } + m_in_do_read= false; +} // eo T2nUnixIOSocket::doRead() + + +/** + * @brief server class for libt2n using unix domain sockets. + * + * well, it's enough to provide an appropriate constructor. + * (did i mention that templates are really cool stuff? :-) ) + */ +class T2NUnixServer +: public T2NServerBase +{ + public: + + T2NUnixServer(const std::string& path, int mode=0600) + : T2NServerBase( ServerSocketBaseImplementationPtr( + new UnixServerSocket< + IOExportWrapper< UnixIOSocket > + >(path, mode) + ) ) + { + } // eo T2NServerBase + +}; // eo T2NUnixServer + + + +class RealT2NClientConnection +: public T2NClientConnection +{ + public: + RealT2NClientConnection( AsyncIo::IOImplementationPtr connection ) + : T2NClientConnection(connection) + { + } +}; // eo class T2NClient + +} // eo namespace + + + +/* +** implementation of T2NClientConnection +*/ + + +T2NClientConnection::T2NClientConnection( + IOImplementationPtr connection +) +: libt2n::client_connection() +, m_real_connection(connection) +, m_got_new_data(false) +{ + SCOPETRACKER(); + IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(connection); + if (!ptr) + { + module_logger().error(HERE) << "illegal pointer passed"; + close(); + return; + } + if (!connection->opened()) + { + module_logger().warning(HERE) << "connection not open, either failed or already closed"; + close(); + return; + } + + ptr->connectRead( boost::bind(&T2NClientConnection::newDataSlot, this) ); + ptr->connectEof( boost::bind(&T2NClientConnection::eofSlot, this) ); + +} // eo T2NClientConnection::T2NClientConnection(IOImplementationPtr) + + +T2NClientConnection::~T2NClientConnection() +{ + SCOPETRACKER(); +} // eo T2NClientConnection::~T2NClientConnection + + +/** + * @brief returns if the connection is open. + * @return @a true if the connection is open. + */ +bool T2NClientConnection::isOpen() +{ + return m_real_connection and m_real_connection->opened(); +} // eo T2NClientConnection::isOpen() + + +/** + * @brief try to reopen a connection. + * @return @a true if the connection was reopened. + */ +bool T2NClientConnection::reopen(bool force) +{ + if (not m_real_connection) + { + return false; + } + boost::shared_ptr< T2nUnixIOSocket > t2n_socket= + boost::shared_dynamic_cast< T2nUnixIOSocket >(m_real_connection); + if (t2n_socket) + { + return t2n_socket->reopen(force); + } + return false; +} // eo T2NClientConnection::reopen() + + +/** + * @brief closes the connection. + * + * This closes the underlying IO connection and calls libt2n::server_connection::close() to + * mark the connection as closed for libt2n. + */ +void T2NClientConnection::close() +{ + SCOPETRACKER(); + if (m_real_connection) + { + m_real_connection->close(); + m_real_connection.reset(); + } + libt2n::client_connection::close(); +} // eo T2NClientConnection::close() + + +/** + * @brief sends a raw data chunk on the connection. + * + * @param data the (raw) data chunk which should be sended. + */ +void T2NClientConnection::real_write(const std::string& data) +{ + SCOPETRACKER(); + if (is_closed()) + { + module_logger().warning(HERE) << "attempt to write data on closed connection"; + return; + } + IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); + if (!ptr) + { + // should never happen... + module_logger().error(HERE)<< "illegal io pointer"; + close(); + //TODO: throw an error?! + NOT_REACHED(); + return; + } + ptr->sendData(data); +} // eo T2NClientConnection::real_write(const std::string) + + +/** + * @brief called to fill the connection buffer. + * + * Since this class uses the asnychronous AsyncIo framework, new data may already be read when + * this method is called. + * + * @param usec_timeout + * @param usec_timeout_remaining + * @return @a true if new data is available. + */ +bool T2NClientConnection::fill_buffer(long long usec_timeout,long long* usec_timeout_remaining) +{ + SCOPETRACKER(); + if (is_closed()) + { + module_logger().debug(HERE) << "fill_buffer() called on closed connection"; + return false; + } + AsyncIo::MilliTime t0,t1; + AsyncIo::get_current_monotonic_time(t0); + if (!m_got_new_data) + { + IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); + if (!ptr) + { + module_logger().error(HERE) << "illegal io pointer"; + close(); + return false; + } + // try to fetch data (call the backend) + int timeout= 0; + + if (usec_timeout<0) + { + timeout= -1; + } + else if (usec_timeout > 0) + { + long long msec_timeout= (usec_timeout + 500)/1000; + + if (msec_timeout >= INT_MAX) + { + timeout= INT_MAX; + } + else + { + timeout= (int)msec_timeout; + } + } + Backend::getBackend()->doOneStep( timeout ); + } + AsyncIo::get_current_monotonic_time(t1); + if (usec_timeout_remaining) + { + long long delta= ((long long)(t1 - t0).get_milliseconds())* 1000L; + *usec_timeout_remaining= (usec_timeout > delta ) ? (usec_timeout - delta) : 0L; + module_logger().debug() << "timeout: " << usec_timeout << " -> " << *usec_timeout_remaining; + } + if (m_got_new_data) + { + m_got_new_data= false; + return true; + } + return false; +} // eo T2NClientConnection::fill_buffer(long long,long long*) + + +/** + * @brief called when new data arrived on this connection. + * + * reads the new data from the underlying IO object and stores it in the connection buffer. + * Also remembers (in the bool member var @a m_got_new_data) that new data was received. + */ +void T2NClientConnection::newDataSlot() +{ + SCOPETRACKER(); + IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); + if (!ptr) + { + //TODO: throw an error?! + NOT_REACHED(); + return; + } + std::string new_data= ptr->receiveData(); + module_logger().debug() << "got " << new_data.size() << " bytes of new data"; + buffer+= new_data; + m_got_new_data= true; +} // eo T2NClientConnection::newDataSlot() + + +/** + * @brief called when an EOF was detected by the underlying IO object (i.e. the connection + * was closed by the peer side). + * + * Calls close(). + */ +void T2NClientConnection::eofSlot() +{ + SCOPETRACKER(); + close(); +} // eo T2NClientConnection::eofSlot() + + +/* +** implementation of T2NServerConnection +*/ + + +T2NServerConnection::T2NServerConnection( + T2NServerBasePtr server, + IOImplementationPtr connection, + int timeout +) +: libt2n::server_connection(timeout) +, m_real_connection(connection) +, m_server_weak_ptr(server) +, m_got_new_data(false) +{ + SCOPETRACKER(); + IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(connection); + if (!ptr) + { + module_logger().error(HERE) << "illegal pointer passed"; + close(); + return; + } + if (!connection->opened()) + { + module_logger().warning(HERE) << "connection not open, either failed or already closed"; + close(); + return; + } + + ptr->connectRead( boost::bind(&T2NServerConnection::newDataSlot, this) ); + ptr->connectEof( boost::bind(&T2NServerConnection::eofSlot, this) ); + +} // eo T2NServerConnection::T2NServerConnection(IOImplementationPtr) + + +T2NServerConnection::~T2NServerConnection() +{ + SCOPETRACKER(); +} // eo T2NServerConnection::~T2NServerConnection + + +/** + * @brief closes the connection. + * + * This closes the underlying IO connection and calls libt2n::server_connection::close() to + * mark the connection as closed for libt2n. + */ +void T2NServerConnection::close() +{ + SCOPETRACKER(); + if (m_real_connection) + { + m_real_connection->close(); + m_real_connection.reset(); + } + libt2n::server_connection::close(); +} // eo T2NServerConnection::close() + + +/** + * @brief sends a raw data chunk on the connection. + * + * @param data the (raw) data chunk which should be sended. + */ +void T2NServerConnection::real_write(const std::string& data) +{ + SCOPETRACKER(); + if (is_closed()) + { + module_logger().warning(HERE) << "attempt to write data on closed connection"; + return; + } + IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); + if (!ptr) + { + // should never happen... + module_logger().error(HERE)<< "illegal io pointer"; + close(); + //TODO: throw an error?! + NOT_REACHED(); + return; + } + module_logger().debug() << "send " << data.size() << " bytes of data"; + ptr->sendData(data); +} // eo T2NServerConnection::real_write(const std::string) + + +/** + * @brief called to fill the connection buffer. + * + * Since this class uses the asnychronous AsyncIo framework, new data may already be read when + * this method is called. + * + * @param wait determines if we need to wait; if @a false it is just checked if new data + * was received, but no backend cycle is executed. + * @param usec_timeout + * @param usec_timeout_remaining + * @return @a true if new data is available. + */ +bool T2NServerConnection::low_fill_buffer(bool wait, long long usec_timeout, long long* usec_timeout_remaining) +{ + SCOPETRACKER(); + if (is_closed()) + { + module_logger().debug(HERE) << "fill_buffer() called on closed connection"; + return false; + } + if (not m_got_new_data and wait) + { + AsyncIo::MilliTime t0,t1; + AsyncIo::get_current_monotonic_time(t0); + IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); + if (!ptr) + { + module_logger().error(HERE) << "illegal io pointer"; + close(); + return false; + } + // try to fetch data (call the backend) + int timeout= 0; + + if (usec_timeout<0) + { + timeout= -1; + } + else if (usec_timeout > 0) + { + long long msec_timeout= (usec_timeout + 500)/1000; + + if (msec_timeout >= INT_MAX) + { + timeout= INT_MAX; + } + else + { + timeout= (int)msec_timeout; + } + } + Backend::getBackend()->doOneStep( timeout ); + AsyncIo::get_current_monotonic_time(t1); + if (usec_timeout_remaining) + { + long long delta= ((long long)(t1 - t0).get_milliseconds())* 1000L; + *usec_timeout_remaining= (usec_timeout > delta ) ? (usec_timeout - delta) : 0L; + } + } + else + { + if (usec_timeout_remaining) + { + *usec_timeout_remaining= usec_timeout; + } + } + if (m_got_new_data) + { + m_got_new_data= false; + return true; + } + return false; +} // eo T2NServerConnection::low_fill_buffer(bool,long long,long long*) + + +/** + * @brief called to fill the connection buffer. + * + * Since this class uses the asnychronous AsyncIo framework, new data may already be read when + * this method is called. + * + * @param usec_timeout + * @param usec_timeout_remaining + * @return @a true if new data is available. + */ +bool T2NServerConnection::fill_buffer(long long usec_timeout,long long* usec_timeout_remaining) +{ + return low_fill_buffer(true, usec_timeout, usec_timeout_remaining); +} // eo T2NServerConnection::fill_buffer(long long,long long*) + + +/** + * @brief called when new data arrived on this connection. + * + * reads the new data from the underlying IO object and stores it in the connection buffer. + * Also remembers (in the bool member var @a m_got_new_data that new data was received. + */ +void T2NServerConnection::newDataSlot() +{ + SCOPETRACKER(); + IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); + if (!ptr) + { + //TODO:throw an error?! + NOT_REACHED(); + return; + } + std::string new_data= ptr->receiveData(); + buffer+= new_data; + module_logger().debug() << "got " << new_data.size() << " bytes of new data"; + m_got_new_data= true; + reset_timeout(); + + T2NServerBasePtr server =m_server_weak_ptr.lock(); + if (server) + { + server->m_signal_client_got_new_data(); + } +} // eo T2NServerConnection::newDataSlot() + + +/** + * @brief called when an EOF was detected by the underlying IO object (i.e. the connection + * was closed by the peer side). + * + * Calls close(). + */ +void T2NServerConnection::eofSlot() +{ + SCOPETRACKER(); + close(); +} // eo T2NServerConnection::eofSlot() + + + +/* +** implementation of T2NServerBase +*/ + + +/** + * @brief constructs a libt2n server object. + * + * @param server_port shared pointer to a (AsyncIo) port server object which + * is used as underlying port handler. + */ +T2NServerBase::T2NServerBase( ServerSocketBaseImplementationPtr server_port) +: m_server_port(server_port) +, m_new_data_available(false) +{ + SCOPETRACKER(); + // register our callback for new incoming conncetions. + server_port->setNewConnectionBaseCallback( + boost::bind(&T2NServerBase::newConnectionSlot, this, _1) + ); + m_signal_client_got_new_data.connect + ( + boost::bind(&T2NServerBase::clientGotNewDataSlot, this) + ); +} // eo T2NServerBase::T2NServerBase(ServerSocketBaseImplementationPtr) + + +/** + * @brief destructor. + * + */ +T2NServerBase::~T2NServerBase() +{ + SCOPETRACKER(); +} // eo T2NServerBase::~T2NServerBase() + + +/** + * @brief returns wether the server port is opened. + * + * @return @a true iff the server port is open. + */ +bool T2NServerBase::isOpen() +{ + return (m_server_port && m_server_port->opened()); +} // eo T2NServerBase + + + +/** + * @brief callback for the port server object when a new connection is established. + * + * @param io_ptr the (shared) pointer to the new connection. + */ +void T2NServerBase::newConnectionSlot(IOImplementationPtr io_ptr) +{ + SCOPETRACKER(); + add_connection( new T2NServerConnection( get_ptr_as< T2NServerBase >(), io_ptr, get_default_timeout() ) ); +} // eo T2NServerBase::newConnectionSlot(IOImplementationPtr) + + +/** + * @brief callback for "new data available" signal + */ +void T2NServerBase::clientGotNewDataSlot() +{ + m_new_data_available= true; +} // eo T2NServerBase::clientGotNewDataSlot() + + +/** + * @brief try to fill the buffers of the managed connections. + * + * will be called by T2NServerBase::fill_buffer(). + * + * @return @a true if at least one connection buffer has new data. + */ +bool T2NServerBase::fill_connection_buffers() +{ + SCOPETRACKER(); + Backend::getBackend()->doOneStep(0); + bool result= false; + for(std::map::iterator it=connections.begin(); + it != connections.end(); + ++it) + { + T2NServerConnection *conn = dynamic_cast(it->second); + if (!conn) + { + if (it->second) + { + // react somehow if (it->second) is not NULL... + module_logger().error(HERE) << "illegal connection pointer"; + it->second->close(); + } + continue; + } + if ( conn->low_fill_buffer(false, 0) ) + { + result= true; + } + } + return result; +} // eo T2NServerBase::fill_connection_buffers() + + +/** + * @brief fills the connection buffers. + * + * Uses the AsyncIo Backend to wait for new data. + * + * @param usec_timeout the maximum time period to wait for new data (in microseconds). + * 0 returns immediately, -1 waits until some event occurred. + * @param timeout_remaining ignored! + * @return @a true if new data for at least one connection arrived. + * + * @note since this method uses the AsyncIo backend, the timeout will have only milli second + * resolution. + */ +bool T2NServerBase::fill_buffer(long long usec_timeout, long long* timeout_remaining) +{ + SCOPETRACKER(); + + if (m_new_data_available) + { + // short cut if we already know that we have new data: + m_new_data_available= false; + return true; + } + + int timeout= 0; + + if (usec_timeout<0) + { + timeout= -1; + } + else if (usec_timeout > 0) + { + long long msec_timeout= (usec_timeout + 500)/1000; + + if (msec_timeout >= INT_MAX) + { + timeout= INT_MAX; + } + else + { + timeout= (int)msec_timeout; + } + } + // not really.. but it shouldn't be used either... + if (timeout_remaining) *timeout_remaining= 0L; + + if (! fill_connection_buffers() && timeout>0) + { + bool had_activity= Backend::getBackend()->doOneStep( timeout ); + return fill_connection_buffers(); + } + return true; +} // to T2NServerBase::fill_buffer(long long,long long*) + + + +/* +** creator functions: +*/ + + +/** + * @brief creates a server object with unix domain server socket. + * @param path path of the unix domain socket. + * @param mode mode for the socket. + * @return shared pointer with the new server object; empty if none could be created.. + */ +T2NServerBasePtr createT2NUnixServerPort(const std::string& path, int mode) +{ + SCOPETRACKER(); + boost::shared_ptr< T2NUnixServer > result( new T2NUnixServer(path,mode) ); + if (!result->isOpen()) + { + module_logger().error(HERE) + << "failed to open unix domain server socket on \"" << path << "\""; + } + return result; +} // eo createT2NUnixServerPort(const std::string&,int) + + +/** + * @brief creates a client object connected to a server via unix daomain socket. + * @param path path of cthe unix domain socket. + * @return shared pointer with the new client object; empty if none could be created.. + */ +T2NClientConnectionPtr createT2NUnixClientConnection(const std::string& path) +{ + typedef IOExportWrapper< AsyncIo::UnixIOSocket > MyIo; + typedef boost::shared_ptr< MyIo > MyIoPtr; + SCOPETRACKER(); + MyIoPtr connection( new MyIo(path) ); + boost::shared_ptr< RealT2NClientConnection > result( new RealT2NClientConnection( connection ) ); + if (not result->isOpen()) + { + module_logger().error(HERE) + << "failed to open unix domain client socket on \"" << path << "\""; + return T2NClientConnectionPtr(); + } + return result; +} // eo createT2NUnixClientConnection(const std::string&) + +} // eo namespace AsyncIo diff --git a/glue_t2n/asyncio_t2n.hpp b/glue_t2n/asyncio_t2n.hpp new file mode 100644 index 0000000..0a998ce --- /dev/null +++ b/glue_t2n/asyncio_t2n.hpp @@ -0,0 +1,206 @@ +/** + * @file + * @brief the "glue" between libt2n and AsyncIo framework. + * + * contains our own server and client class which should fit into the asnychronous way of "AsyncIo". + * We use our own classes since the libt2n socket classes are made for synchronous operation + * which can lead to problems even if we import the connection fd's into "AsyncIo"... + * + * @author Reinhard Pfau \ + * + * @copyright © Copyright 2008 by Intra2net AG + * @license LGPL + * @contact info@intra2net.com + * + * @todo support for TCP/IP connections. + */ + +#ifndef __CONND_GLUE_T2N_HPP__ +#define __CONND_GLUE_T2N_HPP__ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace AsyncIo +{ + +using namespace I2n; + +/** + * @brief specialized version of the libt2n client connection which fits into the AsyncIo framework. + * + */ +class T2NClientConnection +: public libt2n::client_connection +{ + public: + + T2NClientConnection(); + virtual ~T2NClientConnection(); + + bool isOpen(); + + bool reopen(bool force= false); + + + /* + ** overloaded methods from libt2n classes: + */ + + virtual void close(); + + protected: + + T2NClientConnection( AsyncIo::IOImplementationPtr connection ); + + void newDataSlot(); + + void eofSlot(); + + /* + ** overloaded methods from t2n classes: + */ + + virtual void real_write(const std::string& data); + + virtual bool fill_buffer(long long usec_timeout=-1,long long* usec_timeout_remaining=NULL); + + protected: + + AsyncIo::IOImplementationPtr m_real_connection; + + bool m_got_new_data; + +}; // eo class T2NClientConnection + + +typedef boost::shared_ptr< T2NClientConnection > T2NClientConnectionPtr; + + +class T2NServerBase; + +typedef boost::shared_ptr< T2NServerBase > T2NServerBasePtr; +typedef boost::weak_ptr< T2NServerBase > T2NServerBaseWeakPtr; + +/** + * @brief specialized version of the libt2n server connection which fits into the AsyncIo framework. + * + */ +class T2NServerConnection +: public libt2n::server_connection +{ + friend class T2NServerBase; + public: + + T2NServerConnection(); + virtual ~T2NServerConnection(); + + + /* + ** overloaded methods from libt2n classes: + */ + + virtual void close(); + + protected: + + T2NServerConnection( + T2NServerBasePtr server, + AsyncIo::IOImplementationPtr connection, + int timeout); + + void newDataSlot(); + + void eofSlot(); + + bool low_fill_buffer(bool wait, long long usec_timeout=-1, long long* usec_timeout_remaining=NULL); + + /* + ** overloaded methods from t2n classes: + */ + + virtual void real_write(const std::string& data); + + virtual bool fill_buffer(long long usec_timeout=-1,long long* usec_timeout_remaining=NULL); + + + protected: + + AsyncIo::IOImplementationPtr m_real_connection; + T2NServerBaseWeakPtr m_server_weak_ptr; + + bool m_got_new_data; + +}; // eo class T2NServerConnection + + + +/** + * @brief base server class for handling server ports for libt2n. + * + * Needs to be derived for the real type of connection (unix, IPv4, etc.). + * + * Does all necessary connection handling and realizes the abstract methods from + * the libt2n::server class. + */ +class T2NServerBase +: public libt2n::server +, virtual public SharedBase +{ + public: + virtual ~T2NServerBase(); + + bool isOpen(); + + /* + ** overloaded methods from t2n classes: + */ + + virtual bool fill_buffer(long long usec_timeout=-1, long long* timeout_remaining=NULL); + + public: + + boost::signal< void() > m_signal_client_got_new_data; + + protected: + + T2NServerBase( AsyncIo::ServerSocketBaseImplementationPtr server_port); + + void newConnectionSlot(AsyncIo::IOImplementationPtr io_ptr); + + void clientGotNewDataSlot(); + + /* + ** overloaded methods from t2n classes: + */ + + virtual bool fill_connection_buffers(void); + + + protected: + + AsyncIo::ServerSocketBaseImplementationPtr m_server_port; + bool m_new_data_available; + +}; // eo T2NServerBase + + +typedef boost::shared_ptr< T2NServerBase > T2NServerBasePtr; + + +T2NServerBasePtr createT2NUnixServerPort(const std::string& path, int mode= 0600); + +T2NClientConnectionPtr createT2NUnixClientConnection(const std::string& path); + + +} // eo namespace AsyncIo + +#endif diff --git a/templates/cpp b/templates/cpp index e89ea90..1e7c72f 100644 --- a/templates/cpp +++ b/templates/cpp @@ -5,7 +5,7 @@ * @author Reinhard Pfau \ * * @copyright © Copyright 2008 Intra2Net AG - * @license commercial + * @license LGPL * @contact info@intra2net.com * */ diff --git a/templates/hpp b/templates/hpp index e89ea90..6b422d3 100644 --- a/templates/hpp +++ b/templates/hpp @@ -5,7 +5,7 @@ * @author Reinhard Pfau \ * * @copyright © Copyright 2008 Intra2Net AG - * @license commercial + * @license LPGL * @contact info@intra2net.com * */ diff --git a/unittest/Makefile.am b/unittest/Makefile.am index 9466d50..00320f0 100644 --- a/unittest/Makefile.am +++ b/unittest/Makefile.am @@ -1,9 +1,9 @@ -INCLUDES = -I$(top_srcdir)/. -I$(top_srcdir)/common -I$(top_srcdir)/connd \ - -I$(top_srcdir)/simpleio @BOOST_CPPFLAGS@ @CPPUNIT_CFLAGS@ @LIBI2NCOMMON_CFLAGS@ +INCLUDES = -I$(top_srcdir)/. -I$(top_srcdir)/asyncio -I$(top_srcdir)/utils \ + @BOOST_CPPFLAGS@ @CPPUNIT_CFLAGS@ @LIBI2NCOMMON_CFLAGS@ METASOURCES = AUTO check_PROGRAMS = testsimpleio testsimpleio_SOURCES = test.cpp test_simpleio_basics.cpp -testsimpleio_LDADD = $(top_builddir)/simpleio/libsimpleio.la @BOOST_LDFLAGS@ \ - @BOOST_SIGNALS_LIB@ @CPPUNIT_LIBS@ @LIBI2NCOMMON_LIBS@ +testsimpleio_LDADD = $(top_builddir)/utils/libasyncio_utils.la \ + $(top_builddir)/asyncio/libasyncio.la @BOOST_LDFLAGS@ @BOOST_SIGNALS_LIB@ @CPPUNIT_LIBS@ @LIBI2NCOMMON_LIBS@ TESTS = testsimpleio diff --git a/unittest/test_simpleio_basics.cpp b/unittest/test_simpleio_basics.cpp index b791e33..5abc582 100644 --- a/unittest/test_simpleio_basics.cpp +++ b/unittest/test_simpleio_basics.cpp @@ -37,7 +37,7 @@ using namespace I2n; -using namespace I2n::AsyncIo; +using namespace AsyncIo; using namespace CppUnit; diff --git a/utils/Makefile.am b/utils/Makefile.am new file mode 100644 index 0000000..f0e9e6a --- /dev/null +++ b/utils/Makefile.am @@ -0,0 +1,18 @@ +INCLUDES = @BOOST_CPPFLAGS@ +METASOURCES = AUTO +lib_LTLIBRARIES = libasyncio_utils.la +include_HEADERS = asyncio_ptr_list.hpp asyncio_utils.hpp asyncio_time_tools.hpp +libasyncio_utils_la_SOURCES = asyncio_time_tools.cpp asyncio_utils.cpp + +libasyncio_utils_la_LDFLAGS = -version-info @LIBASYNCIO_LIB_VERSION@ + +pkgconfigdir = $(libdir)/pkgconfig +pkgconfig_DATA= libasyncio_utils.pc + + +$(top_srcdir)/headerlist.utils: $(include_HEADERS) + list='$(include_HEADERS)'; \ + ( for n in $$list; do echo $(includedir)/$$n ; done) > $@ + +.PHONY: headerlist +headerlist: $(top_srcdir)/headerlist.utils diff --git a/utils/asyncio_ptr_list.hpp b/utils/asyncio_ptr_list.hpp new file mode 100644 index 0000000..1a36483 --- /dev/null +++ b/utils/asyncio_ptr_list.hpp @@ -0,0 +1,221 @@ +/** + * @file + * @brief a pointer list + * + * provides a pointer list which is prepared for being used in nested + * iterator loops. + * + * @author Reinhard Pfau \ + * + * @copyright © Copyright 2008 Intra2Net AG + * @license LGPL + * @contact info@intra2net.com + * + */ + +#ifndef __ASYNCIO__PTR_LIST_HPP__ +#define __ASYNCIO__PTR_LIST_HPP__ + +#include +#include + + +namespace AsyncIo +{ +namespace Utils +{ + + +/** + * @brief provides a global instance counter. + * + * The global instance counter must be initialized by the module which uses it. + * The main purpose for this is to keep track of the instances when PtrList's are + * used as global objects. + */ +template< class T, bool globalcounted > +class GlobalCounted +{ + static int InstanceCount; + + public: + + GlobalCounted() + { + ++InstanceCount; + } // eo GlobalCounted + + + virtual ~GlobalCounted() + { + --InstanceCount; + } // eo ~GlobalCounted + + static int Instances() + { + return InstanceCount; + } +}; // eo GlobalCounted + + +template<> +class GlobalCounted< class T, false > +{ + public: + GlobalCounted() + { + } // eo GlobalCounted + + + virtual ~GlobalCounted() + { + } // eo ~GlobalCounted + +}; // eo GlobalCounted + + + +/** + * @brief pointer list which supports deletion by set to NULL. + * + * This list can be used in nested iterator loops as long as some conditions + * are fulfilled: + * - inside the loops each iterator value is tested for not get NULL. + * - cleaning the list is only allowed when the outmost loop exits. + * . + * + * + * @tparam T type of the pointers. + * @tparam globalcounted determine if a global instance counter should be included. + */ +template +class PtrList +: public GlobalCounted< PtrList< T, globalcounted >, globalcounted > +, protected std::list +{ + typedef std::list inherited; + + public: + typedef GlobalCounted< PtrList< T, globalcounted >, globalcounted > GlobalCountType; + + typedef typename inherited::iterator iterator; + typedef typename inherited::const_iterator const_iterator; + + public: + + PtrList() + : Dirty(false) + { + } // eo PtrList + + + ~PtrList() + { + } // eo ~PtrList + + + /** + * @brief add a new item pointer to the list. + * + * @param item item pointer which should be added + */ + void add_item(T* item) + { + typename inherited::iterator it= std::find(inherited::begin(), inherited::end(), item); + if (it != inherited::end()) + { + // nothing to do since item is already in the list + return; + } + push_back(item); + } // eo add + + + /** + * @brief remove an item pointer from the list by setting to NULL. + * + * This sets the pointer in the list to NULL and marks the list as dirty. + * + * @param item the io object which should be removed from the list. + */ + void remove_item(T* item) + { + typename inherited::iterator it= std::find(inherited::begin(), inherited::end(), item); + if (it == inherited::end()) + { + // nothing to do: + return; + } + *it = NULL; // forget the pointer + Dirty= true; // ..and mark the list as dirty (i.e. has NULL elements) + } // eo remove + + + /** + * @brief cleans the list of objects by removing the NULL elements (if any). + * + * @note this function should only be called when it is ensured that no + * other functions using iterators of this list. + */ + void clean_list() + { + if (!Dirty) + { + // nothing to do + return; + } + // remove the NULL elements now: + erase( + std::remove( inherited::begin(), inherited::end(), (T*)NULL), + inherited::end() ); + Dirty= false; + } // eo clean_list + + + /** + * @brief explicit set the list dirty. + * + * use this to mark the list as dirty, when an item is set to NULL via + * a non const iterator. + * + * @note This is really dirty; better use remove_item(T*) in that cases! + */ + void set_dirty() + { + Dirty= true; + } // eo set_dirty + + + iterator begin() + { + return inherited::begin(); + } + + + iterator end() + { + return inherited::end(); + } + + + const_iterator begin() const + { + return inherited::begin(); + } + + + const_iterator end() const + { + return inherited::end(); + } + + protected: + bool Dirty; + +}; // eo class PtrList + + +} // eo namespace Utils +} // eo namespace AsyncIo + +#endif diff --git a/utils/asyncio_time_tools.cpp b/utils/asyncio_time_tools.cpp new file mode 100644 index 0000000..a89a24c --- /dev/null +++ b/utils/asyncio_time_tools.cpp @@ -0,0 +1,214 @@ +/** + * @file + * @brief + * + * @author Reinhard Pfau \ + * + * @copyright © Copyright 2008 Intra2Net AG + * @license LGPL + * @contact info@intra2net.com + * + */ + +#include "asyncio_time_tools.hpp" + +#include +#include +#include +#include + + +// define missing POSIX.1b constants... + +#ifndef CLOCK_REALTIME +#define CLOCK_REALTIME 0 +#endif +#ifndef CLOCK_MONOTONIC +#define CLOCK_MONOTONIC 1 +#endif + +namespace AsyncIo +{ +namespace Utils +{ + + +namespace +{ + + +/** + * @brief fetches the value from the monotonic clock source. + * @param[out] seconds the seconds. + * @param[out] nano_seconds the nano seconds. + * @return @a true if the clock was successfully read. + */ +bool monotonic_clock_gettime(long int& seconds, long int& nano_seconds) +{ + struct timespec tp[1]; + int res= ::syscall(__NR_clock_gettime, CLOCK_MONOTONIC, tp); + if (0 == res) + { + seconds= tp->tv_sec; + nano_seconds= tp->tv_nsec; + } + return (res==0); +} // eo monotonic_clock_gettime(long int&,long int&) + + + + +void get_current_real_time(long& current_sec, long& current_msec) +{ + struct timeval tv; + gettimeofday(&tv,NULL); + current_sec= tv.tv_sec; + current_msec= (tv.tv_usec / 1000); + if (current_msec >= 1000) + { + current_sec += (current_msec / 1000); + current_msec%= 1000; + } +} // eo get_current_real_time + + +void get_current_monotonic_time(long& current_sec, long& current_msec) +{ + long nsec; + if (monotonic_clock_gettime(current_sec,nsec)) + { + current_msec= nsec / 1000000L; + } + else + { + //fallback... + get_current_real_time(current_sec,current_msec); + } +} // eo get_current_monotonic_time + + + +} // eo anonymous namespace + +/**************************************************************************\ +\**************************************************************************/ + + +/* + * implementation of MilliTime + */ + +MilliTime::MilliTime(long sec, long msec) +: mt_sec(sec), mt_msec(msec) +{ + normalize(); +} // eo MilliTime::MilliTime + + +void MilliTime::set(long sec, long msec) +{ + mt_sec= sec; + mt_msec= msec; + normalize(); +} // eo MilliTime::set + + +/** + * normalizes the values, so that mt_msec has a value between 0 and 999. + */ +void MilliTime::normalize() +{ + if (mt_msec < 0) + { + mt_sec += (mt_msec / 1000) - 1; + mt_msec = (mt_msec % 1000) + 1000; + } + else if (mt_msec>=1000) + { + mt_sec+= (mt_msec / 1000); + mt_msec %= 1000; + } +} // eo MilliTime::normalize + + +/** + * determine if the represented point in time is before another one. + * @param other the other point in time. + * @return true if the own point in time is before the other one. + */ +bool MilliTime::operator < (MilliTime& other) +{ + normalize(); + other.normalize(); + return + (mt_sec < other.mt_sec) + || (( mt_sec == other.mt_sec) && (mt_msec < other.mt_msec)); +} // eo MilliTime::operator < + + +/** + * determine if two point in times are equal. + * @param other the point in time to compare with. + * @return true if the represented times are equal. + */ +bool MilliTime::operator == (MilliTime& other) +{ + normalize(); + other.normalize(); + return (( mt_sec == other.mt_sec) && (mt_msec == other.mt_msec)); +} // eo MilliTime::operator < + +/** + * @brief subtracts a time delta from the object. + * @param lhs the time delta to subtract. + * @return reference to the object itself. + */ +MilliTime& MilliTime::operator -= (const MilliTime& lhs) +{ + mt_sec -= lhs.mt_sec; + mt_msec -= lhs.mt_msec; +} // eo operator -= + + +/** + * @brief adds a time delta from the object. + * @param lhs the time delta to add. + * @return reference to the object itself. + */ +MilliTime& MilliTime::operator += (const MilliTime& lhs) +{ + mt_sec += lhs.mt_sec; + mt_msec += lhs.mt_msec; +} // eo operator += + + + +/** + * @brief gets the current time as MilliTime structure. + * @param[out] mt reference to the MilliTime strcucture which is filled with the result. + */ +void get_current_real_time(MilliTime& mt) +{ + long sec, msec; + get_current_real_time(sec,msec); + mt.set(sec,msec); +} // eo get_current_real_time + + +/** + * @brief gets the current time as MilliTime structure. + * @param[out] mt reference to the MilliTime strcucture which is filled with the result. + */ +void get_current_monotonic_time(MilliTime& mt) +{ + long sec, msec; + get_current_monotonic_time(sec,msec); + mt.set(sec,msec); +} // eo get_current_monotonic_time + + + + + +} // eo namespace Utils +} // eo namespace AsyncIo diff --git a/utils/asyncio_time_tools.hpp b/utils/asyncio_time_tools.hpp new file mode 100644 index 0000000..38cab2b --- /dev/null +++ b/utils/asyncio_time_tools.hpp @@ -0,0 +1,83 @@ +/** + * @file + * @brief + * + * @author Reinhard Pfau \ + * + * @copyright © Copyright 2008 Intra2Net AG + * @license LPGL + * @contact info@intra2net.com + * + */ + +#ifndef __ASYNCIO__TIME_TOOLS_HPP__ +#define __ASYNCIO__TIME_TOOLS_HPP__ + +namespace AsyncIo +{ +namespace Utils +{ + +/** + * @brief structure for storing (a point in time as) seconds and milliseconds. + */ +struct MilliTime +{ + long mt_sec; + long mt_msec; + + MilliTime(long sec=0, long msec=0); + + void set(long sec, long msec=0); + + inline long long get_milliseconds() const + { + return ((long long)mt_sec * 1000L + mt_msec); + } // eo get_milliseconds + + void normalize(); + + bool operator < (MilliTime& other); + bool operator == (MilliTime& other); + + MilliTime& operator -= (const MilliTime& lhs); + MilliTime& operator += (const MilliTime& lhs); + +}; // eo struct MilliTime + + +inline MilliTime operator + (const MilliTime& rhs, const MilliTime& lhs) +{ + MilliTime t(rhs); + return t+= lhs; +} // eo operator + (const MilliTime& rhs, const MilliTime lhs) + +inline MilliTime operator - (const MilliTime& rhs, const MilliTime& lhs) +{ + MilliTime t(rhs); + return t-= lhs; +} // eo operator - (const MilliTime& rhs, const MilliTime lhs) + + +inline bool operator <= (MilliTime& rhs, MilliTime& lhs) +{ + return (rhs + * + * @copyright © Copyright 2008 Intra2Net AG + * @license LGPL + * @contact info@intra2net.com + * + */ + +namespace AsyncIo +{ +namespace Utils +{ + + + +} // eo namespace Utils +} // eo namespace AsyncIo diff --git a/utils/asyncio_utils.hpp b/utils/asyncio_utils.hpp new file mode 100644 index 0000000..a8e0e93 --- /dev/null +++ b/utils/asyncio_utils.hpp @@ -0,0 +1,15 @@ +/** + * @file + * @brief convenience file for including all util header at once. + * + * @author Reinhard Pfau \ + * + * @copyright © Copyright 2008 Intra2Net AG + * @license LPGL + * @contact info@intra2net.com + * + */ + + +#include "asyncio_ptr_list.hpp" +#include "asyncio_time_tools.hpp" diff --git a/utils/libasyncio_utils.pc.in b/utils/libasyncio_utils.pc.in new file mode 100644 index 0000000..7954ef6 --- /dev/null +++ b/utils/libasyncio_utils.pc.in @@ -0,0 +1,10 @@ +prefix=@prefix@ +exec_prefix=@exec_prefix@ +libdir=@libdir@ +includedir=@includedir@ + +Name: libasyncio_utils +Description: utils for the asynchrounous io lib +Version: @VERSION@ +Libs: -L${libdir} -lasyncio_utils +Cflags: -I${includedir} -- 1.7.1