Migrate libasyncio from boost.signal to signals2 (#8756)
[libasyncio] / glue_t2n / asyncio_t2n.cpp
1 /*
2 The software in this package is distributed under the GNU General
3 Public License version 2 (with a special exception described below).
4
5 A copy of GNU General Public License (GPL) is included in this distribution,
6 in the file COPYING.GPL.
7
8 As a special exception, if other files instantiate templates or use macros
9 or inline functions from this file, or you compile this file and link it
10 with other works to produce a work based on this file, this file
11 does not by itself cause the resulting work to be covered
12 by the GNU General Public License.
13
14 However the source code for this file must still be made available
15 in accordance with section (3) of the GNU General Public License.
16
17 This exception does not invalidate any other reasons why a work based
18 on this file might be covered by the GNU General Public License.
19 */
20 /**
21  * @file
22  *
23  * @author Reinhard Pfau \<reinhard.pfau@intra2net.com\>
24  *
25  * @copyright &copy; Copyright 2008 by Intra2net AG
26  */
27 #include "asyncio_t2n.hpp"
28
29 #include <iostream>
30 #include <boost/type_traits/is_base_of.hpp>
31 #include <boost/static_assert.hpp>
32 #include <boost/signals2.hpp>
33 #include <climits>
34 #include <logfunc.hpp>
35 #include <tracefunc.hpp>
36 #include <log_macros.hpp>
37
38
39 namespace AsyncIo
40 {
41
42 using namespace Utils;
43
44 namespace
45 {
46
47
48 Logger::PartLogger& module_logger()
49 {
50     static Logger::PartLogger _module_logger(HERE);
51     return _module_logger;
52 } // eo module_logger();
53
54
55
56 /**
57  * @brief a class with some methods we like to have on our io classes.
58  *
59  * This class is to be used as second base in a wrapper class and needs it's methods to
60  * be redefined for "the real thing".
61  */
62 class IOExportWrapperBase
63 {
64     public:
65
66         IOExportWrapperBase()
67         {
68         }
69
70         virtual ~IOExportWrapperBase()
71         {
72         }
73
74         virtual void sendData(const std::string& data)
75         {
76         }
77
78
79         virtual std::string receiveData()
80         {
81             return std::string();
82         }
83
84         virtual boost::signals2::connection connectEof( const boost::function< void() >& func )
85         {
86             return boost::signals2::connection();
87         }
88
89         virtual boost::signals2::connection connectRead( const boost::function< void() >& func )
90         {
91             return boost::signals2::connection();
92         }
93
94 }; // eo class IOExportWrapperBase
95
96
97 typedef boost::shared_ptr< IOExportWrapperBase > IOExportWrapperBasePtr;
98
99
100 /**
101  * @brief IO wrapper template.
102  * @tparam IOClass a type based on AsyncIo::IOImplementation
103  *
104  * The type is used as a public base for the resulting class; the second public base is our
105  * helper with the additional methods we need internally and which we (finally) define here.
106  */
107 template<
108     class IOClass
109 >
110 class IOExportWrapper
111 : public IOClass
112 , public IOExportWrapperBase
113 {
114         BOOST_STATIC_ASSERT(( boost::is_base_of< IOImplementation,IOClass >::value ));
115
116     public:
117         IOExportWrapper()
118         {
119         }
120
121         template<
122             typename Arg1
123         >
124         IOExportWrapper(Arg1 arg1)
125         : IOClass(arg1)
126         {}
127
128
129         template<
130             typename Arg1, typename Arg2
131         >
132         IOExportWrapper(Arg1 arg1, Arg2 arg2)
133         : IOClass(arg1,arg2)
134         {}
135
136
137         template<
138             typename Arg1, typename Arg2, typename Arg3
139         >
140         IOExportWrapper(Arg1 arg1, Arg2 arg2, Arg3 arg3)
141         : IOClass(arg1,arg2,arg3)
142         {}
143
144
145         template<
146             typename Arg1, typename Arg2, typename Arg3, typename Arg4
147         >
148         IOExportWrapper(Arg1 arg1, Arg2 arg2, Arg3 arg3, Arg4 arg4)
149         : IOClass(arg1,arg2,arg3,arg4)
150         {}
151
152
153         template<
154             typename Arg1, typename Arg2, typename Arg3, typename Arg4, typename Arg5
155         >
156         IOExportWrapper(Arg1 arg1, Arg2 arg2, Arg3 arg3, Arg4 arg4, Arg5 arg5)
157         : IOClass(arg1,arg2,arg3,arg4,arg5)
158         {}
159
160
161         /**
162          * @brief exposed funtion for sending data.
163          * @param data the chunk to be send.
164          */
165         virtual void sendData(const std::string& data)
166         {
167             IOClass::lowSend(data);
168         }
169
170         /**
171          * @brief returns the new received data.
172          * @return the receievd data.
173          *
174          * Clears the receive buffer.
175          */
176         virtual std::string receiveData()
177         {
178             std::string result;
179             result.swap(IOClass::m_input_buffer);
180             return result;
181         }
182
183         /**
184          * @brief exposed connect to EOF signal.
185          * @param func the function which should be connected to the eof signal.
186          * @return signal connection handle.
187          */
188         virtual boost::signals2::connection connectEof( const boost::function< void() >& func )
189         {
190             return IOClass::m_signal_eof.connect(func);
191         }
192
193         /**
194          * @brief exposed connect to "read" signal.
195          * @param func the function which should be connected to the "read" signal.
196          * @return signal connection handle.
197          */
198         virtual boost::signals2::connection connectRead( const boost::function< void() >& func )
199         {
200             return IOClass::m_signal_read.connect(func);
201         }
202
203     protected:
204
205 }; // eo class IOExportWrapper
206
207
208 /*
209 ** specialized versions of io classes:
210 */
211
212 /**
213  * @brief enhanced unix domain socket class with reconnect feature.
214  *
215  * Used for t2n client connections.
216  */
217 class T2nUnixIOSocket
218 : public AsyncIo::UnixIOSocket
219 {
220         typedef AsyncIo::UnixIOSocket inherited;
221     public:
222         T2nUnixIOSocket( const std::string& path );
223
224         virtual void close(AsyncIo::Direction direction = AsyncIo::Direction::both);
225
226         bool reopen(bool force= false);
227
228     protected:
229
230         virtual void doRead();
231
232
233     protected:
234
235         bool m_in_do_read;
236         bool m_may_reconnect;
237
238 }; // T2nUnixIOSocket
239
240
241 T2nUnixIOSocket::T2nUnixIOSocket(const std::string& path)
242 : inherited(path)
243 , m_in_do_read(false)
244 , m_may_reconnect(false)
245 {
246 } // eo T2nUnixIOSocket::T2nUnixIOSocket(const std::string&)
247
248
249 void T2nUnixIOSocket::close(AsyncIo::Direction direction)
250 {
251     bool was_open= opened();
252     inherited::close(direction);
253     if (m_in_do_read and not opened())
254     {
255         m_may_reconnect= was_open;
256     }
257 } // eo T2nUnixIOSocket::close(AsyncIo::Direction)
258
259
260 bool T2nUnixIOSocket::reopen(bool force)
261 {
262     if (m_path.empty())
263     {
264         return false;
265     }
266     if (m_may_reconnect || force)
267     {
268         return inherited::open( m_path );
269     }
270     return false;
271 } // eo T2nUnixIOSocket::reopen()
272
273
274 void T2nUnixIOSocket::doRead()
275 {
276     m_in_do_read= true;
277     try
278     {
279         inherited::doRead();
280     }
281     catch (...)
282     {
283         m_in_do_read= false;
284         throw;
285     }
286     m_in_do_read= false;
287 } // eo T2nUnixIOSocket::doRead()
288
289
290 /**
291  * @brief server class for libt2n using unix domain sockets.
292  *
293  * well, it's enough to provide an appropriate constructor.
294  * (did i mention that templates are really cool stuff? :-) )
295  */
296 class T2NUnixServer
297 : public T2NServerBase
298 {
299     public:
300
301         T2NUnixServer(const std::string& path, int mode=0600)
302         : T2NServerBase( ServerSocketBaseImplementationPtr(
303             new UnixServerSocket<
304                 IOExportWrapper< UnixIOSocket >
305             >(path, mode)
306         ) )
307         {
308         } // eo T2NServerBase
309
310 }; // eo T2NUnixServer
311
312
313
314 class RealT2NClientConnection
315 : public T2NClientConnection
316 {
317     public:
318         RealT2NClientConnection( AsyncIo::IOImplementationPtr connection )
319         : T2NClientConnection(connection)
320         {
321         }
322 }; // eo class T2NClient
323
324 } // eo namespace <anonymous>
325
326
327
328 /*
329 ** implementation of T2NClientConnection
330 */
331
332
333 T2NClientConnection::T2NClientConnection(
334     IOImplementationPtr connection
335 )
336 : libt2n::client_connection()
337 , m_real_connection(connection)
338 , m_got_new_data(false)
339 {
340     SCOPETRACKER();
341     IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(connection);
342     if (!ptr)
343     {
344         module_logger().error(HERE) << "illegal pointer passed";
345         close();
346         return;
347     }
348     if (!connection->opened())
349     {
350         module_logger().warning(HERE) << "connection not open, either failed or already closed";
351         close();
352         return;
353     }
354
355     ptr->connectRead( boost::bind(&T2NClientConnection::newDataSlot, this) );
356     ptr->connectEof( boost::bind(&T2NClientConnection::eofSlot, this) );
357
358 } // eo T2NClientConnection::T2NClientConnection(IOImplementationPtr)
359
360
361 T2NClientConnection::~T2NClientConnection()
362 {
363     SCOPETRACKER();
364 } // eo T2NClientConnection::~T2NClientConnection
365
366
367 /**
368  * @brief returns if the connection is open.
369  * @return @a true if the connection is open.
370  */
371 bool T2NClientConnection::isOpen()
372 {
373     return m_real_connection and m_real_connection->opened();
374 } // eo T2NClientConnection::isOpen()
375
376
377 /**
378  * @brief try to reopen a connection.
379  * @return @a true if the connection was reopened.
380  */
381 bool T2NClientConnection::reopen(bool force)
382 {
383     if (not m_real_connection)
384     {
385         return false;
386     }
387     boost::shared_ptr< T2nUnixIOSocket > t2n_socket=
388         boost::dynamic_pointer_cast< T2nUnixIOSocket >(m_real_connection);
389     if (t2n_socket)
390     {
391         return t2n_socket->reopen(force);
392     }
393     return false;
394 } // eo T2NClientConnection::reopen()
395
396
397 /**
398  * @brief closes the connection.
399  *
400  * This closes the underlying IO connection and calls libt2n::server_connection::close() to
401  * mark the connection as closed for libt2n.
402  */
403 void T2NClientConnection::close()
404 {
405     SCOPETRACKER();
406     if (m_real_connection)
407     {
408         m_real_connection->close();
409         m_real_connection.reset();
410     }
411     libt2n::client_connection::close();
412 } // eo T2NClientConnection::close()
413
414
415 /**
416  * @brief sends a raw data chunk on the connection.
417  *
418  * @param data the (raw) data chunk which should be sended.
419  */
420 void T2NClientConnection::real_write(const std::string& data)
421 {
422     SCOPETRACKER();
423     if (is_closed())
424     {
425         module_logger().warning(HERE) << "attempt to write data on closed connection";
426         return;
427     }
428     IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection);
429     if (!ptr)
430     {
431         // should never happen...
432         module_logger().error(HERE)<< "illegal io pointer";
433         close();
434         //TODO: throw an error?!
435         NOT_REACHED();
436         return;
437     }
438     ptr->sendData(data);
439 } // eo T2NClientConnection::real_write(const std::string)
440
441
442 /**
443  * @brief called to fill the connection buffer.
444  *
445  * Since this class uses the asnychronous AsyncIo framework, new data may already be read when
446  * this method is called.
447  *
448  * @param usec_timeout 
449  * @param usec_timeout_remaining 
450  * @return @a true if new data is available.
451  */
452 bool T2NClientConnection::fill_buffer(long long usec_timeout,long long* usec_timeout_remaining)
453 {
454     SCOPETRACKER();
455     if (is_closed())
456     {
457         module_logger().debug(HERE) << "fill_buffer() called on closed connection";
458         return false;
459     }
460     AsyncIo::MilliTime t0,t1;
461     AsyncIo::get_current_monotonic_time(t0);
462     if (!m_got_new_data)
463     {
464         IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection);
465         if (!ptr)
466         {
467             module_logger().error(HERE) << "illegal io pointer";
468             close();
469             return false;
470         }
471         // try to fetch data (call the backend)
472         int timeout= 0;
473
474         if (usec_timeout<0)
475         {
476             timeout= -1;
477         }
478         else if (usec_timeout > 0)
479         {
480             long long msec_timeout= (usec_timeout + 500)/1000;
481
482             if (msec_timeout >= INT_MAX)
483             {
484                 timeout= INT_MAX;
485             }
486             else
487             {
488                 timeout= (int)msec_timeout;
489             }
490         }
491         Backend::getBackend()->doOneStep( timeout );
492     }
493     AsyncIo::get_current_monotonic_time(t1);
494     if (usec_timeout_remaining)
495     {
496         long long delta= ((long long)(t1 - t0).get_milliseconds())* 1000L;
497         *usec_timeout_remaining= (usec_timeout > delta ) ? (usec_timeout - delta) : 0L;
498         module_logger().debug() << "timeout: " << usec_timeout << " -> " << *usec_timeout_remaining;
499     }
500     if (m_got_new_data)
501     {
502         m_got_new_data= false;
503         return true;
504     }
505     return false;
506 } // eo T2NClientConnection::fill_buffer(long long,long long*)
507
508
509 /**
510  * @brief called when new data arrived on this connection.
511  *
512  * reads the new data from the underlying IO object and stores it in the connection buffer.
513  * Also remembers (in the bool member var @a m_got_new_data) that new data was received.
514  */
515 void T2NClientConnection::newDataSlot()
516 {
517     SCOPETRACKER();
518     IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection);
519     if (!ptr)
520     {
521         //TODO: throw an error?!
522         NOT_REACHED();
523         return;
524     }
525     std::string new_data= ptr->receiveData();
526     module_logger().debug() << "got " << new_data.size() << " bytes of new data";
527     buffer+= new_data;
528     m_got_new_data= true;
529 } // eo T2NClientConnection::newDataSlot()
530
531
532 /**
533  * @brief called when an EOF was detected by the underlying IO object (i.e. the connection
534  * was closed by the peer side).
535  *
536  * Calls close().
537  */
538 void T2NClientConnection::eofSlot()
539 {
540     SCOPETRACKER();
541     close();
542 } // eo T2NClientConnection::eofSlot()
543
544
545 /*
546 ** implementation of T2NServerConnection
547 */
548
549
550 T2NServerConnection::T2NServerConnection(
551     T2NServerBasePtr server,
552     IOImplementationPtr connection,
553     int timeout
554 )
555 : libt2n::server_connection(timeout)
556 , m_real_connection(connection)
557 , m_server_weak_ptr(server)
558 , m_got_new_data(false)
559 {
560     SCOPETRACKER();
561     IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(connection);
562     if (!ptr)
563     {
564         module_logger().error(HERE) << "illegal pointer passed";
565         close();
566         return;
567     }
568     if (!connection->opened())
569     {
570         module_logger().warning(HERE) << "connection not open, either failed or already closed";
571         close();
572         return;
573     }
574
575     ptr->connectRead( boost::bind(&T2NServerConnection::newDataSlot, this) );
576     ptr->connectEof( boost::bind(&T2NServerConnection::eofSlot, this) );
577
578 } // eo T2NServerConnection::T2NServerConnection(IOImplementationPtr)
579
580
581 T2NServerConnection::~T2NServerConnection()
582 {
583     SCOPETRACKER();
584 } // eo T2NServerConnection::~T2NServerConnection
585
586
587 /**
588  * @brief closes the connection.
589  *
590  * This closes the underlying IO connection and calls libt2n::server_connection::close() to
591  * mark the connection as closed for libt2n.
592  */
593 void T2NServerConnection::close()
594 {
595     SCOPETRACKER();
596     if (m_real_connection)
597     {
598         m_real_connection->close();
599         m_real_connection.reset();
600     }
601     libt2n::server_connection::close();
602 } // eo T2NServerConnection::close()
603
604
605 /**
606  * @brief sends a raw data chunk on the connection.
607  *
608  * @param data the (raw) data chunk which should be sended.
609  */
610 void T2NServerConnection::real_write(const std::string& data)
611 {
612     SCOPETRACKER();
613     if (is_closed())
614     {
615         module_logger().warning(HERE) << "attempt to write data on closed connection";
616         return;
617     }
618     IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection);
619     if (!ptr)
620     {
621         // should never happen...
622         module_logger().error(HERE)<< "illegal io pointer";
623         close();
624         //TODO: throw an error?!
625         NOT_REACHED();
626         return;
627     }
628     module_logger().debug() << "send " << data.size() << " bytes of data";
629     ptr->sendData(data);
630 } // eo T2NServerConnection::real_write(const std::string)
631
632
633 /**
634  * @brief called to fill the connection buffer.
635  *
636  * Since this class uses the asnychronous AsyncIo framework, new data may already be read when
637  * this method is called.
638  *
639  * @param wait determines if we need to wait; if @a false it is just checked if new data
640  *  was received, but no backend cycle is executed.
641  * @param usec_timeout 
642  * @param usec_timeout_remaining 
643  * @return @a true if new data is available.
644  */
645 bool T2NServerConnection::low_fill_buffer(bool wait, long long usec_timeout, long long* usec_timeout_remaining)
646 {
647     SCOPETRACKER();
648     if (is_closed())
649     {
650         module_logger().debug(HERE) << "fill_buffer() called on closed connection";
651         return false;
652     }
653     if (not m_got_new_data and wait)
654     {
655         AsyncIo::MilliTime t0,t1;
656         AsyncIo::get_current_monotonic_time(t0);
657         IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection);
658         if (!ptr)
659         {
660             module_logger().error(HERE) << "illegal io pointer";
661             close();
662             return false;
663         }
664         // try to fetch data (call the backend)
665         int timeout= 0;
666
667         if (usec_timeout<0)
668         {
669             timeout= -1;
670         }
671         else if (usec_timeout > 0)
672         {
673             long long msec_timeout= (usec_timeout + 500)/1000;
674
675             if (msec_timeout >= INT_MAX)
676             {
677                 timeout= INT_MAX;
678             }
679             else
680             {
681                 timeout= (int)msec_timeout;
682             }
683         }
684         Backend::getBackend()->doOneStep( timeout );
685         AsyncIo::get_current_monotonic_time(t1);
686         if (usec_timeout_remaining)
687         {
688             long long delta= ((long long)(t1 - t0).get_milliseconds())* 1000L;
689             *usec_timeout_remaining= (usec_timeout > delta ) ? (usec_timeout - delta) : 0L;
690         }
691     }
692     else
693     {
694         if (usec_timeout_remaining)
695         {
696             *usec_timeout_remaining= usec_timeout;
697         }
698     }
699     if (m_got_new_data)
700     {
701         m_got_new_data= false;
702         return true;
703     }
704     return false;
705 } // eo T2NServerConnection::low_fill_buffer(bool,long long,long long*)
706
707
708 /**
709  * @brief called to fill the connection buffer.
710  *
711  * Since this class uses the asnychronous AsyncIo framework, new data may already be read when
712  * this method is called.
713  *
714  * @param usec_timeout 
715  * @param usec_timeout_remaining 
716  * @return @a true if new data is available.
717  */
718 bool T2NServerConnection::fill_buffer(long long usec_timeout,long long* usec_timeout_remaining)
719 {
720     return low_fill_buffer(true, usec_timeout, usec_timeout_remaining);
721 } // eo T2NServerConnection::fill_buffer(long long,long long*)
722
723
724 /**
725  * @brief called when new data arrived on this connection.
726  *
727  * reads the new data from the underlying IO object and stores it in the connection buffer.
728  * Also remembers (in the bool member var @a m_got_new_data that new data was received.
729  */
730 void T2NServerConnection::newDataSlot()
731 {
732     SCOPETRACKER();
733     IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection);
734     if (!ptr)
735     {
736         //TODO:throw an error?!
737         NOT_REACHED();
738         return;
739     }
740     std::string new_data= ptr->receiveData();
741     buffer+= new_data;
742     module_logger().debug() << "got " << new_data.size() << " bytes of new data";
743     m_got_new_data= true;
744     reset_timeout();
745
746     T2NServerBasePtr server =m_server_weak_ptr.lock();
747     if (server)
748     {
749         server->m_signal_client_got_new_data();
750     }
751 } // eo T2NServerConnection::newDataSlot()
752
753
754 /**
755  * @brief called when an EOF was detected by the underlying IO object (i.e. the connection
756  * was closed by the peer side).
757  *
758  * Calls close().
759  */
760 void T2NServerConnection::eofSlot()
761 {
762     SCOPETRACKER();
763     close();
764 } // eo T2NServerConnection::eofSlot()
765
766
767
768 /*
769 ** implementation of T2NServerBase
770 */
771
772
773 /**
774  * @brief constructs a libt2n server object.
775  *
776  * @param server_port shared pointer to a (AsyncIo) port server object which
777  * is used as underlying port handler.
778  */
779 T2NServerBase::T2NServerBase( ServerSocketBaseImplementationPtr server_port)
780 : m_server_port(server_port)
781 , m_new_data_available(false)
782 {
783     SCOPETRACKER();
784     // register our callback for new incoming conncetions.
785     server_port->setNewConnectionBaseCallback(
786         boost::bind(&T2NServerBase::newConnectionSlot, this, _1)
787     );
788     m_signal_client_got_new_data.connect
789     (
790         boost::bind(&T2NServerBase::clientGotNewDataSlot, this)
791     );
792 } // eo T2NServerBase::T2NServerBase(ServerSocketBaseImplementationPtr)
793
794
795 /**
796  * @brief destructor.
797  *
798  */
799 T2NServerBase::~T2NServerBase()
800 {
801     SCOPETRACKER();
802 } // eo T2NServerBase::~T2NServerBase()
803
804
805 /**
806  * @brief returns wether the server port is opened.
807  *
808  * @return @a true iff the server port is open.
809  */
810 bool T2NServerBase::isOpen()
811 {
812     return (m_server_port && m_server_port->opened());
813 } // eo T2NServerBase
814
815
816
817 /**
818  * @brief callback for the port server object when a new connection is established.
819  *
820  * @param io_ptr the (shared) pointer to the new connection.
821  */
822 void T2NServerBase::newConnectionSlot(IOImplementationPtr io_ptr)
823 {
824     SCOPETRACKER();
825     add_connection( new T2NServerConnection( get_ptr_as< T2NServerBase >(), io_ptr, get_default_timeout() ) );
826 } // eo T2NServerBase::newConnectionSlot(IOImplementationPtr)
827
828
829 /**
830  * @brief callback for "new data available" signal
831  */
832 void T2NServerBase::clientGotNewDataSlot()
833 {
834     m_new_data_available= true;
835 } // eo T2NServerBase::clientGotNewDataSlot()
836
837
838 /**
839  * @brief try to fill the buffers of the managed connections.
840  *
841  * will be called by T2NServerBase::fill_buffer().
842  *
843  * @return @a true if at least one connection buffer has new data.
844  */
845 bool T2NServerBase::fill_connection_buffers()
846 {
847     SCOPETRACKER();
848     Backend::getBackend()->doOneStep(0);
849     bool result= false;
850     for(std::map<unsigned int, libt2n::server_connection*>::iterator it=connections.begin();
851         it != connections.end();
852         ++it)
853     {
854         T2NServerConnection *conn = dynamic_cast<T2NServerConnection*>(it->second);
855         if (!conn)
856         {
857             if (it->second)
858             {
859                 // react somehow if (it->second) is not NULL...
860                 module_logger().error(HERE) << "illegal connection pointer";
861                 it->second->close();
862             }
863             continue;
864         }
865         if ( conn->low_fill_buffer(false, 0) )
866         {
867             result= true;
868         }
869     }
870     return result;
871 } // eo T2NServerBase::fill_connection_buffers()
872
873
874 /**
875  * @brief fills the connection buffers.
876  *
877  * Uses the AsyncIo Backend to wait for new data.
878  *
879  * @param usec_timeout the maximum time period to wait for new data (in microseconds). 
880  *  0 returns immediately, -1 waits until some event occurred.
881  * @param timeout_remaining ignored!
882  * @return @a true if new data for at least one connection arrived.
883  *
884  * @note since this method uses the AsyncIo backend, the timeout will have only milli second
885  * resolution.
886  */
887 bool T2NServerBase::fill_buffer(long long usec_timeout, long long* timeout_remaining)
888 {
889     SCOPETRACKER();
890
891     if (m_new_data_available)
892     {
893         // short cut if we already know that we have new data:
894         m_new_data_available= false;
895         return true;
896     }
897
898     int timeout= 0;
899
900     if (usec_timeout<0)
901     {
902         timeout= -1;
903     }
904     else if (usec_timeout > 0)
905     {
906         long long msec_timeout= (usec_timeout + 500)/1000;
907
908         if (msec_timeout >= INT_MAX)
909         {
910             timeout= INT_MAX;
911         }
912         else
913         {
914             timeout= (int)msec_timeout;
915         }
916     }
917     // not really.. but it shouldn't be used either...
918     if (timeout_remaining) *timeout_remaining= 0L;
919
920     if (! fill_connection_buffers() && timeout>0)
921     {
922         bool had_activity= Backend::getBackend()->doOneStep( timeout );
923         return fill_connection_buffers();
924     }
925     return true;
926 } // to T2NServerBase::fill_buffer(long long,long long*)
927
928
929
930 /*
931 ** creator functions:
932 */
933
934
935 /**
936  * @brief creates a server object with unix domain server socket.
937  * @param path path of the unix domain socket.
938  * @param mode mode for the socket.
939  * @return shared pointer with the new server object; empty if none could be created..
940  */
941 T2NServerBasePtr createT2NUnixServerPort(const std::string& path, int mode)
942 {
943     SCOPETRACKER();
944     boost::shared_ptr< T2NUnixServer > result( new T2NUnixServer(path,mode) );
945     if (!result->isOpen())
946     {
947         module_logger().error(HERE)
948             << "failed to open unix domain server socket on \"" << path << "\"";
949     }
950     return result;
951 } // eo createT2NUnixServerPort(const std::string&,int)
952
953
954 /**
955  * @brief creates a client object connected to a server via unix daomain socket.
956  * @param path path of cthe unix domain socket.
957  * @return shared pointer with the new client object; empty if none could be created..
958  */
959 T2NClientConnectionPtr createT2NUnixClientConnection(const std::string& path)
960 {
961     typedef IOExportWrapper< AsyncIo::UnixIOSocket > MyIo;
962     typedef boost::shared_ptr< MyIo > MyIoPtr;
963     SCOPETRACKER();
964     MyIoPtr connection( new MyIo(path) );
965     boost::shared_ptr< RealT2NClientConnection > result( new RealT2NClientConnection( connection ) );
966     if (not result->isOpen())
967     {
968         module_logger().error(HERE)
969             << "failed to open unix domain client socket on \"" << path << "\"";
970         return T2NClientConnectionPtr();
971     }
972     return result;
973 } // eo createT2NUnixClientConnection(const std::string&)
974
975 } // eo namespace AsyncIo