2 The software in this package is distributed under the GNU General
3 Public License version 2 (with a special exception described below).
5 A copy of GNU General Public License (GPL) is included in this distribution,
6 in the file COPYING.GPL.
8 As a special exception, if other files instantiate templates or use macros
9 or inline functions from this file, or you compile this file and link it
10 with other works to produce a work based on this file, this file
11 does not by itself cause the resulting work to be covered
12 by the GNU General Public License.
14 However the source code for this file must still be made available
15 in accordance with section (3) of the GNU General Public License.
17 This exception does not invalidate any other reasons why a work based
18 on this file might be covered by the GNU General Public License.
23 * @author Reinhard Pfau \<reinhard.pfau@intra2net.com\>
25 * @copyright © Copyright 2008 by Intra2net AG
27 #include "asyncio_t2n.hpp"
30 #include <boost/type_traits/is_base_of.hpp>
31 #include <boost/static_assert.hpp>
32 #include <boost/signals2.hpp>
34 #include <logfunc.hpp>
35 #include <tracefunc.hpp>
36 #include <log_macros.hpp>
42 using namespace Utils;
48 Logger::PartLogger& module_logger()
50 static Logger::PartLogger _module_logger(HERE);
51 return _module_logger;
52 } // eo module_logger();
57 * @brief a class with some methods we like to have on our io classes.
59 * This class is to be used as second base in a wrapper class and needs it's methods to
60 * be redefined for "the real thing".
62 class IOExportWrapperBase
70 virtual ~IOExportWrapperBase()
74 virtual void sendData(const std::string& data)
79 virtual std::string receiveData()
84 virtual boost::signals2::connection connectEof( const boost::function< void() >& func )
86 return boost::signals2::connection();
89 virtual boost::signals2::connection connectRead( const boost::function< void() >& func )
91 return boost::signals2::connection();
94 }; // eo class IOExportWrapperBase
97 typedef boost::shared_ptr< IOExportWrapperBase > IOExportWrapperBasePtr;
101 * @brief IO wrapper template.
102 * @tparam IOClass a type based on AsyncIo::IOImplementation
104 * The type is used as a public base for the resulting class; the second public base is our
105 * helper with the additional methods we need internally and which we (finally) define here.
110 class IOExportWrapper
112 , public IOExportWrapperBase
114 BOOST_STATIC_ASSERT(( boost::is_base_of< IOImplementation,IOClass >::value ));
124 IOExportWrapper(Arg1 arg1)
130 typename Arg1, typename Arg2
132 IOExportWrapper(Arg1 arg1, Arg2 arg2)
138 typename Arg1, typename Arg2, typename Arg3
140 IOExportWrapper(Arg1 arg1, Arg2 arg2, Arg3 arg3)
141 : IOClass(arg1,arg2,arg3)
146 typename Arg1, typename Arg2, typename Arg3, typename Arg4
148 IOExportWrapper(Arg1 arg1, Arg2 arg2, Arg3 arg3, Arg4 arg4)
149 : IOClass(arg1,arg2,arg3,arg4)
154 typename Arg1, typename Arg2, typename Arg3, typename Arg4, typename Arg5
156 IOExportWrapper(Arg1 arg1, Arg2 arg2, Arg3 arg3, Arg4 arg4, Arg5 arg5)
157 : IOClass(arg1,arg2,arg3,arg4,arg5)
162 * @brief exposed funtion for sending data.
163 * @param data the chunk to be send.
165 virtual void sendData(const std::string& data)
167 IOClass::lowSend(data);
171 * @brief returns the new received data.
172 * @return the receievd data.
174 * Clears the receive buffer.
176 virtual std::string receiveData()
179 result.swap(IOClass::m_input_buffer);
184 * @brief exposed connect to EOF signal.
185 * @param func the function which should be connected to the eof signal.
186 * @return signal connection handle.
188 virtual boost::signals2::connection connectEof( const boost::function< void() >& func )
190 return IOClass::m_signal_eof.connect(func);
194 * @brief exposed connect to "read" signal.
195 * @param func the function which should be connected to the "read" signal.
196 * @return signal connection handle.
198 virtual boost::signals2::connection connectRead( const boost::function< void() >& func )
200 return IOClass::m_signal_read.connect(func);
205 }; // eo class IOExportWrapper
209 ** specialized versions of io classes:
213 * @brief enhanced unix domain socket class with reconnect feature.
215 * Used for t2n client connections.
217 class T2nUnixIOSocket
218 : public AsyncIo::UnixIOSocket
220 typedef AsyncIo::UnixIOSocket inherited;
222 T2nUnixIOSocket( const std::string& path );
224 virtual void close(AsyncIo::Direction direction = AsyncIo::Direction::both);
226 bool reopen(bool force= false);
230 virtual void doRead();
236 bool m_may_reconnect;
238 }; // T2nUnixIOSocket
241 T2nUnixIOSocket::T2nUnixIOSocket(const std::string& path)
243 , m_in_do_read(false)
244 , m_may_reconnect(false)
246 } // eo T2nUnixIOSocket::T2nUnixIOSocket(const std::string&)
249 void T2nUnixIOSocket::close(AsyncIo::Direction direction)
251 bool was_open= opened();
252 inherited::close(direction);
253 if (m_in_do_read and not opened())
255 m_may_reconnect= was_open;
257 } // eo T2nUnixIOSocket::close(AsyncIo::Direction)
260 bool T2nUnixIOSocket::reopen(bool force)
266 if (m_may_reconnect || force)
268 return inherited::open( m_path );
271 } // eo T2nUnixIOSocket::reopen()
274 void T2nUnixIOSocket::doRead()
287 } // eo T2nUnixIOSocket::doRead()
291 * @brief server class for libt2n using unix domain sockets.
293 * well, it's enough to provide an appropriate constructor.
294 * (did i mention that templates are really cool stuff? :-) )
297 : public T2NServerBase
301 T2NUnixServer(const std::string& path, int mode=0600)
302 : T2NServerBase( ServerSocketBaseImplementationPtr(
303 new UnixServerSocket<
304 IOExportWrapper< UnixIOSocket >
308 } // eo T2NServerBase
310 }; // eo T2NUnixServer
314 class RealT2NClientConnection
315 : public T2NClientConnection
318 RealT2NClientConnection( AsyncIo::IOImplementationPtr connection )
319 : T2NClientConnection(connection)
322 }; // eo class T2NClient
324 } // eo namespace <anonymous>
329 ** implementation of T2NClientConnection
333 T2NClientConnection::T2NClientConnection(
334 IOImplementationPtr connection
336 : libt2n::client_connection()
337 , m_real_connection(connection)
338 , m_got_new_data(false)
341 IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(connection);
344 module_logger().error(HERE) << "illegal pointer passed";
348 if (!connection->opened())
350 module_logger().warning(HERE) << "connection not open, either failed or already closed";
355 ptr->connectRead( boost::bind(&T2NClientConnection::newDataSlot, this) );
356 ptr->connectEof( boost::bind(&T2NClientConnection::eofSlot, this) );
358 } // eo T2NClientConnection::T2NClientConnection(IOImplementationPtr)
361 T2NClientConnection::~T2NClientConnection()
364 } // eo T2NClientConnection::~T2NClientConnection
368 * @brief returns if the connection is open.
369 * @return @a true if the connection is open.
371 bool T2NClientConnection::isOpen()
373 return m_real_connection and m_real_connection->opened();
374 } // eo T2NClientConnection::isOpen()
378 * @brief try to reopen a connection.
379 * @return @a true if the connection was reopened.
381 bool T2NClientConnection::reopen(bool force)
383 if (not m_real_connection)
387 boost::shared_ptr< T2nUnixIOSocket > t2n_socket=
388 boost::dynamic_pointer_cast< T2nUnixIOSocket >(m_real_connection);
391 return t2n_socket->reopen(force);
394 } // eo T2NClientConnection::reopen()
398 * @brief closes the connection.
400 * This closes the underlying IO connection and calls libt2n::server_connection::close() to
401 * mark the connection as closed for libt2n.
403 void T2NClientConnection::close()
406 if (m_real_connection)
408 m_real_connection->close();
409 m_real_connection.reset();
411 libt2n::client_connection::close();
412 } // eo T2NClientConnection::close()
416 * @brief sends a raw data chunk on the connection.
418 * @param data the (raw) data chunk which should be sended.
420 void T2NClientConnection::real_write(const std::string& data)
425 module_logger().warning(HERE) << "attempt to write data on closed connection";
428 IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection);
431 // should never happen...
432 module_logger().error(HERE)<< "illegal io pointer";
434 //TODO: throw an error?!
439 } // eo T2NClientConnection::real_write(const std::string)
443 * @brief called to fill the connection buffer.
445 * Since this class uses the asnychronous AsyncIo framework, new data may already be read when
446 * this method is called.
448 * @param usec_timeout
449 * @param usec_timeout_remaining
450 * @return @a true if new data is available.
452 bool T2NClientConnection::fill_buffer(long long usec_timeout,long long* usec_timeout_remaining)
457 module_logger().debug(HERE) << "fill_buffer() called on closed connection";
460 AsyncIo::MilliTime t0,t1;
461 AsyncIo::get_current_monotonic_time(t0);
464 IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection);
467 module_logger().error(HERE) << "illegal io pointer";
471 // try to fetch data (call the backend)
478 else if (usec_timeout > 0)
480 long long msec_timeout= (usec_timeout + 500)/1000;
482 if (msec_timeout >= INT_MAX)
488 timeout= (int)msec_timeout;
491 Backend::getBackend()->doOneStep( timeout );
493 AsyncIo::get_current_monotonic_time(t1);
494 if (usec_timeout_remaining)
496 long long delta= ((long long)(t1 - t0).get_milliseconds())* 1000L;
497 *usec_timeout_remaining= (usec_timeout > delta ) ? (usec_timeout - delta) : 0L;
498 module_logger().debug() << "timeout: " << usec_timeout << " -> " << *usec_timeout_remaining;
502 m_got_new_data= false;
506 } // eo T2NClientConnection::fill_buffer(long long,long long*)
510 * @brief called when new data arrived on this connection.
512 * reads the new data from the underlying IO object and stores it in the connection buffer.
513 * Also remembers (in the bool member var @a m_got_new_data) that new data was received.
515 void T2NClientConnection::newDataSlot()
518 IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection);
521 //TODO: throw an error?!
525 std::string new_data= ptr->receiveData();
526 module_logger().debug() << "got " << new_data.size() << " bytes of new data";
528 m_got_new_data= true;
529 } // eo T2NClientConnection::newDataSlot()
533 * @brief called when an EOF was detected by the underlying IO object (i.e. the connection
534 * was closed by the peer side).
538 void T2NClientConnection::eofSlot()
542 } // eo T2NClientConnection::eofSlot()
546 ** implementation of T2NServerConnection
550 T2NServerConnection::T2NServerConnection(
551 T2NServerBasePtr server,
552 IOImplementationPtr connection,
555 : libt2n::server_connection(timeout)
556 , m_real_connection(connection)
557 , m_server_weak_ptr(server)
558 , m_got_new_data(false)
561 IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(connection);
564 module_logger().error(HERE) << "illegal pointer passed";
568 if (!connection->opened())
570 module_logger().warning(HERE) << "connection not open, either failed or already closed";
575 ptr->connectRead( boost::bind(&T2NServerConnection::newDataSlot, this) );
576 ptr->connectEof( boost::bind(&T2NServerConnection::eofSlot, this) );
578 } // eo T2NServerConnection::T2NServerConnection(IOImplementationPtr)
581 T2NServerConnection::~T2NServerConnection()
584 } // eo T2NServerConnection::~T2NServerConnection
588 * @brief closes the connection.
590 * This closes the underlying IO connection and calls libt2n::server_connection::close() to
591 * mark the connection as closed for libt2n.
593 void T2NServerConnection::close()
596 if (m_real_connection)
598 m_real_connection->close();
599 m_real_connection.reset();
601 libt2n::server_connection::close();
602 } // eo T2NServerConnection::close()
606 * @brief sends a raw data chunk on the connection.
608 * @param data the (raw) data chunk which should be sended.
610 void T2NServerConnection::real_write(const std::string& data)
615 module_logger().warning(HERE) << "attempt to write data on closed connection";
618 IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection);
621 // should never happen...
622 module_logger().error(HERE)<< "illegal io pointer";
624 //TODO: throw an error?!
628 module_logger().debug() << "send " << data.size() << " bytes of data";
630 } // eo T2NServerConnection::real_write(const std::string)
634 * @brief called to fill the connection buffer.
636 * Since this class uses the asnychronous AsyncIo framework, new data may already be read when
637 * this method is called.
639 * @param wait determines if we need to wait; if @a false it is just checked if new data
640 * was received, but no backend cycle is executed.
641 * @param usec_timeout
642 * @param usec_timeout_remaining
643 * @return @a true if new data is available.
645 bool T2NServerConnection::low_fill_buffer(bool wait, long long usec_timeout, long long* usec_timeout_remaining)
650 module_logger().debug(HERE) << "fill_buffer() called on closed connection";
653 if (not m_got_new_data and wait)
655 AsyncIo::MilliTime t0,t1;
656 AsyncIo::get_current_monotonic_time(t0);
657 IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection);
660 module_logger().error(HERE) << "illegal io pointer";
664 // try to fetch data (call the backend)
671 else if (usec_timeout > 0)
673 long long msec_timeout= (usec_timeout + 500)/1000;
675 if (msec_timeout >= INT_MAX)
681 timeout= (int)msec_timeout;
684 Backend::getBackend()->doOneStep( timeout );
685 AsyncIo::get_current_monotonic_time(t1);
686 if (usec_timeout_remaining)
688 long long delta= ((long long)(t1 - t0).get_milliseconds())* 1000L;
689 *usec_timeout_remaining= (usec_timeout > delta ) ? (usec_timeout - delta) : 0L;
694 if (usec_timeout_remaining)
696 *usec_timeout_remaining= usec_timeout;
701 m_got_new_data= false;
705 } // eo T2NServerConnection::low_fill_buffer(bool,long long,long long*)
709 * @brief called to fill the connection buffer.
711 * Since this class uses the asnychronous AsyncIo framework, new data may already be read when
712 * this method is called.
714 * @param usec_timeout
715 * @param usec_timeout_remaining
716 * @return @a true if new data is available.
718 bool T2NServerConnection::fill_buffer(long long usec_timeout,long long* usec_timeout_remaining)
720 return low_fill_buffer(true, usec_timeout, usec_timeout_remaining);
721 } // eo T2NServerConnection::fill_buffer(long long,long long*)
725 * @brief called when new data arrived on this connection.
727 * reads the new data from the underlying IO object and stores it in the connection buffer.
728 * Also remembers (in the bool member var @a m_got_new_data that new data was received.
730 void T2NServerConnection::newDataSlot()
733 IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection);
736 //TODO:throw an error?!
740 std::string new_data= ptr->receiveData();
742 module_logger().debug() << "got " << new_data.size() << " bytes of new data";
743 m_got_new_data= true;
746 T2NServerBasePtr server =m_server_weak_ptr.lock();
749 server->m_signal_client_got_new_data();
751 } // eo T2NServerConnection::newDataSlot()
755 * @brief called when an EOF was detected by the underlying IO object (i.e. the connection
756 * was closed by the peer side).
760 void T2NServerConnection::eofSlot()
764 } // eo T2NServerConnection::eofSlot()
769 ** implementation of T2NServerBase
774 * @brief constructs a libt2n server object.
776 * @param server_port shared pointer to a (AsyncIo) port server object which
777 * is used as underlying port handler.
779 T2NServerBase::T2NServerBase( ServerSocketBaseImplementationPtr server_port)
780 : m_server_port(server_port)
781 , m_new_data_available(false)
784 // register our callback for new incoming conncetions.
785 server_port->setNewConnectionBaseCallback(
786 boost::bind(&T2NServerBase::newConnectionSlot, this, _1)
788 m_signal_client_got_new_data.connect
790 boost::bind(&T2NServerBase::clientGotNewDataSlot, this)
792 } // eo T2NServerBase::T2NServerBase(ServerSocketBaseImplementationPtr)
799 T2NServerBase::~T2NServerBase()
802 } // eo T2NServerBase::~T2NServerBase()
806 * @brief returns wether the server port is opened.
808 * @return @a true iff the server port is open.
810 bool T2NServerBase::isOpen()
812 return (m_server_port && m_server_port->opened());
813 } // eo T2NServerBase
818 * @brief callback for the port server object when a new connection is established.
820 * @param io_ptr the (shared) pointer to the new connection.
822 void T2NServerBase::newConnectionSlot(IOImplementationPtr io_ptr)
825 add_connection( new T2NServerConnection( get_ptr_as< T2NServerBase >(), io_ptr, get_default_timeout() ) );
826 } // eo T2NServerBase::newConnectionSlot(IOImplementationPtr)
830 * @brief callback for "new data available" signal
832 void T2NServerBase::clientGotNewDataSlot()
834 m_new_data_available= true;
835 } // eo T2NServerBase::clientGotNewDataSlot()
839 * @brief try to fill the buffers of the managed connections.
841 * will be called by T2NServerBase::fill_buffer().
843 * @return @a true if at least one connection buffer has new data.
845 bool T2NServerBase::fill_connection_buffers()
848 Backend::getBackend()->doOneStep(0);
850 for(std::map<unsigned int, libt2n::server_connection*>::iterator it=connections.begin();
851 it != connections.end();
854 T2NServerConnection *conn = dynamic_cast<T2NServerConnection*>(it->second);
859 // react somehow if (it->second) is not NULL...
860 module_logger().error(HERE) << "illegal connection pointer";
865 if ( conn->low_fill_buffer(false, 0) )
871 } // eo T2NServerBase::fill_connection_buffers()
875 * @brief fills the connection buffers.
877 * Uses the AsyncIo Backend to wait for new data.
879 * @param usec_timeout the maximum time period to wait for new data (in microseconds).
880 * 0 returns immediately, -1 waits until some event occurred.
881 * @param timeout_remaining ignored!
882 * @return @a true if new data for at least one connection arrived.
884 * @note since this method uses the AsyncIo backend, the timeout will have only milli second
887 bool T2NServerBase::fill_buffer(long long usec_timeout, long long* timeout_remaining)
891 if (m_new_data_available)
893 // short cut if we already know that we have new data:
894 m_new_data_available= false;
904 else if (usec_timeout > 0)
906 long long msec_timeout= (usec_timeout + 500)/1000;
908 if (msec_timeout >= INT_MAX)
914 timeout= (int)msec_timeout;
917 // not really.. but it shouldn't be used either...
918 if (timeout_remaining) *timeout_remaining= 0L;
920 if (! fill_connection_buffers() && timeout>0)
922 bool had_activity= Backend::getBackend()->doOneStep( timeout );
923 return fill_connection_buffers();
926 } // to T2NServerBase::fill_buffer(long long,long long*)
931 ** creator functions:
936 * @brief creates a server object with unix domain server socket.
937 * @param path path of the unix domain socket.
938 * @param mode mode for the socket.
939 * @return shared pointer with the new server object; empty if none could be created..
941 T2NServerBasePtr createT2NUnixServerPort(const std::string& path, int mode)
944 boost::shared_ptr< T2NUnixServer > result( new T2NUnixServer(path,mode) );
945 if (!result->isOpen())
947 module_logger().error(HERE)
948 << "failed to open unix domain server socket on \"" << path << "\"";
951 } // eo createT2NUnixServerPort(const std::string&,int)
955 * @brief creates a client object connected to a server via unix daomain socket.
956 * @param path path of cthe unix domain socket.
957 * @return shared pointer with the new client object; empty if none could be created..
959 T2NClientConnectionPtr createT2NUnixClientConnection(const std::string& path)
961 typedef IOExportWrapper< AsyncIo::UnixIOSocket > MyIo;
962 typedef boost::shared_ptr< MyIo > MyIoPtr;
964 MyIoPtr connection( new MyIo(path) );
965 boost::shared_ptr< RealT2NClientConnection > result( new RealT2NClientConnection( connection ) );
966 if (not result->isOpen())
968 module_logger().error(HERE)
969 << "failed to open unix domain client socket on \"" << path << "\"";
970 return T2NClientConnectionPtr();
973 } // eo createT2NUnixClientConnection(const std::string&)
975 } // eo namespace AsyncIo