From: Reinhard Pfau Date: Tue, 21 Oct 2008 08:04:26 +0000 (+0000) Subject: libasyncio: (reinhard) more renaming; compiles now. X-Git-Tag: v0.3~92 X-Git-Url: http://developer.intra2net.com/git/?a=commitdiff_plain;h=42b7c46d505c1f2355514e88aa880a088f738a48;p=libasyncio libasyncio: (reinhard) more renaming; compiles now. --- diff --git a/asyncio/Makefile.am b/asyncio/Makefile.am index 2694ef2..4e25b19 100644 --- a/asyncio/Makefile.am +++ b/asyncio/Makefile.am @@ -1,13 +1,13 @@ INCLUDES = @LIBI2NCOMMON_CFLAGS@ @BOOST_CPPFLAGS@ METASOURCES = AUTO -lib_LTLIBRARIES = libsimpleio.la -libsimpleio_la_SOURCES = simplecallout.cpp simpleio.cpp simplepipe.cpp \ - simpleprocess.cpp simplesocket.cpp simpletimer.cpp -include_HEADERS = simplecallout.hpp simpleio.hpp simplepipe.hpp \ - simpleprocess.hpp simplesocket.hpp simpletimer.hpp -libsimpleio_la_LIBADD = @LIBI2NCOMMON_LIBS@ @BOOST_LDFLAGS@ @BOOST_SIGNALS_LIB@ +lib_LTLIBRARIES = libasyncio.la +libasyncio_la_SOURCES = async_callout.cpp async_io.cpp async_pipe.cpp \ + async_process.cpp async_socket.cpp async_timer.cpp +include_HEADERS = async_callout.hpp async_io.hpp async_pipe.hpp \ + async_process.hpp async_socket.hpp async_timer.hpp +libasyncio_la_LIBADD = @LIBI2NCOMMON_LIBS@ @BOOST_LDFLAGS@ @BOOST_SIGNALS_LIB@ -libsimpleio_la_LDFLAGS = -version-info @LIBSIMPLEIO_LIB_VERSION@ +libasyncio_la_LDFLAGS = -version-info @LIBASYNCIO_LIB_VERSION@ pkgconfigdir=$(libdir)/pkgconfig -pkgconfig_DATA= libsimpleio.pc +pkgconfig_DATA= libasyncio.pc diff --git a/asyncio/simplecallout.cpp b/asyncio/async_callout.cpp similarity index 98% rename from asyncio/simplecallout.cpp rename to asyncio/async_callout.cpp index 13f5c63..976d3ed 100644 --- a/asyncio/simplecallout.cpp +++ b/asyncio/async_callout.cpp @@ -2,12 +2,12 @@ * @file * * @copyright © Copyright 2008 by Intra2net AG - * @license commercial + * @license LGPL * * info@intra2net.com */ -#include "simplecallout.hpp" +#include "async_callout.hpp" #include #include @@ -15,9 +15,7 @@ #include -namespace I2n -{ -namespace SimpleIo +namespace AsyncIo { namespace @@ -332,5 +330,4 @@ CallOutId frozenCall( boost::function< void() > f, float delta_sec ) } // eo frozenCall(boost::function< void() >,float) -} // eo namespace SimpleIo -} // eo namespace I2n +} // eo namespace AsyncIo diff --git a/asyncio/simplecallout.hpp b/asyncio/async_callout.hpp similarity index 93% rename from asyncio/simplecallout.hpp rename to asyncio/async_callout.hpp index c895bbc..93b507b 100644 --- a/asyncio/simplecallout.hpp +++ b/asyncio/async_callout.hpp @@ -4,24 +4,22 @@ * * * @copyright © Copyright 2008 by Intra2net AG - * @license commercial + * @license LGPL * * info@intra2net.com */ -#ifndef __SIMPLEIO_SIMPLECALLOUT_HPP_ -#define __SIMPLEIO_SIMPLECALLOUT_HPP_ +#ifndef ___ASYNC_CALLOUT_HPP__ +#define ___ASYNC_CALLOUT_HPP__ -#include "simpleio.hpp" +#include "async_io.hpp" #include #include #include -namespace I2n -{ -namespace SimpleIo +namespace AsyncIo { // forward declarations: @@ -152,7 +150,6 @@ template<> CallOutId frozenCall( boost::function< void() > f, int delta_sec ); bool removeCallOut( CallOutId& id ); -} // eo namespace SimpleIo -} // eo namespace I2n +} // eo namespace AsyncIo #endif diff --git a/asyncio/simpleio.cpp b/asyncio/async_io.cpp similarity index 98% rename from asyncio/simpleio.cpp rename to asyncio/async_io.cpp index 9bf0cad..28f7b77 100644 --- a/asyncio/simpleio.cpp +++ b/asyncio/async_io.cpp @@ -2,13 +2,13 @@ * * * @copyright Copyright © 2007-2008 by Intra2net AG - * @license commercial + * @license LGPL * @contact info@intra2net.com */ //#define NOISEDEBUG -#include "simpleio.hpp" +#include "async_io.hpp" #include #include @@ -99,7 +99,7 @@ struct PollFd : public ::pollfd typedef std::vector PollVector; typedef std::map FdPollMap; -typedef std::map FdIOMap; +typedef std::map FdIOMap; /** @@ -112,8 +112,8 @@ struct PollDataCluster FdIOMap m_read_fd_io_map; FdIOMap m_write_fd_io_map; - void add_read_fd( int fd, I2n::SimpleIo::IOImplementation* io); - void add_write_fd( int fd, I2n::SimpleIo::IOImplementation* io); + void add_read_fd( int fd, AsyncIo::IOImplementation* io); + void add_write_fd( int fd, AsyncIo::IOImplementation* io); pollfd* get_pollfd_ptr(); unsigned int get_num_pollfds() const; @@ -205,8 +205,8 @@ class PtrList : public std::list }; // eo class PtrList -typedef PtrList IOList; -typedef PtrList TimerList; +typedef PtrList IOList; +typedef PtrList TimerList; template<> int IOList::Instances= 0; template<> int TimerList::Instances= 0; @@ -242,7 +242,7 @@ TimerList& g_timer_list() * @param fd the file descriptor. * @param io the io object which uses the fd for reading. */ -void PollDataCluster::add_read_fd( int fd, I2n::SimpleIo::IOImplementation* io) +void PollDataCluster::add_read_fd( int fd, AsyncIo::IOImplementation* io) { FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd); if (itPollMap != m_fd_poll_map.end()) @@ -266,7 +266,7 @@ void PollDataCluster::add_read_fd( int fd, I2n::SimpleIo::IOImplementation* io) * @param fd the file descriptor. * @param io the io object which uses the fd for writing. */ -void PollDataCluster::add_write_fd( int fd, I2n::SimpleIo::IOImplementation* io) +void PollDataCluster::add_write_fd( int fd, AsyncIo::IOImplementation* io) { FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd); if (itPollMap != m_fd_poll_map.end()) @@ -315,13 +315,13 @@ unsigned int PollDataCluster::get_num_pollfds() const */ struct FilterMatch { - I2n::SimpleIo::FilterBasePtr m_filter; + AsyncIo::FilterBasePtr m_filter; - FilterMatch(I2n::SimpleIo::FilterBasePtr filter) + FilterMatch(AsyncIo::FilterBasePtr filter) : m_filter(filter) {} - bool operator () (const I2n::SimpleIo::FilterBasePtr& item) + bool operator () (const AsyncIo::FilterBasePtr& item) { return item && item == m_filter; } @@ -364,9 +364,7 @@ void get_current_monotonic_time(long& current_sec, long& current_msec) -namespace I2n -{ -namespace SimpleIo +namespace AsyncIo { @@ -1691,5 +1689,4 @@ void Backend::stop() -} // eo namespace SimpleIo -} // eo namespace I2n +} // eo namespace AsyncIo diff --git a/asyncio/simpleio.hpp b/asyncio/async_io.hpp similarity index 98% rename from asyncio/simpleio.hpp rename to asyncio/async_io.hpp index d3727be..fb2278f 100644 --- a/asyncio/simpleio.hpp +++ b/asyncio/async_io.hpp @@ -3,7 +3,7 @@ * @brief simple basic IO handling. * * @copyright © Copyright 2007-2008 by Intra2net AG - * @license commercial + * @license LGPL * @contact info@intra2net.com * * Deals with POSIX file descriptors; provides an additional abstraction @@ -11,8 +11,8 @@ * Also provides basic functionality for dealing with timer events. */ -#ifndef __I2N_SIMPLEIO_HPP__ -#define __I2N_SIMPLEIO_HPP__ +#ifndef __ASYNC_IO_HPP__ +#define __ASYNC_IO_HPP__ #include #include @@ -24,11 +24,11 @@ #include -namespace I2n -{ -namespace SimpleIo +namespace AsyncIo { +using namespace I2n; + /* * forward declarations @@ -457,7 +457,6 @@ void get_current_real_time(MilliTime& mt); void get_current_monotonic_time(MilliTime& mt); -} // eo namespace SimpleIo -} // eo namespace I2n +} // eo namespace AsyncIo #endif diff --git a/asyncio/simplepipe.cpp b/asyncio/async_pipe.cpp similarity index 93% rename from asyncio/simplepipe.cpp rename to asyncio/async_pipe.cpp index 4eba4bc..086f579 100644 --- a/asyncio/simplepipe.cpp +++ b/asyncio/async_pipe.cpp @@ -5,7 +5,7 @@ * info@intra2net.com */ -#include "simplepipe.hpp" +#include "async_pipe.hpp" #include #include @@ -16,9 +16,7 @@ #include -namespace I2n -{ -namespace SimpleIo +namespace AsyncIo { @@ -94,5 +92,4 @@ void SimplePipe::slotReceived() } // eo SimplePipe::slotReceived() -} // eo namespace SimpleIo -} // eo namespace I2n +} // eo namespace AsyncIo diff --git a/asyncio/simplepipe.hpp b/asyncio/async_pipe.hpp similarity index 71% rename from asyncio/simplepipe.hpp rename to asyncio/async_pipe.hpp index 80b7cc6..113614a 100644 --- a/asyncio/simplepipe.hpp +++ b/asyncio/async_pipe.hpp @@ -5,14 +5,12 @@ * info@intra2net.com */ -#ifndef _SIMPLEIO_SIMPLEPIPE_HPP_ -#define _SIMPLEIO_SIMPLEPIPE_HPP_ +#ifndef __ASYNC_PIPE_HPP__ +#define __ASYNC_PIPE_HPP__ -#include "simpleio.hpp" +#include "async_io.hpp" -namespace I2n -{ -namespace SimpleIo +namespace AsyncIo { @@ -37,8 +35,7 @@ class SimplePipe : public IOImplementation }; // eo SimplePipe -} // eo namespace SimpleIo -} // eo namespace I2n +} // eo namespace AsyncIo #endif diff --git a/asyncio/simpleprocess.cpp b/asyncio/async_process.cpp similarity index 99% rename from asyncio/simpleprocess.cpp rename to asyncio/async_process.cpp index 8e3754d..aa0fb9a 100644 --- a/asyncio/simpleprocess.cpp +++ b/asyncio/async_process.cpp @@ -8,7 +8,7 @@ //#define NOISEDEBUG -#include "simpleprocess.hpp" +#include "async_process.hpp" #include #include @@ -40,7 +40,7 @@ namespace { -using namespace I2n::SimpleIo; +using namespace AsyncIo; /** * local configuration values @@ -195,9 +195,7 @@ struct FdCloser } // eo namespace -namespace I2n -{ -namespace SimpleIo +namespace AsyncIo { @@ -813,5 +811,4 @@ void ProcessManager::execute() } // eo ProcessManager::execute -} // eo namespace SimpleIo -} // eo namespace I2n +} // eo namespace AsyncIo diff --git a/asyncio/simpleprocess.hpp b/asyncio/async_process.hpp similarity index 96% rename from asyncio/simpleprocess.hpp rename to asyncio/async_process.hpp index d42c755..de4b973 100644 --- a/asyncio/simpleprocess.hpp +++ b/asyncio/async_process.hpp @@ -7,8 +7,8 @@ * info@intra2net.com */ -#ifndef _CONND_SIMPLEPROCESS_HPP_ -#define _CONND_SIMPLEPROCESS_HPP_ +#ifndef __ASYNC_PROCESS_HPP__ +#define __ASYNC_PROCESS_HPP__ #include #include @@ -17,12 +17,10 @@ #include #include -#include "simpleio.hpp" +#include "async_io.hpp" -namespace I2n -{ -namespace SimpleIo +namespace AsyncIo { using SystemTools::Signal; @@ -192,7 +190,6 @@ bool installChildHandler(); bool restoreChildHandler(); -} // eo namespace SimpleIo -} // eo I2n +} // eo namespace AsyncIo #endif diff --git a/asyncio/simplesocket.cpp b/asyncio/async_socket.cpp similarity index 98% rename from asyncio/simplesocket.cpp rename to asyncio/async_socket.cpp index def93c0..b4bb193 100644 --- a/asyncio/simplesocket.cpp +++ b/asyncio/async_socket.cpp @@ -7,7 +7,7 @@ * @todo unlink unix server socket on close. */ -#include "simplesocket.hpp" +#include "async_socket.hpp" #include #include #include @@ -18,9 +18,7 @@ #include -namespace I2n -{ -namespace SimpleIo +namespace AsyncIo { @@ -393,5 +391,4 @@ UnixIOSocketPtr UnixServerSocketBase::createIOSocket( -}// eo namespace SimpleIo -}// eo namespace I2n +}// eo namespace AsyncIo diff --git a/asyncio/simplesocket.hpp b/asyncio/async_socket.hpp similarity index 96% rename from asyncio/simplesocket.hpp rename to asyncio/async_socket.hpp index 745c787..1d60aa3 100644 --- a/asyncio/simplesocket.hpp +++ b/asyncio/async_socket.hpp @@ -1,5 +1,5 @@ /** @file - * @brief socket classes for the SimpleIo framework. + * @brief socket classes for the AsyncIo framework. * * * (c) Copyright 2008 by Intra2net AG @@ -10,7 +10,7 @@ #ifndef __SIMPLEIO__SIMPLESOCKET_HPP__ #define __SIMPLEIO__SIMPLESOCKET_HPP__ -#include "simpleio.hpp" +#include "async_io.hpp" #include #include @@ -21,9 +21,7 @@ #include -namespace I2n -{ -namespace SimpleIo +namespace AsyncIo { @@ -208,8 +206,7 @@ class UnixServerSocket }; // eo class UnixServerSocket -}// eo namespace SimpleIo -}// eo namespace I2n +}// eo namespace AsyncIo #endif diff --git a/asyncio/simpletimer.cpp b/asyncio/async_timer.cpp similarity index 95% rename from asyncio/simpletimer.cpp rename to asyncio/async_timer.cpp index f716981..67d7fd3 100644 --- a/asyncio/simpletimer.cpp +++ b/asyncio/async_timer.cpp @@ -8,7 +8,7 @@ //#define NOISEDEBUG -#include "simpletimer.hpp" +#include "async_timer.hpp" #ifdef NOISEDEBUG @@ -24,9 +24,7 @@ #endif -namespace I2n -{ -namespace SimpleIo +namespace AsyncIo { @@ -103,5 +101,4 @@ void SimpleTimer::stopTimer() } // eo SimpleTimer::stopTimer -} // eo namespace SimpleIo -} // eo namespace I2n +} // eo namespace AsyncIo diff --git a/asyncio/simpletimer.hpp b/asyncio/async_timer.hpp similarity index 86% rename from asyncio/simpletimer.hpp rename to asyncio/async_timer.hpp index 64b3d41..a2b2dc5 100644 --- a/asyncio/simpletimer.hpp +++ b/asyncio/async_timer.hpp @@ -7,16 +7,14 @@ * info@intra2net.com */ -#ifndef _SIMPLEIO_SIMPLETIMER_HPP_ -#define _SIMPLEIO_SIMPLETIMER_HPP_ +#ifndef __ASYNC_TIMER_HPP__ +#define __ASYNC_TIMER_HPP__ -#include "simpleio.hpp" +#include "async_io.hpp" #include -namespace I2n -{ -namespace SimpleIo +namespace AsyncIo { @@ -56,7 +54,6 @@ class SimpleTimer : public TimerBase }; // eo class SimpleTimer -} // eo namespace SimpleIo -} // eo namespace I2n +} // eo namespace AsyncIo #endif diff --git a/asyncio/libsimpleio.pc.in b/asyncio/libasyncio.pc.in similarity index 100% copy from asyncio/libsimpleio.pc.in copy to asyncio/libasyncio.pc.in diff --git a/glue_t2n/async_io_t2n.cpp b/glue_t2n/async_io_t2n.cpp new file mode 100644 index 0000000..cb5f0d8 --- /dev/null +++ b/glue_t2n/async_io_t2n.cpp @@ -0,0 +1,958 @@ +/** + * @file + * + * @author Reinhard Pfau \ + * + * @copyright © Copyright 2008 by Intra2net AG + * @license LGPL + * @contact info@intra2net.com + */ + +#include "async_io_t2n.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace AsyncIo +{ + + +namespace +{ + + +Logger::PartLogger& module_logger() +{ + static Logger::PartLogger _module_logger(HERE); + return _module_logger; +} // eo module_logger(); + + + +/** + * @brief a class with some methods we like to have on our io classes. + * + * This class is to be used as second base in a wrapper class and needs it's methods to + * be redefined for "the real thing". + */ +class IOExportWrapperBase +{ + public: + + IOExportWrapperBase() + { + } + + virtual ~IOExportWrapperBase() + { + } + + virtual void sendData(const std::string& data) + { + } + + + virtual std::string receiveData() + { + return std::string(); + } + + virtual boost::signals::connection connectEof( const boost::function< void() >& func ) + { + return boost::signals::connection(); + } + + virtual boost::signals::connection connectRead( const boost::function< void() >& func ) + { + return boost::signals::connection(); + } + +}; // eo class IOExportWrapperBase + + +typedef boost::shared_ptr< IOExportWrapperBase > IOExportWrapperBasePtr; + + +/** + * @brief IO wrapper template. + * @tparam IOClass a type based on AsyncIo::IOImplementation + * + * The type is used as a public base for the resulting class; the second public base is our + * helper with the additional methods we need internally and which we (finally) define here. + */ +template< + class IOClass +> +class IOExportWrapper +: public IOClass +, public IOExportWrapperBase +{ + BOOST_STATIC_ASSERT(( boost::is_base_of< IOImplementation,IOClass >::value )); + + public: + IOExportWrapper() + { + } + + template< + typename Arg1 + > + IOExportWrapper(Arg1 arg1) + : IOClass(arg1) + {} + + + template< + typename Arg1, typename Arg2 + > + IOExportWrapper(Arg1 arg1, Arg2 arg2) + : IOClass(arg1,arg2) + {} + + + template< + typename Arg1, typename Arg2, typename Arg3 + > + IOExportWrapper(Arg1 arg1, Arg2 arg2, Arg3 arg3) + : IOClass(arg1,arg2,arg3) + {} + + + template< + typename Arg1, typename Arg2, typename Arg3, typename Arg4 + > + IOExportWrapper(Arg1 arg1, Arg2 arg2, Arg3 arg3, Arg4 arg4) + : IOClass(arg1,arg2,arg3,arg4) + {} + + + template< + typename Arg1, typename Arg2, typename Arg3, typename Arg4, typename Arg5 + > + IOExportWrapper(Arg1 arg1, Arg2 arg2, Arg3 arg3, Arg4 arg4, Arg5 arg5) + : IOClass(arg1,arg2,arg3,arg4,arg5) + {} + + + /** + * @brief exposed funtion for sending data. + * @param data the chunk to be send. + */ + virtual void sendData(const std::string& data) + { + IOClass::lowSend(data); + } + + /** + * @brief returns the new received data. + * @return the receievd data. + * + * Clears the receive buffer. + */ + virtual std::string receiveData() + { + std::string result; + result.swap(IOClass::m_input_buffer); + return result; + } + + /** + * @brief exposed connect to EOF signal. + * @param func the function which should be connected to the eof signal. + * @return signal connection handle. + */ + virtual boost::signals::connection connectEof( const boost::function< void() >& func ) + { + return IOClass::m_signal_eof.connect(func); + } + + /** + * @brief exposed connect to "read" signal. + * @param func the function which should be connected to the "read" signal. + * @return signal connection handle. + */ + virtual boost::signals::connection connectRead( const boost::function< void() >& func ) + { + return IOClass::m_signal_read.connect(func); + } + + protected: + +}; // eo class IOExportWrapper + + +/* +** specialized versions of io classes: +*/ + +/** + * @brief enhanced unix domain socket class with reconnect feature. + * + * Used for t2n client connections. + */ +class T2nUnixIOSocket +: public AsyncIo::UnixIOSocket +{ + typedef AsyncIo::UnixIOSocket inherited; + public: + T2nUnixIOSocket( const std::string& path ); + + virtual void close(AsyncIo::Direction direction = AsyncIo::Direction::both); + + bool reopen(bool force= false); + + protected: + + virtual void doRead(); + + + protected: + + bool m_in_do_read; + bool m_may_reconnect; + +}; // T2nUnixIOSocket + + +T2nUnixIOSocket::T2nUnixIOSocket(const std::string& path) +: inherited(path) +, m_in_do_read(false) +, m_may_reconnect(false) +{ +} // eo T2nUnixIOSocket::T2nUnixIOSocket(const std::string&) + + +void T2nUnixIOSocket::close(AsyncIo::Direction direction) +{ + bool was_open= opened(); + inherited::close(direction); + if (m_in_do_read and not opened()) + { + m_may_reconnect= was_open; + } +} // eo T2nUnixIOSocket::close(AsyncIo::Direction) + + +bool T2nUnixIOSocket::reopen(bool force) +{ + if (m_path.empty()) + { + return false; + } + if (m_may_reconnect || force) + { + return inherited::open( m_path ); + } + return false; +} // eo T2nUnixIOSocket::reopen() + + +void T2nUnixIOSocket::doRead() +{ + m_in_do_read= true; + try + { + inherited::doRead(); + } + catch (...) + { + m_in_do_read= false; + throw; + } + m_in_do_read= false; +} // eo T2nUnixIOSocket::doRead() + + +/** + * @brief server class for libt2n using unix domain sockets. + * + * well, it's enough to provide an appropriate constructor. + * (did i mention that templates are really cool stuff? :-) ) + */ +class T2NUnixServer +: public T2NServerBase +{ + public: + + T2NUnixServer(const std::string& path, int mode=0600) + : T2NServerBase( ServerSocketBaseImplementationPtr( + new UnixServerSocket< + IOExportWrapper< UnixIOSocket > + >(path, mode) + ) ) + { + } // eo T2NServerBase + +}; // eo T2NUnixServer + + + +class RealT2NClientConnection +: public T2NClientConnection +{ + public: + RealT2NClientConnection( AsyncIo::IOImplementationPtr connection ) + : T2NClientConnection(connection) + { + } +}; // eo class T2NClient + +} // eo namespace + + + +/* +** implementation of T2NClientConnection +*/ + + +T2NClientConnection::T2NClientConnection( + IOImplementationPtr connection +) +: libt2n::client_connection() +, m_real_connection(connection) +, m_got_new_data(false) +{ + SCOPETRACKER(); + IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(connection); + if (!ptr) + { + module_logger().error(HERE) << "illegal pointer passed"; + close(); + return; + } + if (!connection->opened()) + { + module_logger().warning(HERE) << "connection not open, either failed or already closed"; + close(); + return; + } + + ptr->connectRead( boost::bind(&T2NClientConnection::newDataSlot, this) ); + ptr->connectEof( boost::bind(&T2NClientConnection::eofSlot, this) ); + +} // eo T2NClientConnection::T2NClientConnection(IOImplementationPtr) + + +T2NClientConnection::~T2NClientConnection() +{ + SCOPETRACKER(); +} // eo T2NClientConnection::~T2NClientConnection + + +/** + * @brief returns if the connection is open. + * @return @a true if the connection is open. + */ +bool T2NClientConnection::isOpen() +{ + return m_real_connection and m_real_connection->opened(); +} // eo T2NClientConnection::isOpen() + + +/** + * @brief try to reopen a connection. + * @return @a true if the connection was reopened. + */ +bool T2NClientConnection::reopen(bool force) +{ + if (not m_real_connection) + { + return false; + } + boost::shared_ptr< T2nUnixIOSocket > t2n_socket= + boost::shared_dynamic_cast< T2nUnixIOSocket >(m_real_connection); + if (t2n_socket) + { + return t2n_socket->reopen(force); + } + return false; +} // eo T2NClientConnection::reopen() + + +/** + * @brief closes the connection. + * + * This closes the underlying IO connection and calls libt2n::server_connection::close() to + * mark the connection as closed for libt2n. + */ +void T2NClientConnection::close() +{ + SCOPETRACKER(); + if (m_real_connection) + { + m_real_connection->close(); + m_real_connection.reset(); + } + libt2n::client_connection::close(); +} // eo T2NClientConnection::close() + + +/** + * @brief sends a raw data chunk on the connection. + * + * @param data the (raw) data chunk which should be sended. + */ +void T2NClientConnection::real_write(const std::string& data) +{ + SCOPETRACKER(); + if (is_closed()) + { + module_logger().warning(HERE) << "attempt to write data on closed connection"; + return; + } + IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); + if (!ptr) + { + // should never happen... + module_logger().error(HERE)<< "illegal io pointer"; + close(); + //TODO: throw an error?! + NOT_REACHED(); + return; + } + ptr->sendData(data); +} // eo T2NClientConnection::real_write(const std::string) + + +/** + * @brief called to fill the connection buffer. + * + * Since this class uses the asnychronous AsyncIo framework, new data may already be read when + * this method is called. + * + * @param usec_timeout + * @param usec_timeout_remaining + * @return @a true if new data is available. + */ +bool T2NClientConnection::fill_buffer(long long usec_timeout,long long* usec_timeout_remaining) +{ + SCOPETRACKER(); + if (is_closed()) + { + module_logger().debug(HERE) << "fill_buffer() called on closed connection"; + return false; + } + AsyncIo::MilliTime t0,t1; + AsyncIo::get_current_monotonic_time(t0); + if (!m_got_new_data) + { + IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); + if (!ptr) + { + module_logger().error(HERE) << "illegal io pointer"; + close(); + return false; + } + // try to fetch data (call the backend) + int timeout= 0; + + if (usec_timeout<0) + { + timeout= -1; + } + else if (usec_timeout > 0) + { + long long msec_timeout= (usec_timeout + 500)/1000; + + if (msec_timeout >= INT_MAX) + { + timeout= INT_MAX; + } + else + { + timeout= (int)msec_timeout; + } + } + Backend::getBackend()->doOneStep( timeout ); + } + AsyncIo::get_current_monotonic_time(t1); + if (usec_timeout_remaining) + { + long long delta= ((long long)(t1 - t0).get_milliseconds())* 1000L; + *usec_timeout_remaining= (usec_timeout > delta ) ? (usec_timeout - delta) : 0L; + module_logger().debug() << "timeout: " << usec_timeout << " -> " << *usec_timeout_remaining; + } + if (m_got_new_data) + { + m_got_new_data= false; + return true; + } + return false; +} // eo T2NClientConnection::fill_buffer(long long,long long*) + + +/** + * @brief called when new data arrived on this connection. + * + * reads the new data from the underlying IO object and stores it in the connection buffer. + * Also remembers (in the bool member var @a m_got_new_data) that new data was received. + */ +void T2NClientConnection::newDataSlot() +{ + SCOPETRACKER(); + IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); + if (!ptr) + { + //TODO: throw an error?! + NOT_REACHED(); + return; + } + std::string new_data= ptr->receiveData(); + module_logger().debug() << "got " << new_data.size() << " bytes of new data"; + buffer+= new_data; + m_got_new_data= true; +} // eo T2NClientConnection::newDataSlot() + + +/** + * @brief called when an EOF was detected by the underlying IO object (i.e. the connection + * was closed by the peer side). + * + * Calls close(). + */ +void T2NClientConnection::eofSlot() +{ + SCOPETRACKER(); + close(); +} // eo T2NClientConnection::eofSlot() + + +/* +** implementation of T2NServerConnection +*/ + + +T2NServerConnection::T2NServerConnection( + T2NServerBasePtr server, + IOImplementationPtr connection, + int timeout +) +: libt2n::server_connection(timeout) +, m_real_connection(connection) +, m_server_weak_ptr(server) +, m_got_new_data(false) +{ + SCOPETRACKER(); + IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(connection); + if (!ptr) + { + module_logger().error(HERE) << "illegal pointer passed"; + close(); + return; + } + if (!connection->opened()) + { + module_logger().warning(HERE) << "connection not open, either failed or already closed"; + close(); + return; + } + + ptr->connectRead( boost::bind(&T2NServerConnection::newDataSlot, this) ); + ptr->connectEof( boost::bind(&T2NServerConnection::eofSlot, this) ); + +} // eo T2NServerConnection::T2NServerConnection(IOImplementationPtr) + + +T2NServerConnection::~T2NServerConnection() +{ + SCOPETRACKER(); +} // eo T2NServerConnection::~T2NServerConnection + + +/** + * @brief closes the connection. + * + * This closes the underlying IO connection and calls libt2n::server_connection::close() to + * mark the connection as closed for libt2n. + */ +void T2NServerConnection::close() +{ + SCOPETRACKER(); + if (m_real_connection) + { + m_real_connection->close(); + m_real_connection.reset(); + } + libt2n::server_connection::close(); +} // eo T2NServerConnection::close() + + +/** + * @brief sends a raw data chunk on the connection. + * + * @param data the (raw) data chunk which should be sended. + */ +void T2NServerConnection::real_write(const std::string& data) +{ + SCOPETRACKER(); + if (is_closed()) + { + module_logger().warning(HERE) << "attempt to write data on closed connection"; + return; + } + IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); + if (!ptr) + { + // should never happen... + module_logger().error(HERE)<< "illegal io pointer"; + close(); + //TODO: throw an error?! + NOT_REACHED(); + return; + } + module_logger().debug() << "send " << data.size() << " bytes of data"; + ptr->sendData(data); +} // eo T2NServerConnection::real_write(const std::string) + + +/** + * @brief called to fill the connection buffer. + * + * Since this class uses the asnychronous AsyncIo framework, new data may already be read when + * this method is called. + * + * @param wait determines if we need to wait; if @a false it is just checked if new data + * was received, but no backend cycle is executed. + * @param usec_timeout + * @param usec_timeout_remaining + * @return @a true if new data is available. + */ +bool T2NServerConnection::low_fill_buffer(bool wait, long long usec_timeout, long long* usec_timeout_remaining) +{ + SCOPETRACKER(); + if (is_closed()) + { + module_logger().debug(HERE) << "fill_buffer() called on closed connection"; + return false; + } + if (not m_got_new_data and wait) + { + AsyncIo::MilliTime t0,t1; + AsyncIo::get_current_monotonic_time(t0); + IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); + if (!ptr) + { + module_logger().error(HERE) << "illegal io pointer"; + close(); + return false; + } + // try to fetch data (call the backend) + int timeout= 0; + + if (usec_timeout<0) + { + timeout= -1; + } + else if (usec_timeout > 0) + { + long long msec_timeout= (usec_timeout + 500)/1000; + + if (msec_timeout >= INT_MAX) + { + timeout= INT_MAX; + } + else + { + timeout= (int)msec_timeout; + } + } + Backend::getBackend()->doOneStep( timeout ); + AsyncIo::get_current_monotonic_time(t1); + if (usec_timeout_remaining) + { + long long delta= ((long long)(t1 - t0).get_milliseconds())* 1000L; + *usec_timeout_remaining= (usec_timeout > delta ) ? (usec_timeout - delta) : 0L; + } + } + else + { + if (usec_timeout_remaining) + { + *usec_timeout_remaining= usec_timeout; + } + } + if (m_got_new_data) + { + m_got_new_data= false; + return true; + } + return false; +} // eo T2NServerConnection::low_fill_buffer(bool,long long,long long*) + + +/** + * @brief called to fill the connection buffer. + * + * Since this class uses the asnychronous AsyncIo framework, new data may already be read when + * this method is called. + * + * @param usec_timeout + * @param usec_timeout_remaining + * @return @a true if new data is available. + */ +bool T2NServerConnection::fill_buffer(long long usec_timeout,long long* usec_timeout_remaining) +{ + return low_fill_buffer(true, usec_timeout, usec_timeout_remaining); +} // eo T2NServerConnection::fill_buffer(long long,long long*) + + +/** + * @brief called when new data arrived on this connection. + * + * reads the new data from the underlying IO object and stores it in the connection buffer. + * Also remembers (in the bool member var @a m_got_new_data that new data was received. + */ +void T2NServerConnection::newDataSlot() +{ + SCOPETRACKER(); + IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); + if (!ptr) + { + //TODO:throw an error?! + NOT_REACHED(); + return; + } + std::string new_data= ptr->receiveData(); + buffer+= new_data; + module_logger().debug() << "got " << new_data.size() << " bytes of new data"; + m_got_new_data= true; + reset_timeout(); + + T2NServerBasePtr server =m_server_weak_ptr.lock(); + if (server) + { + server->m_signal_client_got_new_data(); + } +} // eo T2NServerConnection::newDataSlot() + + +/** + * @brief called when an EOF was detected by the underlying IO object (i.e. the connection + * was closed by the peer side). + * + * Calls close(). + */ +void T2NServerConnection::eofSlot() +{ + SCOPETRACKER(); + close(); +} // eo T2NServerConnection::eofSlot() + + + +/* +** implementation of T2NServerBase +*/ + + +/** + * @brief constructs a libt2n server object. + * + * @param server_port shared pointer to a (AsyncIo) port server object which + * is used as underlying port handler. + */ +T2NServerBase::T2NServerBase( ServerSocketBaseImplementationPtr server_port) +: m_server_port(server_port) +, m_new_data_available(false) +{ + SCOPETRACKER(); + // register our callback for new incoming conncetions. + server_port->setNewConnectionBaseCallback( + boost::bind(&T2NServerBase::newConnectionSlot, this, _1) + ); + m_signal_client_got_new_data.connect + ( + boost::bind(&T2NServerBase::clientGotNewDataSlot, this) + ); +} // eo T2NServerBase::T2NServerBase(ServerSocketBaseImplementationPtr) + + +/** + * @brief destructor. + * + */ +T2NServerBase::~T2NServerBase() +{ + SCOPETRACKER(); +} // eo T2NServerBase::~T2NServerBase() + + +/** + * @brief returns wether the server port is opened. + * + * @return @a true iff the server port is open. + */ +bool T2NServerBase::isOpen() +{ + return (m_server_port && m_server_port->opened()); +} // eo T2NServerBase + + + +/** + * @brief callback for the port server object when a new connection is established. + * + * @param io_ptr the (shared) pointer to the new connection. + */ +void T2NServerBase::newConnectionSlot(IOImplementationPtr io_ptr) +{ + SCOPETRACKER(); + add_connection( new T2NServerConnection( get_ptr_as< T2NServerBase >(), io_ptr, get_default_timeout() ) ); +} // eo T2NServerBase::newConnectionSlot(IOImplementationPtr) + + +/** + * @brief callback for "new data available" signal + */ +void T2NServerBase::clientGotNewDataSlot() +{ + m_new_data_available= true; +} // eo T2NServerBase::clientGotNewDataSlot() + + +/** + * @brief try to fill the buffers of the managed connections. + * + * will be called by T2NServerBase::fill_buffer(). + * + * @return @a true if at least one connection buffer has new data. + */ +bool T2NServerBase::fill_connection_buffers() +{ + SCOPETRACKER(); + Backend::getBackend()->doOneStep(0); + bool result= false; + for(std::map::iterator it=connections.begin(); + it != connections.end(); + ++it) + { + T2NServerConnection *conn = dynamic_cast(it->second); + if (!conn) + { + if (it->second) + { + // react somehow if (it->second) is not NULL... + module_logger().error(HERE) << "illegal connection pointer"; + it->second->close(); + } + continue; + } + if ( conn->low_fill_buffer(false, 0) ) + { + result= true; + } + } + return result; +} // eo T2NServerBase::fill_connection_buffers() + + +/** + * @brief fills the connection buffers. + * + * Uses the AsyncIo Backend to wait for new data. + * + * @param usec_timeout the maximum time period to wait for new data (in microseconds). + * 0 returns immediately, -1 waits until some event occurred. + * @param timeout_remaining ignored! + * @return @a true if new data for at least one connection arrived. + * + * @note since this method uses the AsyncIo backend, the timeout will have only milli second + * resolution. + */ +bool T2NServerBase::fill_buffer(long long usec_timeout, long long* timeout_remaining) +{ + SCOPETRACKER(); + + if (m_new_data_available) + { + // short cut if we already know that we have new data: + m_new_data_available= false; + return true; + } + + int timeout= 0; + + if (usec_timeout<0) + { + timeout= -1; + } + else if (usec_timeout > 0) + { + long long msec_timeout= (usec_timeout + 500)/1000; + + if (msec_timeout >= INT_MAX) + { + timeout= INT_MAX; + } + else + { + timeout= (int)msec_timeout; + } + } + // not really.. but it shouldn't be used either... + if (timeout_remaining) *timeout_remaining= 0L; + + if (! fill_connection_buffers() && timeout>0) + { + bool had_activity= Backend::getBackend()->doOneStep( timeout ); + return fill_connection_buffers(); + } + return true; +} // to T2NServerBase::fill_buffer(long long,long long*) + + + +/* +** creator functions: +*/ + + +/** + * @brief creates a server object with unix domain server socket. + * @param path path of the unix domain socket. + * @param mode mode for the socket. + * @return shared pointer with the new server object; empty if none could be created.. + */ +T2NServerBasePtr createT2NUnixServerPort(const std::string& path, int mode) +{ + SCOPETRACKER(); + boost::shared_ptr< T2NUnixServer > result( new T2NUnixServer(path,mode) ); + if (!result->isOpen()) + { + module_logger().error(HERE) + << "failed to open unix domain server socket on \"" << path << "\""; + } + return result; +} // eo createT2NUnixServerPort(const std::string&,int) + + +/** + * @brief creates a client object connected to a server via unix daomain socket. + * @param path path of cthe unix domain socket. + * @return shared pointer with the new client object; empty if none could be created.. + */ +T2NClientConnectionPtr createT2NUnixClientConnection(const std::string& path) +{ + typedef IOExportWrapper< AsyncIo::UnixIOSocket > MyIo; + typedef boost::shared_ptr< MyIo > MyIoPtr; + SCOPETRACKER(); + MyIoPtr connection( new MyIo(path) ); + boost::shared_ptr< RealT2NClientConnection > result( new RealT2NClientConnection( connection ) ); + if (not result->isOpen()) + { + module_logger().error(HERE) + << "failed to open unix domain client socket on \"" << path << "\""; + return T2NClientConnectionPtr(); + } + return result; +} // eo createT2NUnixClientConnection(const std::string&) + +} // eo namespace AsyncIo diff --git a/glue_t2n/async_io_t2n.hpp b/glue_t2n/async_io_t2n.hpp new file mode 100644 index 0000000..0a998ce --- /dev/null +++ b/glue_t2n/async_io_t2n.hpp @@ -0,0 +1,206 @@ +/** + * @file + * @brief the "glue" between libt2n and AsyncIo framework. + * + * contains our own server and client class which should fit into the asnychronous way of "AsyncIo". + * We use our own classes since the libt2n socket classes are made for synchronous operation + * which can lead to problems even if we import the connection fd's into "AsyncIo"... + * + * @author Reinhard Pfau \ + * + * @copyright © Copyright 2008 by Intra2net AG + * @license LGPL + * @contact info@intra2net.com + * + * @todo support for TCP/IP connections. + */ + +#ifndef __CONND_GLUE_T2N_HPP__ +#define __CONND_GLUE_T2N_HPP__ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace AsyncIo +{ + +using namespace I2n; + +/** + * @brief specialized version of the libt2n client connection which fits into the AsyncIo framework. + * + */ +class T2NClientConnection +: public libt2n::client_connection +{ + public: + + T2NClientConnection(); + virtual ~T2NClientConnection(); + + bool isOpen(); + + bool reopen(bool force= false); + + + /* + ** overloaded methods from libt2n classes: + */ + + virtual void close(); + + protected: + + T2NClientConnection( AsyncIo::IOImplementationPtr connection ); + + void newDataSlot(); + + void eofSlot(); + + /* + ** overloaded methods from t2n classes: + */ + + virtual void real_write(const std::string& data); + + virtual bool fill_buffer(long long usec_timeout=-1,long long* usec_timeout_remaining=NULL); + + protected: + + AsyncIo::IOImplementationPtr m_real_connection; + + bool m_got_new_data; + +}; // eo class T2NClientConnection + + +typedef boost::shared_ptr< T2NClientConnection > T2NClientConnectionPtr; + + +class T2NServerBase; + +typedef boost::shared_ptr< T2NServerBase > T2NServerBasePtr; +typedef boost::weak_ptr< T2NServerBase > T2NServerBaseWeakPtr; + +/** + * @brief specialized version of the libt2n server connection which fits into the AsyncIo framework. + * + */ +class T2NServerConnection +: public libt2n::server_connection +{ + friend class T2NServerBase; + public: + + T2NServerConnection(); + virtual ~T2NServerConnection(); + + + /* + ** overloaded methods from libt2n classes: + */ + + virtual void close(); + + protected: + + T2NServerConnection( + T2NServerBasePtr server, + AsyncIo::IOImplementationPtr connection, + int timeout); + + void newDataSlot(); + + void eofSlot(); + + bool low_fill_buffer(bool wait, long long usec_timeout=-1, long long* usec_timeout_remaining=NULL); + + /* + ** overloaded methods from t2n classes: + */ + + virtual void real_write(const std::string& data); + + virtual bool fill_buffer(long long usec_timeout=-1,long long* usec_timeout_remaining=NULL); + + + protected: + + AsyncIo::IOImplementationPtr m_real_connection; + T2NServerBaseWeakPtr m_server_weak_ptr; + + bool m_got_new_data; + +}; // eo class T2NServerConnection + + + +/** + * @brief base server class for handling server ports for libt2n. + * + * Needs to be derived for the real type of connection (unix, IPv4, etc.). + * + * Does all necessary connection handling and realizes the abstract methods from + * the libt2n::server class. + */ +class T2NServerBase +: public libt2n::server +, virtual public SharedBase +{ + public: + virtual ~T2NServerBase(); + + bool isOpen(); + + /* + ** overloaded methods from t2n classes: + */ + + virtual bool fill_buffer(long long usec_timeout=-1, long long* timeout_remaining=NULL); + + public: + + boost::signal< void() > m_signal_client_got_new_data; + + protected: + + T2NServerBase( AsyncIo::ServerSocketBaseImplementationPtr server_port); + + void newConnectionSlot(AsyncIo::IOImplementationPtr io_ptr); + + void clientGotNewDataSlot(); + + /* + ** overloaded methods from t2n classes: + */ + + virtual bool fill_connection_buffers(void); + + + protected: + + AsyncIo::ServerSocketBaseImplementationPtr m_server_port; + bool m_new_data_available; + +}; // eo T2NServerBase + + +typedef boost::shared_ptr< T2NServerBase > T2NServerBasePtr; + + +T2NServerBasePtr createT2NUnixServerPort(const std::string& path, int mode= 0600); + +T2NClientConnectionPtr createT2NUnixClientConnection(const std::string& path); + + +} // eo namespace AsyncIo + +#endif diff --git a/asyncio/libsimpleio.pc.in b/glue_t2n/libasyncio_t2n.pc.in similarity index 50% rename from asyncio/libsimpleio.pc.in rename to glue_t2n/libasyncio_t2n.pc.in index a671d47..44ac70b 100644 --- a/asyncio/libsimpleio.pc.in +++ b/glue_t2n/libasyncio_t2n.pc.in @@ -3,9 +3,9 @@ exec_prefix=@exec_prefix@ libdir=@libdir@ includedir=@includedir@ -Name: libsimpleio -Description: asynchrounous io lib +Name: libsimpleio_t2n +Description: t2n glue for libsimpleio Version: @VERSION@ -Requires: libi2ncommon -Libs: -L${libdir} -lsimpleio +Requires: libsimpleio libt2n +Libs: -L${libdir} -lsimpleio_t2n Cflags: -I${includedir}