Commit | Line | Data |
---|---|---|
8c15b8c7 TJ |
1 | /* |
2 | The software in this package is distributed under the GNU General | |
3 | Public License version 2 (with a special exception described below). | |
4 | ||
5 | A copy of GNU General Public License (GPL) is included in this distribution, | |
6 | in the file COPYING.GPL. | |
7 | ||
8 | As a special exception, if other files instantiate templates or use macros | |
9 | or inline functions from this file, or you compile this file and link it | |
10 | with other works to produce a work based on this file, this file | |
11 | does not by itself cause the resulting work to be covered | |
12 | by the GNU General Public License. | |
13 | ||
14 | However the source code for this file must still be made available | |
15 | in accordance with section (3) of the GNU General Public License. | |
16 | ||
17 | This exception does not invalidate any other reasons why a work based | |
18 | on this file might be covered by the GNU General Public License. | |
19 | */ | |
42b7c46d RP |
20 | /** |
21 | * @file | |
22 | * | |
23 | * @author Reinhard Pfau \<reinhard.pfau@intra2net.com\> | |
24 | * | |
25 | * @copyright © Copyright 2008 by Intra2net AG | |
42b7c46d | 26 | */ |
aba4c34d | 27 | #include "asyncio_t2n.hpp" |
42b7c46d RP |
28 | |
29 | #include <iostream> | |
30 | #include <boost/type_traits/is_base_of.hpp> | |
31 | #include <boost/static_assert.hpp> | |
32 | #include <boost/signal.hpp> | |
33 | #include <climits> | |
34 | #include <logfunc.hpp> | |
35 | #include <tracefunc.hpp> | |
36 | #include <log_macros.hpp> | |
37 | ||
38 | ||
39 | namespace AsyncIo | |
40 | { | |
41 | ||
aba4c34d | 42 | using namespace Utils; |
42b7c46d RP |
43 | |
44 | namespace | |
45 | { | |
46 | ||
47 | ||
48 | Logger::PartLogger& module_logger() | |
49 | { | |
50 | static Logger::PartLogger _module_logger(HERE); | |
51 | return _module_logger; | |
52 | } // eo module_logger(); | |
53 | ||
54 | ||
55 | ||
56 | /** | |
57 | * @brief a class with some methods we like to have on our io classes. | |
58 | * | |
59 | * This class is to be used as second base in a wrapper class and needs it's methods to | |
60 | * be redefined for "the real thing". | |
61 | */ | |
62 | class IOExportWrapperBase | |
63 | { | |
64 | public: | |
65 | ||
66 | IOExportWrapperBase() | |
67 | { | |
68 | } | |
69 | ||
70 | virtual ~IOExportWrapperBase() | |
71 | { | |
72 | } | |
73 | ||
74 | virtual void sendData(const std::string& data) | |
75 | { | |
76 | } | |
77 | ||
78 | ||
79 | virtual std::string receiveData() | |
80 | { | |
81 | return std::string(); | |
82 | } | |
83 | ||
84 | virtual boost::signals::connection connectEof( const boost::function< void() >& func ) | |
85 | { | |
86 | return boost::signals::connection(); | |
87 | } | |
88 | ||
89 | virtual boost::signals::connection connectRead( const boost::function< void() >& func ) | |
90 | { | |
91 | return boost::signals::connection(); | |
92 | } | |
93 | ||
94 | }; // eo class IOExportWrapperBase | |
95 | ||
96 | ||
97 | typedef boost::shared_ptr< IOExportWrapperBase > IOExportWrapperBasePtr; | |
98 | ||
99 | ||
100 | /** | |
101 | * @brief IO wrapper template. | |
102 | * @tparam IOClass a type based on AsyncIo::IOImplementation | |
103 | * | |
104 | * The type is used as a public base for the resulting class; the second public base is our | |
105 | * helper with the additional methods we need internally and which we (finally) define here. | |
106 | */ | |
107 | template< | |
108 | class IOClass | |
109 | > | |
110 | class IOExportWrapper | |
111 | : public IOClass | |
112 | , public IOExportWrapperBase | |
113 | { | |
114 | BOOST_STATIC_ASSERT(( boost::is_base_of< IOImplementation,IOClass >::value )); | |
115 | ||
116 | public: | |
117 | IOExportWrapper() | |
118 | { | |
119 | } | |
120 | ||
121 | template< | |
122 | typename Arg1 | |
123 | > | |
124 | IOExportWrapper(Arg1 arg1) | |
125 | : IOClass(arg1) | |
126 | {} | |
127 | ||
128 | ||
129 | template< | |
130 | typename Arg1, typename Arg2 | |
131 | > | |
132 | IOExportWrapper(Arg1 arg1, Arg2 arg2) | |
133 | : IOClass(arg1,arg2) | |
134 | {} | |
135 | ||
136 | ||
137 | template< | |
138 | typename Arg1, typename Arg2, typename Arg3 | |
139 | > | |
140 | IOExportWrapper(Arg1 arg1, Arg2 arg2, Arg3 arg3) | |
141 | : IOClass(arg1,arg2,arg3) | |
142 | {} | |
143 | ||
144 | ||
145 | template< | |
146 | typename Arg1, typename Arg2, typename Arg3, typename Arg4 | |
147 | > | |
148 | IOExportWrapper(Arg1 arg1, Arg2 arg2, Arg3 arg3, Arg4 arg4) | |
149 | : IOClass(arg1,arg2,arg3,arg4) | |
150 | {} | |
151 | ||
152 | ||
153 | template< | |
154 | typename Arg1, typename Arg2, typename Arg3, typename Arg4, typename Arg5 | |
155 | > | |
156 | IOExportWrapper(Arg1 arg1, Arg2 arg2, Arg3 arg3, Arg4 arg4, Arg5 arg5) | |
157 | : IOClass(arg1,arg2,arg3,arg4,arg5) | |
158 | {} | |
159 | ||
160 | ||
161 | /** | |
162 | * @brief exposed funtion for sending data. | |
163 | * @param data the chunk to be send. | |
164 | */ | |
165 | virtual void sendData(const std::string& data) | |
166 | { | |
167 | IOClass::lowSend(data); | |
168 | } | |
169 | ||
170 | /** | |
171 | * @brief returns the new received data. | |
172 | * @return the receievd data. | |
173 | * | |
174 | * Clears the receive buffer. | |
175 | */ | |
176 | virtual std::string receiveData() | |
177 | { | |
178 | std::string result; | |
179 | result.swap(IOClass::m_input_buffer); | |
180 | return result; | |
181 | } | |
182 | ||
183 | /** | |
184 | * @brief exposed connect to EOF signal. | |
185 | * @param func the function which should be connected to the eof signal. | |
186 | * @return signal connection handle. | |
187 | */ | |
188 | virtual boost::signals::connection connectEof( const boost::function< void() >& func ) | |
189 | { | |
190 | return IOClass::m_signal_eof.connect(func); | |
191 | } | |
192 | ||
193 | /** | |
194 | * @brief exposed connect to "read" signal. | |
195 | * @param func the function which should be connected to the "read" signal. | |
196 | * @return signal connection handle. | |
197 | */ | |
198 | virtual boost::signals::connection connectRead( const boost::function< void() >& func ) | |
199 | { | |
200 | return IOClass::m_signal_read.connect(func); | |
201 | } | |
202 | ||
203 | protected: | |
204 | ||
205 | }; // eo class IOExportWrapper | |
206 | ||
207 | ||
208 | /* | |
209 | ** specialized versions of io classes: | |
210 | */ | |
211 | ||
212 | /** | |
213 | * @brief enhanced unix domain socket class with reconnect feature. | |
214 | * | |
215 | * Used for t2n client connections. | |
216 | */ | |
217 | class T2nUnixIOSocket | |
218 | : public AsyncIo::UnixIOSocket | |
219 | { | |
220 | typedef AsyncIo::UnixIOSocket inherited; | |
221 | public: | |
222 | T2nUnixIOSocket( const std::string& path ); | |
223 | ||
224 | virtual void close(AsyncIo::Direction direction = AsyncIo::Direction::both); | |
225 | ||
226 | bool reopen(bool force= false); | |
227 | ||
228 | protected: | |
229 | ||
230 | virtual void doRead(); | |
231 | ||
232 | ||
233 | protected: | |
234 | ||
235 | bool m_in_do_read; | |
236 | bool m_may_reconnect; | |
237 | ||
238 | }; // T2nUnixIOSocket | |
239 | ||
240 | ||
241 | T2nUnixIOSocket::T2nUnixIOSocket(const std::string& path) | |
242 | : inherited(path) | |
243 | , m_in_do_read(false) | |
244 | , m_may_reconnect(false) | |
245 | { | |
246 | } // eo T2nUnixIOSocket::T2nUnixIOSocket(const std::string&) | |
247 | ||
248 | ||
249 | void T2nUnixIOSocket::close(AsyncIo::Direction direction) | |
250 | { | |
251 | bool was_open= opened(); | |
252 | inherited::close(direction); | |
253 | if (m_in_do_read and not opened()) | |
254 | { | |
255 | m_may_reconnect= was_open; | |
256 | } | |
257 | } // eo T2nUnixIOSocket::close(AsyncIo::Direction) | |
258 | ||
259 | ||
260 | bool T2nUnixIOSocket::reopen(bool force) | |
261 | { | |
262 | if (m_path.empty()) | |
263 | { | |
264 | return false; | |
265 | } | |
266 | if (m_may_reconnect || force) | |
267 | { | |
268 | return inherited::open( m_path ); | |
269 | } | |
270 | return false; | |
271 | } // eo T2nUnixIOSocket::reopen() | |
272 | ||
273 | ||
274 | void T2nUnixIOSocket::doRead() | |
275 | { | |
276 | m_in_do_read= true; | |
277 | try | |
278 | { | |
279 | inherited::doRead(); | |
280 | } | |
281 | catch (...) | |
282 | { | |
283 | m_in_do_read= false; | |
284 | throw; | |
285 | } | |
286 | m_in_do_read= false; | |
287 | } // eo T2nUnixIOSocket::doRead() | |
288 | ||
289 | ||
290 | /** | |
291 | * @brief server class for libt2n using unix domain sockets. | |
292 | * | |
293 | * well, it's enough to provide an appropriate constructor. | |
294 | * (did i mention that templates are really cool stuff? :-) ) | |
295 | */ | |
296 | class T2NUnixServer | |
297 | : public T2NServerBase | |
298 | { | |
299 | public: | |
300 | ||
301 | T2NUnixServer(const std::string& path, int mode=0600) | |
302 | : T2NServerBase( ServerSocketBaseImplementationPtr( | |
303 | new UnixServerSocket< | |
304 | IOExportWrapper< UnixIOSocket > | |
305 | >(path, mode) | |
306 | ) ) | |
307 | { | |
308 | } // eo T2NServerBase | |
309 | ||
310 | }; // eo T2NUnixServer | |
311 | ||
312 | ||
313 | ||
314 | class RealT2NClientConnection | |
315 | : public T2NClientConnection | |
316 | { | |
317 | public: | |
318 | RealT2NClientConnection( AsyncIo::IOImplementationPtr connection ) | |
319 | : T2NClientConnection(connection) | |
320 | { | |
321 | } | |
322 | }; // eo class T2NClient | |
323 | ||
324 | } // eo namespace <anonymous> | |
325 | ||
326 | ||
327 | ||
328 | /* | |
329 | ** implementation of T2NClientConnection | |
330 | */ | |
331 | ||
332 | ||
333 | T2NClientConnection::T2NClientConnection( | |
334 | IOImplementationPtr connection | |
335 | ) | |
336 | : libt2n::client_connection() | |
337 | , m_real_connection(connection) | |
338 | , m_got_new_data(false) | |
339 | { | |
340 | SCOPETRACKER(); | |
341 | IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(connection); | |
342 | if (!ptr) | |
343 | { | |
344 | module_logger().error(HERE) << "illegal pointer passed"; | |
345 | close(); | |
346 | return; | |
347 | } | |
348 | if (!connection->opened()) | |
349 | { | |
350 | module_logger().warning(HERE) << "connection not open, either failed or already closed"; | |
351 | close(); | |
352 | return; | |
353 | } | |
354 | ||
355 | ptr->connectRead( boost::bind(&T2NClientConnection::newDataSlot, this) ); | |
356 | ptr->connectEof( boost::bind(&T2NClientConnection::eofSlot, this) ); | |
357 | ||
358 | } // eo T2NClientConnection::T2NClientConnection(IOImplementationPtr) | |
359 | ||
360 | ||
361 | T2NClientConnection::~T2NClientConnection() | |
362 | { | |
363 | SCOPETRACKER(); | |
364 | } // eo T2NClientConnection::~T2NClientConnection | |
365 | ||
366 | ||
367 | /** | |
368 | * @brief returns if the connection is open. | |
369 | * @return @a true if the connection is open. | |
370 | */ | |
371 | bool T2NClientConnection::isOpen() | |
372 | { | |
373 | return m_real_connection and m_real_connection->opened(); | |
374 | } // eo T2NClientConnection::isOpen() | |
375 | ||
376 | ||
377 | /** | |
378 | * @brief try to reopen a connection. | |
379 | * @return @a true if the connection was reopened. | |
380 | */ | |
381 | bool T2NClientConnection::reopen(bool force) | |
382 | { | |
383 | if (not m_real_connection) | |
384 | { | |
385 | return false; | |
386 | } | |
387 | boost::shared_ptr< T2nUnixIOSocket > t2n_socket= | |
388 | boost::shared_dynamic_cast< T2nUnixIOSocket >(m_real_connection); | |
389 | if (t2n_socket) | |
390 | { | |
391 | return t2n_socket->reopen(force); | |
392 | } | |
393 | return false; | |
394 | } // eo T2NClientConnection::reopen() | |
395 | ||
396 | ||
397 | /** | |
398 | * @brief closes the connection. | |
399 | * | |
400 | * This closes the underlying IO connection and calls libt2n::server_connection::close() to | |
401 | * mark the connection as closed for libt2n. | |
402 | */ | |
403 | void T2NClientConnection::close() | |
404 | { | |
405 | SCOPETRACKER(); | |
406 | if (m_real_connection) | |
407 | { | |
408 | m_real_connection->close(); | |
409 | m_real_connection.reset(); | |
410 | } | |
411 | libt2n::client_connection::close(); | |
412 | } // eo T2NClientConnection::close() | |
413 | ||
414 | ||
415 | /** | |
416 | * @brief sends a raw data chunk on the connection. | |
417 | * | |
418 | * @param data the (raw) data chunk which should be sended. | |
419 | */ | |
420 | void T2NClientConnection::real_write(const std::string& data) | |
421 | { | |
422 | SCOPETRACKER(); | |
423 | if (is_closed()) | |
424 | { | |
425 | module_logger().warning(HERE) << "attempt to write data on closed connection"; | |
426 | return; | |
427 | } | |
428 | IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); | |
429 | if (!ptr) | |
430 | { | |
431 | // should never happen... | |
432 | module_logger().error(HERE)<< "illegal io pointer"; | |
433 | close(); | |
434 | //TODO: throw an error?! | |
435 | NOT_REACHED(); | |
436 | return; | |
437 | } | |
438 | ptr->sendData(data); | |
439 | } // eo T2NClientConnection::real_write(const std::string) | |
440 | ||
441 | ||
442 | /** | |
443 | * @brief called to fill the connection buffer. | |
444 | * | |
445 | * Since this class uses the asnychronous AsyncIo framework, new data may already be read when | |
446 | * this method is called. | |
447 | * | |
448 | * @param usec_timeout | |
449 | * @param usec_timeout_remaining | |
450 | * @return @a true if new data is available. | |
451 | */ | |
452 | bool T2NClientConnection::fill_buffer(long long usec_timeout,long long* usec_timeout_remaining) | |
453 | { | |
454 | SCOPETRACKER(); | |
455 | if (is_closed()) | |
456 | { | |
457 | module_logger().debug(HERE) << "fill_buffer() called on closed connection"; | |
458 | return false; | |
459 | } | |
460 | AsyncIo::MilliTime t0,t1; | |
461 | AsyncIo::get_current_monotonic_time(t0); | |
462 | if (!m_got_new_data) | |
463 | { | |
464 | IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); | |
465 | if (!ptr) | |
466 | { | |
467 | module_logger().error(HERE) << "illegal io pointer"; | |
468 | close(); | |
469 | return false; | |
470 | } | |
471 | // try to fetch data (call the backend) | |
472 | int timeout= 0; | |
473 | ||
474 | if (usec_timeout<0) | |
475 | { | |
476 | timeout= -1; | |
477 | } | |
478 | else if (usec_timeout > 0) | |
479 | { | |
480 | long long msec_timeout= (usec_timeout + 500)/1000; | |
481 | ||
482 | if (msec_timeout >= INT_MAX) | |
483 | { | |
484 | timeout= INT_MAX; | |
485 | } | |
486 | else | |
487 | { | |
488 | timeout= (int)msec_timeout; | |
489 | } | |
490 | } | |
491 | Backend::getBackend()->doOneStep( timeout ); | |
492 | } | |
493 | AsyncIo::get_current_monotonic_time(t1); | |
494 | if (usec_timeout_remaining) | |
495 | { | |
496 | long long delta= ((long long)(t1 - t0).get_milliseconds())* 1000L; | |
497 | *usec_timeout_remaining= (usec_timeout > delta ) ? (usec_timeout - delta) : 0L; | |
498 | module_logger().debug() << "timeout: " << usec_timeout << " -> " << *usec_timeout_remaining; | |
499 | } | |
500 | if (m_got_new_data) | |
501 | { | |
502 | m_got_new_data= false; | |
503 | return true; | |
504 | } | |
505 | return false; | |
506 | } // eo T2NClientConnection::fill_buffer(long long,long long*) | |
507 | ||
508 | ||
509 | /** | |
510 | * @brief called when new data arrived on this connection. | |
511 | * | |
512 | * reads the new data from the underlying IO object and stores it in the connection buffer. | |
513 | * Also remembers (in the bool member var @a m_got_new_data) that new data was received. | |
514 | */ | |
515 | void T2NClientConnection::newDataSlot() | |
516 | { | |
517 | SCOPETRACKER(); | |
518 | IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); | |
519 | if (!ptr) | |
520 | { | |
521 | //TODO: throw an error?! | |
522 | NOT_REACHED(); | |
523 | return; | |
524 | } | |
525 | std::string new_data= ptr->receiveData(); | |
526 | module_logger().debug() << "got " << new_data.size() << " bytes of new data"; | |
527 | buffer+= new_data; | |
528 | m_got_new_data= true; | |
529 | } // eo T2NClientConnection::newDataSlot() | |
530 | ||
531 | ||
532 | /** | |
533 | * @brief called when an EOF was detected by the underlying IO object (i.e. the connection | |
534 | * was closed by the peer side). | |
535 | * | |
536 | * Calls close(). | |
537 | */ | |
538 | void T2NClientConnection::eofSlot() | |
539 | { | |
540 | SCOPETRACKER(); | |
541 | close(); | |
542 | } // eo T2NClientConnection::eofSlot() | |
543 | ||
544 | ||
545 | /* | |
546 | ** implementation of T2NServerConnection | |
547 | */ | |
548 | ||
549 | ||
550 | T2NServerConnection::T2NServerConnection( | |
551 | T2NServerBasePtr server, | |
552 | IOImplementationPtr connection, | |
553 | int timeout | |
554 | ) | |
555 | : libt2n::server_connection(timeout) | |
556 | , m_real_connection(connection) | |
557 | , m_server_weak_ptr(server) | |
558 | , m_got_new_data(false) | |
559 | { | |
560 | SCOPETRACKER(); | |
561 | IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(connection); | |
562 | if (!ptr) | |
563 | { | |
564 | module_logger().error(HERE) << "illegal pointer passed"; | |
565 | close(); | |
566 | return; | |
567 | } | |
568 | if (!connection->opened()) | |
569 | { | |
570 | module_logger().warning(HERE) << "connection not open, either failed or already closed"; | |
571 | close(); | |
572 | return; | |
573 | } | |
574 | ||
575 | ptr->connectRead( boost::bind(&T2NServerConnection::newDataSlot, this) ); | |
576 | ptr->connectEof( boost::bind(&T2NServerConnection::eofSlot, this) ); | |
577 | ||
578 | } // eo T2NServerConnection::T2NServerConnection(IOImplementationPtr) | |
579 | ||
580 | ||
581 | T2NServerConnection::~T2NServerConnection() | |
582 | { | |
583 | SCOPETRACKER(); | |
584 | } // eo T2NServerConnection::~T2NServerConnection | |
585 | ||
586 | ||
587 | /** | |
588 | * @brief closes the connection. | |
589 | * | |
590 | * This closes the underlying IO connection and calls libt2n::server_connection::close() to | |
591 | * mark the connection as closed for libt2n. | |
592 | */ | |
593 | void T2NServerConnection::close() | |
594 | { | |
595 | SCOPETRACKER(); | |
596 | if (m_real_connection) | |
597 | { | |
598 | m_real_connection->close(); | |
599 | m_real_connection.reset(); | |
600 | } | |
601 | libt2n::server_connection::close(); | |
602 | } // eo T2NServerConnection::close() | |
603 | ||
604 | ||
605 | /** | |
606 | * @brief sends a raw data chunk on the connection. | |
607 | * | |
608 | * @param data the (raw) data chunk which should be sended. | |
609 | */ | |
610 | void T2NServerConnection::real_write(const std::string& data) | |
611 | { | |
612 | SCOPETRACKER(); | |
613 | if (is_closed()) | |
614 | { | |
615 | module_logger().warning(HERE) << "attempt to write data on closed connection"; | |
616 | return; | |
617 | } | |
618 | IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); | |
619 | if (!ptr) | |
620 | { | |
621 | // should never happen... | |
622 | module_logger().error(HERE)<< "illegal io pointer"; | |
623 | close(); | |
624 | //TODO: throw an error?! | |
625 | NOT_REACHED(); | |
626 | return; | |
627 | } | |
628 | module_logger().debug() << "send " << data.size() << " bytes of data"; | |
629 | ptr->sendData(data); | |
630 | } // eo T2NServerConnection::real_write(const std::string) | |
631 | ||
632 | ||
633 | /** | |
634 | * @brief called to fill the connection buffer. | |
635 | * | |
636 | * Since this class uses the asnychronous AsyncIo framework, new data may already be read when | |
637 | * this method is called. | |
638 | * | |
639 | * @param wait determines if we need to wait; if @a false it is just checked if new data | |
640 | * was received, but no backend cycle is executed. | |
641 | * @param usec_timeout | |
642 | * @param usec_timeout_remaining | |
643 | * @return @a true if new data is available. | |
644 | */ | |
645 | bool T2NServerConnection::low_fill_buffer(bool wait, long long usec_timeout, long long* usec_timeout_remaining) | |
646 | { | |
647 | SCOPETRACKER(); | |
648 | if (is_closed()) | |
649 | { | |
650 | module_logger().debug(HERE) << "fill_buffer() called on closed connection"; | |
651 | return false; | |
652 | } | |
653 | if (not m_got_new_data and wait) | |
654 | { | |
655 | AsyncIo::MilliTime t0,t1; | |
656 | AsyncIo::get_current_monotonic_time(t0); | |
657 | IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); | |
658 | if (!ptr) | |
659 | { | |
660 | module_logger().error(HERE) << "illegal io pointer"; | |
661 | close(); | |
662 | return false; | |
663 | } | |
664 | // try to fetch data (call the backend) | |
665 | int timeout= 0; | |
666 | ||
667 | if (usec_timeout<0) | |
668 | { | |
669 | timeout= -1; | |
670 | } | |
671 | else if (usec_timeout > 0) | |
672 | { | |
673 | long long msec_timeout= (usec_timeout + 500)/1000; | |
674 | ||
675 | if (msec_timeout >= INT_MAX) | |
676 | { | |
677 | timeout= INT_MAX; | |
678 | } | |
679 | else | |
680 | { | |
681 | timeout= (int)msec_timeout; | |
682 | } | |
683 | } | |
684 | Backend::getBackend()->doOneStep( timeout ); | |
685 | AsyncIo::get_current_monotonic_time(t1); | |
686 | if (usec_timeout_remaining) | |
687 | { | |
688 | long long delta= ((long long)(t1 - t0).get_milliseconds())* 1000L; | |
689 | *usec_timeout_remaining= (usec_timeout > delta ) ? (usec_timeout - delta) : 0L; | |
690 | } | |
691 | } | |
692 | else | |
693 | { | |
694 | if (usec_timeout_remaining) | |
695 | { | |
696 | *usec_timeout_remaining= usec_timeout; | |
697 | } | |
698 | } | |
699 | if (m_got_new_data) | |
700 | { | |
701 | m_got_new_data= false; | |
702 | return true; | |
703 | } | |
704 | return false; | |
705 | } // eo T2NServerConnection::low_fill_buffer(bool,long long,long long*) | |
706 | ||
707 | ||
708 | /** | |
709 | * @brief called to fill the connection buffer. | |
710 | * | |
711 | * Since this class uses the asnychronous AsyncIo framework, new data may already be read when | |
712 | * this method is called. | |
713 | * | |
714 | * @param usec_timeout | |
715 | * @param usec_timeout_remaining | |
716 | * @return @a true if new data is available. | |
717 | */ | |
718 | bool T2NServerConnection::fill_buffer(long long usec_timeout,long long* usec_timeout_remaining) | |
719 | { | |
720 | return low_fill_buffer(true, usec_timeout, usec_timeout_remaining); | |
721 | } // eo T2NServerConnection::fill_buffer(long long,long long*) | |
722 | ||
723 | ||
724 | /** | |
725 | * @brief called when new data arrived on this connection. | |
726 | * | |
727 | * reads the new data from the underlying IO object and stores it in the connection buffer. | |
728 | * Also remembers (in the bool member var @a m_got_new_data that new data was received. | |
729 | */ | |
730 | void T2NServerConnection::newDataSlot() | |
731 | { | |
732 | SCOPETRACKER(); | |
733 | IOExportWrapperBasePtr ptr = boost::dynamic_pointer_cast< IOExportWrapperBase >(m_real_connection); | |
734 | if (!ptr) | |
735 | { | |
736 | //TODO:throw an error?! | |
737 | NOT_REACHED(); | |
738 | return; | |
739 | } | |
740 | std::string new_data= ptr->receiveData(); | |
741 | buffer+= new_data; | |
742 | module_logger().debug() << "got " << new_data.size() << " bytes of new data"; | |
743 | m_got_new_data= true; | |
744 | reset_timeout(); | |
745 | ||
746 | T2NServerBasePtr server =m_server_weak_ptr.lock(); | |
747 | if (server) | |
748 | { | |
749 | server->m_signal_client_got_new_data(); | |
750 | } | |
751 | } // eo T2NServerConnection::newDataSlot() | |
752 | ||
753 | ||
754 | /** | |
755 | * @brief called when an EOF was detected by the underlying IO object (i.e. the connection | |
756 | * was closed by the peer side). | |
757 | * | |
758 | * Calls close(). | |
759 | */ | |
760 | void T2NServerConnection::eofSlot() | |
761 | { | |
762 | SCOPETRACKER(); | |
763 | close(); | |
764 | } // eo T2NServerConnection::eofSlot() | |
765 | ||
766 | ||
767 | ||
768 | /* | |
769 | ** implementation of T2NServerBase | |
770 | */ | |
771 | ||
772 | ||
773 | /** | |
774 | * @brief constructs a libt2n server object. | |
775 | * | |
776 | * @param server_port shared pointer to a (AsyncIo) port server object which | |
777 | * is used as underlying port handler. | |
778 | */ | |
779 | T2NServerBase::T2NServerBase( ServerSocketBaseImplementationPtr server_port) | |
780 | : m_server_port(server_port) | |
781 | , m_new_data_available(false) | |
782 | { | |
783 | SCOPETRACKER(); | |
784 | // register our callback for new incoming conncetions. | |
785 | server_port->setNewConnectionBaseCallback( | |
786 | boost::bind(&T2NServerBase::newConnectionSlot, this, _1) | |
787 | ); | |
788 | m_signal_client_got_new_data.connect | |
789 | ( | |
790 | boost::bind(&T2NServerBase::clientGotNewDataSlot, this) | |
791 | ); | |
792 | } // eo T2NServerBase::T2NServerBase(ServerSocketBaseImplementationPtr) | |
793 | ||
794 | ||
795 | /** | |
796 | * @brief destructor. | |
797 | * | |
798 | */ | |
799 | T2NServerBase::~T2NServerBase() | |
800 | { | |
801 | SCOPETRACKER(); | |
802 | } // eo T2NServerBase::~T2NServerBase() | |
803 | ||
804 | ||
805 | /** | |
806 | * @brief returns wether the server port is opened. | |
807 | * | |
808 | * @return @a true iff the server port is open. | |
809 | */ | |
810 | bool T2NServerBase::isOpen() | |
811 | { | |
812 | return (m_server_port && m_server_port->opened()); | |
813 | } // eo T2NServerBase | |
814 | ||
815 | ||
816 | ||
817 | /** | |
818 | * @brief callback for the port server object when a new connection is established. | |
819 | * | |
820 | * @param io_ptr the (shared) pointer to the new connection. | |
821 | */ | |
822 | void T2NServerBase::newConnectionSlot(IOImplementationPtr io_ptr) | |
823 | { | |
824 | SCOPETRACKER(); | |
825 | add_connection( new T2NServerConnection( get_ptr_as< T2NServerBase >(), io_ptr, get_default_timeout() ) ); | |
826 | } // eo T2NServerBase::newConnectionSlot(IOImplementationPtr) | |
827 | ||
828 | ||
829 | /** | |
830 | * @brief callback for "new data available" signal | |
831 | */ | |
832 | void T2NServerBase::clientGotNewDataSlot() | |
833 | { | |
834 | m_new_data_available= true; | |
835 | } // eo T2NServerBase::clientGotNewDataSlot() | |
836 | ||
837 | ||
838 | /** | |
839 | * @brief try to fill the buffers of the managed connections. | |
840 | * | |
841 | * will be called by T2NServerBase::fill_buffer(). | |
842 | * | |
843 | * @return @a true if at least one connection buffer has new data. | |
844 | */ | |
845 | bool T2NServerBase::fill_connection_buffers() | |
846 | { | |
847 | SCOPETRACKER(); | |
848 | Backend::getBackend()->doOneStep(0); | |
849 | bool result= false; | |
850 | for(std::map<unsigned int, libt2n::server_connection*>::iterator it=connections.begin(); | |
851 | it != connections.end(); | |
852 | ++it) | |
853 | { | |
854 | T2NServerConnection *conn = dynamic_cast<T2NServerConnection*>(it->second); | |
855 | if (!conn) | |
856 | { | |
857 | if (it->second) | |
858 | { | |
859 | // react somehow if (it->second) is not NULL... | |
860 | module_logger().error(HERE) << "illegal connection pointer"; | |
861 | it->second->close(); | |
862 | } | |
863 | continue; | |
864 | } | |
865 | if ( conn->low_fill_buffer(false, 0) ) | |
866 | { | |
867 | result= true; | |
868 | } | |
869 | } | |
870 | return result; | |
871 | } // eo T2NServerBase::fill_connection_buffers() | |
872 | ||
873 | ||
874 | /** | |
875 | * @brief fills the connection buffers. | |
876 | * | |
877 | * Uses the AsyncIo Backend to wait for new data. | |
878 | * | |
879 | * @param usec_timeout the maximum time period to wait for new data (in microseconds). | |
880 | * 0 returns immediately, -1 waits until some event occurred. | |
881 | * @param timeout_remaining ignored! | |
882 | * @return @a true if new data for at least one connection arrived. | |
883 | * | |
884 | * @note since this method uses the AsyncIo backend, the timeout will have only milli second | |
885 | * resolution. | |
886 | */ | |
887 | bool T2NServerBase::fill_buffer(long long usec_timeout, long long* timeout_remaining) | |
888 | { | |
889 | SCOPETRACKER(); | |
890 | ||
891 | if (m_new_data_available) | |
892 | { | |
893 | // short cut if we already know that we have new data: | |
894 | m_new_data_available= false; | |
895 | return true; | |
896 | } | |
897 | ||
898 | int timeout= 0; | |
899 | ||
900 | if (usec_timeout<0) | |
901 | { | |
902 | timeout= -1; | |
903 | } | |
904 | else if (usec_timeout > 0) | |
905 | { | |
906 | long long msec_timeout= (usec_timeout + 500)/1000; | |
907 | ||
908 | if (msec_timeout >= INT_MAX) | |
909 | { | |
910 | timeout= INT_MAX; | |
911 | } | |
912 | else | |
913 | { | |
914 | timeout= (int)msec_timeout; | |
915 | } | |
916 | } | |
917 | // not really.. but it shouldn't be used either... | |
918 | if (timeout_remaining) *timeout_remaining= 0L; | |
919 | ||
920 | if (! fill_connection_buffers() && timeout>0) | |
921 | { | |
922 | bool had_activity= Backend::getBackend()->doOneStep( timeout ); | |
923 | return fill_connection_buffers(); | |
924 | } | |
925 | return true; | |
926 | } // to T2NServerBase::fill_buffer(long long,long long*) | |
927 | ||
928 | ||
929 | ||
930 | /* | |
931 | ** creator functions: | |
932 | */ | |
933 | ||
934 | ||
935 | /** | |
936 | * @brief creates a server object with unix domain server socket. | |
937 | * @param path path of the unix domain socket. | |
938 | * @param mode mode for the socket. | |
939 | * @return shared pointer with the new server object; empty if none could be created.. | |
940 | */ | |
941 | T2NServerBasePtr createT2NUnixServerPort(const std::string& path, int mode) | |
942 | { | |
943 | SCOPETRACKER(); | |
944 | boost::shared_ptr< T2NUnixServer > result( new T2NUnixServer(path,mode) ); | |
945 | if (!result->isOpen()) | |
946 | { | |
947 | module_logger().error(HERE) | |
948 | << "failed to open unix domain server socket on \"" << path << "\""; | |
949 | } | |
950 | return result; | |
951 | } // eo createT2NUnixServerPort(const std::string&,int) | |
952 | ||
953 | ||
954 | /** | |
955 | * @brief creates a client object connected to a server via unix daomain socket. | |
956 | * @param path path of cthe unix domain socket. | |
957 | * @return shared pointer with the new client object; empty if none could be created.. | |
958 | */ | |
959 | T2NClientConnectionPtr createT2NUnixClientConnection(const std::string& path) | |
960 | { | |
961 | typedef IOExportWrapper< AsyncIo::UnixIOSocket > MyIo; | |
962 | typedef boost::shared_ptr< MyIo > MyIoPtr; | |
963 | SCOPETRACKER(); | |
964 | MyIoPtr connection( new MyIo(path) ); | |
965 | boost::shared_ptr< RealT2NClientConnection > result( new RealT2NClientConnection( connection ) ); | |
966 | if (not result->isOpen()) | |
967 | { | |
968 | module_logger().error(HERE) | |
969 | << "failed to open unix domain client socket on \"" << path << "\""; | |
970 | return T2NClientConnectionPtr(); | |
971 | } | |
972 | return result; | |
973 | } // eo createT2NUnixClientConnection(const std::string&) | |
974 | ||
975 | } // eo namespace AsyncIo |