boost::shared_dynamic_cast is deprecated and removed in recent versions of boost...
[libasyncio] / glue_t2n / asyncio_t2n.cpp
CommitLineData
8c15b8c7
TJ
1/*
2The software in this package is distributed under the GNU General
3Public License version 2 (with a special exception described below).
4
5A copy of GNU General Public License (GPL) is included in this distribution,
6in the file COPYING.GPL.
7
8As a special exception, if other files instantiate templates or use macros
9or inline functions from this file, or you compile this file and link it
10with other works to produce a work based on this file, this file
11does not by itself cause the resulting work to be covered
12by the GNU General Public License.
13
14However the source code for this file must still be made available
15in accordance with section (3) of the GNU General Public License.
16
17This exception does not invalidate any other reasons why a work based
18on this file might be covered by the GNU General Public License.
19*/
42b7c46d
RP
20/**
21 * @file
22 *
23 * @author Reinhard Pfau \<reinhard.pfau@intra2net.com\>
24 *
25 * @copyright &copy; Copyright 2008 by Intra2net AG
42b7c46d 26 */
aba4c34d 27#include "asyncio_t2n.hpp"
42b7c46d
RP
28
29#include <iostream>
30#include <boost/type_traits/is_base_of.hpp>
31#include <boost/static_assert.hpp>
32#include <boost/signal.hpp>
33#include <climits>
34#include <logfunc.hpp>
35#include <tracefunc.hpp>
36#include <log_macros.hpp>
37
38
39namespace AsyncIo
40{
41
aba4c34d 42using namespace Utils;
42b7c46d
RP
43
44namespace
45{
46
47
48Logger::PartLogger& module_logger()
49{
50 static Logger::PartLogger _module_logger(HERE);
51 return _module_logger;
52} // eo module_logger();
53
54
55
56/**
57 * @brief a class with some methods we like to have on our io classes.
58 *
59 * This class is to be used as second base in a wrapper class and needs it's methods to
60 * be redefined for "the real thing".
61 */
62class IOExportWrapperBase
63{
64 public:
65
66 IOExportWrapperBase()
67 {
68 }
69
70 virtual ~IOExportWrapperBase()
71 {
72 }
73
74 virtual void sendData(const std::string& data)
75 {
76 }
77
78
79 virtual std::string receiveData()
80 {
81 return std::string();
82 }
83
84 virtual boost::signals::connection connectEof( const boost::function< void() >& func )
85 {
86 return boost::signals::connection();
87 }
88
89 virtual boost::signals::connection connectRead( const boost::function< void() >& func )
90 {
91 return boost::signals::connection();
92 }
93
94}; // eo class IOExportWrapperBase
95
96
97typedef boost::shared_ptr< IOExportWrapperBase > IOExportWrapperBasePtr;
98
99
100/**
101 * @brief IO wrapper template.
102 * @tparam IOClass a type based on AsyncIo::IOImplementation
103 *
104 * The type is used as a public base for the resulting class; the second public base is our
105 * helper with the additional methods we need internally and which we (finally) define here.
106 */
107template<
108 class IOClass
109>
110class IOExportWrapper
111: public IOClass
112, public IOExportWrapperBase
113{
114 BOOST_STATIC_ASSERT(( boost::is_base_of< IOImplementation,IOClass >::value ));
115
116 public:
117 IOExportWrapper()
118 {
119 }
120
121 template<
122 typename Arg1
123 >
124 IOExportWrapper(Arg1 arg1)
125 : IOClass(arg1)
126 {}
127
128
129 template<
130 typename Arg1, typename Arg2
131 >
132 IOExportWrapper(Arg1 arg1, Arg2 arg2)
133 : IOClass(arg1,arg2)
134 {}
135
136
137 template<
138 typename Arg1, typename Arg2, typename Arg3
139 >
140 IOExportWrapper(Arg1 arg1, Arg2 arg2, Arg3 arg3)
141 : IOClass(arg1,arg2,arg3)
142 {}
143
144
145 template<
146 typename Arg1, typename Arg2, typename Arg3, typename Arg4
147 >
148 IOExportWrapper(Arg1 arg1, Arg2 arg2, Arg3 arg3, Arg4 arg4)
149 : IOClass(arg1,arg2,arg3,arg4)
150 {}
151
152
153 template<
154 typename Arg1, typename Arg2, typename Arg3, typename Arg4, typename Arg5
155 >
156 IOExportWrapper(Arg1 arg1, Arg2 arg2, Arg3 arg3, Arg4 arg4, Arg5 arg5)
157 : IOClass(arg1,arg2,arg3,arg4,arg5)
158 {}
159
160
161 /**
162 * @brief exposed funtion for sending data.
163 * @param data the chunk to be send.
164 */
165 virtual void sendData(const std::string& data)
166 {
167 IOClass::lowSend(data);
168 }
169
170 /**
171 * @brief returns the new received data.
172 * @return the receievd data.
173 *
174 * Clears the receive buffer.
175 */
176 virtual std::string receiveData()
177 {
178 std::string result;
179 result.swap(IOClass::m_input_buffer);
180 return result;
181 }
182
183 /**
184 * @brief exposed connect to EOF signal.
185 * @param func the function which should be connected to the eof signal.
186 * @return signal connection handle.
187 */
188 virtual boost::signals::connection connectEof( const boost::function< void() >& func )
189 {
190 return IOClass::m_signal_eof.connect(func);
191 }
192
193 /**
194 * @brief exposed connect to "read" signal.
195 * @param func the function which should be connected to the "read" signal.
196 * @return signal connection handle.
197 */
198 virtual boost::signals::connection connectRead( const boost::function< void() >& func )
199 {
200 return IOClass::m_signal_read.connect(func);
201 }
202
203 protected:
204
205}; // eo class IOExportWrapper
206
207
208/*
209** specialized versions of io classes:
210*/
211
212/**
213 * @brief enhanced unix domain socket class with reconnect feature.
214 *
215 * Used for t2n client connections.
216 */
217class T2nUnixIOSocket
218: public AsyncIo::UnixIOSocket
219{
220 typedef AsyncIo::UnixIOSocket inherited;
221 public:
222 T2nUnixIOSocket( const std::string& path );
223
224 virtual void close(AsyncIo::Direction direction = AsyncIo::Direction::both);
225
226 bool reopen(bool force= false);
227
228 protected:
229
230 virtual void doRead();
231
232
233 protected:
234
235 bool m_in_do_read;
236 bool m_may_reconnect;
237
238}; // T2nUnixIOSocket
239
240
241T2nUnixIOSocket::T2nUnixIOSocket(const std::string& path)
242: inherited(path)
243, m_in_do_read(false)
244, m_may_reconnect(false)
245{
246} // eo T2nUnixIOSocket::T2nUnixIOSocket(const std::string&)
247
248
249void T2nUnixIOSocket::close(AsyncIo::Direction direction)
250{
251 bool was_open= opened();
252 inherited::close(direction);
253 if (m_in_do_read and not opened())
254 {
255 m_may_reconnect= was_open;
256 }
257} // eo T2nUnixIOSocket::close(AsyncIo::Direction)
258
259
260bool T2nUnixIOSocket::reopen(bool force)
261{
262 if (m_path.empty())
263 {
264 return false;
265 }
266 if (m_may_reconnect || force)
267 {
268 return inherited::open( m_path );
269 }
270 return false;
271} // eo T2nUnixIOSocket::reopen()
272
273
274void T2nUnixIOSocket::doRead()
275{
276 m_in_do_read= true;
277 try
278 {
279 inherited::doRead();
280 }
281 catch (...)
282 {
283 m_in_do_read= false;
284 throw;
285 }
286 m_in_do_read= false;
287} // eo T2nUnixIOSocket::doRead()
288
289
290/**
291 * @brief server class for libt2n using unix domain sockets.
292 *
293 * well, it's enough to provide an appropriate constructor.
294 * (did i mention that templates are really cool stuff? :-) )
295 */
296class T2NUnixServer
297: public T2NServerBase
298{
299 public:
300
301 T2NUnixServer(const std::string& path, int mode=0600)
302 : T2NServerBase( ServerSocketBaseImplementationPtr(
303 new UnixServerSocket<
304 IOExportWrapper< UnixIOSocket >
305 >(path, mode)
306 ) )
307 {
308 } // eo T2NServerBase
309
310}; // eo T2NUnixServer
311
312
313
314class RealT2NClientConnection
315: public T2NClientConnection
316{
317 public:
318 RealT2NClientConnection( AsyncIo::IOImplementationPtr connection )
319 : T2NClientConnection(connection)
320 {
321 }
322}; // eo class T2NClient
323
324} // eo namespace <anonymous>
325
326
327
328/*
329** implementation of T2NClientConnection
330*/
331
332
333T2NClientConnection::T2NClientConnection(
334 IOImplementationPtr connection
335)
336: libt2n::client_connection()
337, m_real_connection(connection)
338, m_got_new_data(false)
339{
340 SCOPETRACKER();
341 IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(connection);
342 if (!ptr)
343 {
344 module_logger().error(HERE) << "illegal pointer passed";
345 close();
346 return;
347 }
348 if (!connection->opened())
349 {
350 module_logger().warning(HERE) << "connection not open, either failed or already closed";
351 close();
352 return;
353 }
354
355 ptr->connectRead( boost::bind(&T2NClientConnection::newDataSlot, this) );
356 ptr->connectEof( boost::bind(&T2NClientConnection::eofSlot, this) );
357
358} // eo T2NClientConnection::T2NClientConnection(IOImplementationPtr)
359
360
361T2NClientConnection::~T2NClientConnection()
362{
363 SCOPETRACKER();
364} // eo T2NClientConnection::~T2NClientConnection
365
366
367/**
368 * @brief returns if the connection is open.
369 * @return @a true if the connection is open.
370 */
371bool T2NClientConnection::isOpen()
372{
373 return m_real_connection and m_real_connection->opened();
374} // eo T2NClientConnection::isOpen()
375
376
377/**
378 * @brief try to reopen a connection.
379 * @return @a true if the connection was reopened.
380 */
381bool T2NClientConnection::reopen(bool force)
382{
383 if (not m_real_connection)
384 {
385 return false;
386 }
387 boost::shared_ptr< T2nUnixIOSocket > t2n_socket=
d0290d78 388 boost::dynamic_pointer_cast< T2nUnixIOSocket >(m_real_connection);
42b7c46d
RP
389 if (t2n_socket)
390 {
391 return t2n_socket->reopen(force);
392 }
393 return false;
394} // eo T2NClientConnection::reopen()
395
396
397/**
398 * @brief closes the connection.
399 *
400 * This closes the underlying IO connection and calls libt2n::server_connection::close() to
401 * mark the connection as closed for libt2n.
402 */
403void T2NClientConnection::close()
404{
405 SCOPETRACKER();
406 if (m_real_connection)
407 {
408 m_real_connection->close();
409 m_real_connection.reset();
410 }
411 libt2n::client_connection::close();
412} // eo T2NClientConnection::close()
413
414
415/**
416 * @brief sends a raw data chunk on the connection.
417 *
418 * @param data the (raw) data chunk which should be sended.
419 */
420void T2NClientConnection::real_write(const std::string& data)
421{
422 SCOPETRACKER();
423 if (is_closed())
424 {
425 module_logger().warning(HERE) << "attempt to write data on closed connection";
426 return;
427 }
428 IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection);
429 if (!ptr)
430 {
431 // should never happen...
432 module_logger().error(HERE)<< "illegal io pointer";
433 close();
434 //TODO: throw an error?!
435 NOT_REACHED();
436 return;
437 }
438 ptr->sendData(data);
439} // eo T2NClientConnection::real_write(const std::string)
440
441
442/**
443 * @brief called to fill the connection buffer.
444 *
445 * Since this class uses the asnychronous AsyncIo framework, new data may already be read when
446 * this method is called.
447 *
448 * @param usec_timeout
449 * @param usec_timeout_remaining
450 * @return @a true if new data is available.
451 */
452bool T2NClientConnection::fill_buffer(long long usec_timeout,long long* usec_timeout_remaining)
453{
454 SCOPETRACKER();
455 if (is_closed())
456 {
457 module_logger().debug(HERE) << "fill_buffer() called on closed connection";
458 return false;
459 }
460 AsyncIo::MilliTime t0,t1;
461 AsyncIo::get_current_monotonic_time(t0);
462 if (!m_got_new_data)
463 {
464 IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection);
465 if (!ptr)
466 {
467 module_logger().error(HERE) << "illegal io pointer";
468 close();
469 return false;
470 }
471 // try to fetch data (call the backend)
472 int timeout= 0;
473
474 if (usec_timeout<0)
475 {
476 timeout= -1;
477 }
478 else if (usec_timeout > 0)
479 {
480 long long msec_timeout= (usec_timeout + 500)/1000;
481
482 if (msec_timeout >= INT_MAX)
483 {
484 timeout= INT_MAX;
485 }
486 else
487 {
488 timeout= (int)msec_timeout;
489 }
490 }
491 Backend::getBackend()->doOneStep( timeout );
492 }
493 AsyncIo::get_current_monotonic_time(t1);
494 if (usec_timeout_remaining)
495 {
496 long long delta= ((long long)(t1 - t0).get_milliseconds())* 1000L;
497 *usec_timeout_remaining= (usec_timeout > delta ) ? (usec_timeout - delta) : 0L;
498 module_logger().debug() << "timeout: " << usec_timeout << " -> " << *usec_timeout_remaining;
499 }
500 if (m_got_new_data)
501 {
502 m_got_new_data= false;
503 return true;
504 }
505 return false;
506} // eo T2NClientConnection::fill_buffer(long long,long long*)
507
508
509/**
510 * @brief called when new data arrived on this connection.
511 *
512 * reads the new data from the underlying IO object and stores it in the connection buffer.
513 * Also remembers (in the bool member var @a m_got_new_data) that new data was received.
514 */
515void T2NClientConnection::newDataSlot()
516{
517 SCOPETRACKER();
518 IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection);
519 if (!ptr)
520 {
521 //TODO: throw an error?!
522 NOT_REACHED();
523 return;
524 }
525 std::string new_data= ptr->receiveData();
526 module_logger().debug() << "got " << new_data.size() << " bytes of new data";
527 buffer+= new_data;
528 m_got_new_data= true;
529} // eo T2NClientConnection::newDataSlot()
530
531
532/**
533 * @brief called when an EOF was detected by the underlying IO object (i.e. the connection
534 * was closed by the peer side).
535 *
536 * Calls close().
537 */
538void T2NClientConnection::eofSlot()
539{
540 SCOPETRACKER();
541 close();
542} // eo T2NClientConnection::eofSlot()
543
544
545/*
546** implementation of T2NServerConnection
547*/
548
549
550T2NServerConnection::T2NServerConnection(
551 T2NServerBasePtr server,
552 IOImplementationPtr connection,
553 int timeout
554)
555: libt2n::server_connection(timeout)
556, m_real_connection(connection)
557, m_server_weak_ptr(server)
558, m_got_new_data(false)
559{
560 SCOPETRACKER();
561 IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(connection);
562 if (!ptr)
563 {
564 module_logger().error(HERE) << "illegal pointer passed";
565 close();
566 return;
567 }
568 if (!connection->opened())
569 {
570 module_logger().warning(HERE) << "connection not open, either failed or already closed";
571 close();
572 return;
573 }
574
575 ptr->connectRead( boost::bind(&T2NServerConnection::newDataSlot, this) );
576 ptr->connectEof( boost::bind(&T2NServerConnection::eofSlot, this) );
577
578} // eo T2NServerConnection::T2NServerConnection(IOImplementationPtr)
579
580
581T2NServerConnection::~T2NServerConnection()
582{
583 SCOPETRACKER();
584} // eo T2NServerConnection::~T2NServerConnection
585
586
587/**
588 * @brief closes the connection.
589 *
590 * This closes the underlying IO connection and calls libt2n::server_connection::close() to
591 * mark the connection as closed for libt2n.
592 */
593void T2NServerConnection::close()
594{
595 SCOPETRACKER();
596 if (m_real_connection)
597 {
598 m_real_connection->close();
599 m_real_connection.reset();
600 }
601 libt2n::server_connection::close();
602} // eo T2NServerConnection::close()
603
604
605/**
606 * @brief sends a raw data chunk on the connection.
607 *
608 * @param data the (raw) data chunk which should be sended.
609 */
610void T2NServerConnection::real_write(const std::string& data)
611{
612 SCOPETRACKER();
613 if (is_closed())
614 {
615 module_logger().warning(HERE) << "attempt to write data on closed connection";
616 return;
617 }
618 IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection);
619 if (!ptr)
620 {
621 // should never happen...
622 module_logger().error(HERE)<< "illegal io pointer";
623 close();
624 //TODO: throw an error?!
625 NOT_REACHED();
626 return;
627 }
628 module_logger().debug() << "send " << data.size() << " bytes of data";
629 ptr->sendData(data);
630} // eo T2NServerConnection::real_write(const std::string)
631
632
633/**
634 * @brief called to fill the connection buffer.
635 *
636 * Since this class uses the asnychronous AsyncIo framework, new data may already be read when
637 * this method is called.
638 *
639 * @param wait determines if we need to wait; if @a false it is just checked if new data
640 * was received, but no backend cycle is executed.
641 * @param usec_timeout
642 * @param usec_timeout_remaining
643 * @return @a true if new data is available.
644 */
645bool T2NServerConnection::low_fill_buffer(bool wait, long long usec_timeout, long long* usec_timeout_remaining)
646{
647 SCOPETRACKER();
648 if (is_closed())
649 {
650 module_logger().debug(HERE) << "fill_buffer() called on closed connection";
651 return false;
652 }
653 if (not m_got_new_data and wait)
654 {
655 AsyncIo::MilliTime t0,t1;
656 AsyncIo::get_current_monotonic_time(t0);
657 IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection);
658 if (!ptr)
659 {
660 module_logger().error(HERE) << "illegal io pointer";
661 close();
662 return false;
663 }
664 // try to fetch data (call the backend)
665 int timeout= 0;
666
667 if (usec_timeout<0)
668 {
669 timeout= -1;
670 }
671 else if (usec_timeout > 0)
672 {
673 long long msec_timeout= (usec_timeout + 500)/1000;
674
675 if (msec_timeout >= INT_MAX)
676 {
677 timeout= INT_MAX;
678 }
679 else
680 {
681 timeout= (int)msec_timeout;
682 }
683 }
684 Backend::getBackend()->doOneStep( timeout );
685 AsyncIo::get_current_monotonic_time(t1);
686 if (usec_timeout_remaining)
687 {
688 long long delta= ((long long)(t1 - t0).get_milliseconds())* 1000L;
689 *usec_timeout_remaining= (usec_timeout > delta ) ? (usec_timeout - delta) : 0L;
690 }
691 }
692 else
693 {
694 if (usec_timeout_remaining)
695 {
696 *usec_timeout_remaining= usec_timeout;
697 }
698 }
699 if (m_got_new_data)
700 {
701 m_got_new_data= false;
702 return true;
703 }
704 return false;
705} // eo T2NServerConnection::low_fill_buffer(bool,long long,long long*)
706
707
708/**
709 * @brief called to fill the connection buffer.
710 *
711 * Since this class uses the asnychronous AsyncIo framework, new data may already be read when
712 * this method is called.
713 *
714 * @param usec_timeout
715 * @param usec_timeout_remaining
716 * @return @a true if new data is available.
717 */
718bool T2NServerConnection::fill_buffer(long long usec_timeout,long long* usec_timeout_remaining)
719{
720 return low_fill_buffer(true, usec_timeout, usec_timeout_remaining);
721} // eo T2NServerConnection::fill_buffer(long long,long long*)
722
723
724/**
725 * @brief called when new data arrived on this connection.
726 *
727 * reads the new data from the underlying IO object and stores it in the connection buffer.
728 * Also remembers (in the bool member var @a m_got_new_data that new data was received.
729 */
730void T2NServerConnection::newDataSlot()
731{
732 SCOPETRACKER();
733 IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection);
734 if (!ptr)
735 {
736 //TODO:throw an error?!
737 NOT_REACHED();
738 return;
739 }
740 std::string new_data= ptr->receiveData();
741 buffer+= new_data;
742 module_logger().debug() << "got " << new_data.size() << " bytes of new data";
743 m_got_new_data= true;
744 reset_timeout();
745
746 T2NServerBasePtr server =m_server_weak_ptr.lock();
747 if (server)
748 {
749 server->m_signal_client_got_new_data();
750 }
751} // eo T2NServerConnection::newDataSlot()
752
753
754/**
755 * @brief called when an EOF was detected by the underlying IO object (i.e. the connection
756 * was closed by the peer side).
757 *
758 * Calls close().
759 */
760void T2NServerConnection::eofSlot()
761{
762 SCOPETRACKER();
763 close();
764} // eo T2NServerConnection::eofSlot()
765
766
767
768/*
769** implementation of T2NServerBase
770*/
771
772
773/**
774 * @brief constructs a libt2n server object.
775 *
776 * @param server_port shared pointer to a (AsyncIo) port server object which
777 * is used as underlying port handler.
778 */
779T2NServerBase::T2NServerBase( ServerSocketBaseImplementationPtr server_port)
780: m_server_port(server_port)
781, m_new_data_available(false)
782{
783 SCOPETRACKER();
784 // register our callback for new incoming conncetions.
785 server_port->setNewConnectionBaseCallback(
786 boost::bind(&T2NServerBase::newConnectionSlot, this, _1)
787 );
788 m_signal_client_got_new_data.connect
789 (
790 boost::bind(&T2NServerBase::clientGotNewDataSlot, this)
791 );
792} // eo T2NServerBase::T2NServerBase(ServerSocketBaseImplementationPtr)
793
794
795/**
796 * @brief destructor.
797 *
798 */
799T2NServerBase::~T2NServerBase()
800{
801 SCOPETRACKER();
802} // eo T2NServerBase::~T2NServerBase()
803
804
805/**
806 * @brief returns wether the server port is opened.
807 *
808 * @return @a true iff the server port is open.
809 */
810bool T2NServerBase::isOpen()
811{
812 return (m_server_port && m_server_port->opened());
813} // eo T2NServerBase
814
815
816
817/**
818 * @brief callback for the port server object when a new connection is established.
819 *
820 * @param io_ptr the (shared) pointer to the new connection.
821 */
822void T2NServerBase::newConnectionSlot(IOImplementationPtr io_ptr)
823{
824 SCOPETRACKER();
825 add_connection( new T2NServerConnection( get_ptr_as< T2NServerBase >(), io_ptr, get_default_timeout() ) );
826} // eo T2NServerBase::newConnectionSlot(IOImplementationPtr)
827
828
829/**
830 * @brief callback for "new data available" signal
831 */
832void T2NServerBase::clientGotNewDataSlot()
833{
834 m_new_data_available= true;
835} // eo T2NServerBase::clientGotNewDataSlot()
836
837
838/**
839 * @brief try to fill the buffers of the managed connections.
840 *
841 * will be called by T2NServerBase::fill_buffer().
842 *
843 * @return @a true if at least one connection buffer has new data.
844 */
845bool T2NServerBase::fill_connection_buffers()
846{
847 SCOPETRACKER();
848 Backend::getBackend()->doOneStep(0);
849 bool result= false;
850 for(std::map<unsigned int, libt2n::server_connection*>::iterator it=connections.begin();
851 it != connections.end();
852 ++it)
853 {
854 T2NServerConnection *conn = dynamic_cast<T2NServerConnection*>(it->second);
855 if (!conn)
856 {
857 if (it->second)
858 {
859 // react somehow if (it->second) is not NULL...
860 module_logger().error(HERE) << "illegal connection pointer";
861 it->second->close();
862 }
863 continue;
864 }
865 if ( conn->low_fill_buffer(false, 0) )
866 {
867 result= true;
868 }
869 }
870 return result;
871} // eo T2NServerBase::fill_connection_buffers()
872
873
874/**
875 * @brief fills the connection buffers.
876 *
877 * Uses the AsyncIo Backend to wait for new data.
878 *
879 * @param usec_timeout the maximum time period to wait for new data (in microseconds).
880 * 0 returns immediately, -1 waits until some event occurred.
881 * @param timeout_remaining ignored!
882 * @return @a true if new data for at least one connection arrived.
883 *
884 * @note since this method uses the AsyncIo backend, the timeout will have only milli second
885 * resolution.
886 */
887bool T2NServerBase::fill_buffer(long long usec_timeout, long long* timeout_remaining)
888{
889 SCOPETRACKER();
890
891 if (m_new_data_available)
892 {
893 // short cut if we already know that we have new data:
894 m_new_data_available= false;
895 return true;
896 }
897
898 int timeout= 0;
899
900 if (usec_timeout<0)
901 {
902 timeout= -1;
903 }
904 else if (usec_timeout > 0)
905 {
906 long long msec_timeout= (usec_timeout + 500)/1000;
907
908 if (msec_timeout >= INT_MAX)
909 {
910 timeout= INT_MAX;
911 }
912 else
913 {
914 timeout= (int)msec_timeout;
915 }
916 }
917 // not really.. but it shouldn't be used either...
918 if (timeout_remaining) *timeout_remaining= 0L;
919
920 if (! fill_connection_buffers() && timeout>0)
921 {
922 bool had_activity= Backend::getBackend()->doOneStep( timeout );
923 return fill_connection_buffers();
924 }
925 return true;
926} // to T2NServerBase::fill_buffer(long long,long long*)
927
928
929
930/*
931** creator functions:
932*/
933
934
935/**
936 * @brief creates a server object with unix domain server socket.
937 * @param path path of the unix domain socket.
938 * @param mode mode for the socket.
939 * @return shared pointer with the new server object; empty if none could be created..
940 */
941T2NServerBasePtr createT2NUnixServerPort(const std::string& path, int mode)
942{
943 SCOPETRACKER();
944 boost::shared_ptr< T2NUnixServer > result( new T2NUnixServer(path,mode) );
945 if (!result->isOpen())
946 {
947 module_logger().error(HERE)
948 << "failed to open unix domain server socket on \"" << path << "\"";
949 }
950 return result;
951} // eo createT2NUnixServerPort(const std::string&,int)
952
953
954/**
955 * @brief creates a client object connected to a server via unix daomain socket.
956 * @param path path of cthe unix domain socket.
957 * @return shared pointer with the new client object; empty if none could be created..
958 */
959T2NClientConnectionPtr createT2NUnixClientConnection(const std::string& path)
960{
961 typedef IOExportWrapper< AsyncIo::UnixIOSocket > MyIo;
962 typedef boost::shared_ptr< MyIo > MyIoPtr;
963 SCOPETRACKER();
964 MyIoPtr connection( new MyIo(path) );
965 boost::shared_ptr< RealT2NClientConnection > result( new RealT2NClientConnection( connection ) );
966 if (not result->isOpen())
967 {
968 module_logger().error(HERE)
969 << "failed to open unix domain client socket on \"" << path << "\"";
970 return T2NClientConnectionPtr();
971 }
972 return result;
973} // eo createT2NUnixClientConnection(const std::string&)
974
975} // eo namespace AsyncIo