a69494f3a2378cb25f913943ee60e1dc1a7e337b
[libasyncio] / asyncio / async_io.cpp
1 /** @file
2  *
3  *
4  * @copyright Copyright © 2007-2008 by Intra2net AG
5  * @license LGPL
6  * @contact info@intra2net.com
7  */
8
9 //#define NOISEDEBUG
10
11 #include "async_io.hpp"
12 #include <asyncio_config.hpp>
13
14 #include <list>
15 #include <vector>
16 #include <map>
17 #include <algorithm>
18 #include <utility>
19
20 #include <sys/poll.h>
21 #include <sys/time.h>
22 #include <sys/socket.h>
23 #include <unistd.h>
24 #include <errno.h>
25 #include <fcntl.h>
26
27 #include <boost/bind.hpp>
28
29 #ifdef HAVE_LIBI2NCOMMON
30 #  include <signalfunc.hpp>
31 //#  include <timefunc.hxx>
32 #else
33 #include <signal.h>
34 #endif
35
36 #include <iostream>
37
38 #ifdef NOISEDEBUG
39 #include <iostream>
40 #include <iomanip>
41 #define DOUT(msg) std::cout << msg << std::endl
42 #define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl
43 //#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl
44 #define ODOUT(msg) std::cout << __PRETTY_FUNCTION__ << "[" << this << "]:" << msg << std::endl
45 #else
46 #define DOUT(msg) do {} while (0)
47 #define FODOUT(obj,msg) do {} while (0)
48 #define ODOUT(msg) do {} while (0)
49 #endif
50
51
52 namespace
53 {
54
55 using namespace AsyncIo::Utils;
56
57 /*
58  * configuration:
59  */
60
61
62 const int c_max_poll_wait= 10*60*1000; // maximal poll wait (while in backend loop): 10 min
63
64
65 /**
66  * contains internal helper structs and functions for io handling.
67  */
68 namespace internal_io
69 {
70
71 /**
72  * extends struct pollfd with some convenience functions
73  */
74 struct PollFd : public ::pollfd
75 {
76     PollFd()
77     {
78         fd= 0;
79         events= revents= 0;
80     } // eo PollFd
81
82
83     /**
84      * initializes the struct with a given file descriptor and clears the event mask(s).
85      * @param _fd 
86      */
87     PollFd(int _fd)
88     {
89         fd= _fd;
90         events= revents= 0;
91     } // eo PollFd
92
93
94     /**
95      * set that we want to be notified about new incoming data
96      */
97     void setPOLLIN()  { events |= POLLIN; }
98
99     /**
100      * set that we want to be notified if we can send (more) data.
101      */
102     void setPOLLOUT() { events |= POLLOUT; }
103
104 }; // eo struct PollFd
105
106
107 typedef std::vector<PollFd> PollVector;
108 typedef std::map<int,PollVector::size_type> FdPollMap;
109 typedef std::map<int,AsyncIo::IOImplementation*> FdIOMap;
110
111
112 /**
113  * struct for interfacing our local structures with poll()
114  */
115 struct PollDataCluster
116 {
117     PollVector m_poll_vector;
118     FdPollMap  m_fd_poll_map;
119     FdIOMap    m_read_fd_io_map;
120     FdIOMap    m_write_fd_io_map;
121
122     void add_read_fd( int fd, AsyncIo::IOImplementation* io);
123     void add_write_fd( int fd, AsyncIo::IOImplementation* io);
124
125     pollfd* get_pollfd_ptr();
126     unsigned int get_num_pollfds() const;
127
128 }; // eo struct PollDataCluster
129
130
131 typedef PtrList< AsyncIo::IOImplementation, true > IOList;
132 typedef PtrList< AsyncIo::TimerBase, true >        TimerList;
133
134 template<> int IOList::GlobalCountType::InstanceCount= 0;
135 template<> int TimerList::GlobalCountType::InstanceCount= 0;
136
137
138 /**
139  * the (internal) global list of io objects (object pointers)
140  */
141 IOList& g_io_list()
142 {
143     static IOList _the_io_list;
144     return _the_io_list;
145 };
146
147
148 /**
149  * the (internal) global list of timer objects (object pointers)
150  */
151 TimerList& g_timer_list()
152 {
153     static TimerList _the_timer_list;
154     return _the_timer_list;
155 }
156
157 /*
158  * implementation of PollDataCluster
159  */
160
161
162 /**
163  * add a new file descriptor to the read list.
164  *
165  * @param fd the file descriptor.
166  * @param io the io object which uses the fd for reading.
167  */
168 void PollDataCluster::add_read_fd( int fd, AsyncIo::IOImplementation* io)
169 {
170     FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
171     if (itPollMap != m_fd_poll_map.end())
172     {
173         m_poll_vector[ itPollMap->second ].setPOLLIN();
174     }
175     else
176     {
177         PollFd item(fd);
178         item.setPOLLIN();
179         m_fd_poll_map[fd] = m_poll_vector.size();
180         m_poll_vector.push_back( item );
181     }
182     m_read_fd_io_map[fd]= io;
183 } // eo PollDataCluster::add_read_fd
184
185
186 /**
187  * add a new file descriptor to the write list.
188  *
189  * @param fd the file descriptor.
190  * @param io the io object which uses the fd for writing.
191  */
192 void PollDataCluster::add_write_fd( int fd, AsyncIo::IOImplementation* io)
193 {
194     FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
195     if (itPollMap != m_fd_poll_map.end())
196     {
197         m_poll_vector[ itPollMap->second ].setPOLLOUT();
198     }
199     else
200     {
201         PollFd item(fd);
202         item.setPOLLOUT();
203         m_fd_poll_map[fd] = m_poll_vector.size();
204         m_poll_vector.push_back( item );
205     }
206     m_write_fd_io_map[fd]= io;
207 } // eo PollDataCluster::add_write_fd
208
209
210 /**
211  * returns a pointer to a pollfd array; suitable for passing to poll()
212  *
213  * @return pointer to pollfd array
214  */
215 pollfd* PollDataCluster::get_pollfd_ptr()
216 {
217     return m_poll_vector.empty() ? NULL : &m_poll_vector.front();
218 } // eo get_pollfd_ptr
219
220
221 /**
222  * returns the number of entries in the pollfd array; suitable for passing to poll()
223  *
224  * @return the number of entries in the pollfd array
225  */
226 unsigned int PollDataCluster::get_num_pollfds() const
227 {
228     return m_poll_vector.size();
229 } // eo get_num_pollfds
230
231
232
233 } // eo namespace internal_io
234
235
236 /*
237  * some internal tool functions and structures
238  */
239
240 struct FilterMatch {
241     AsyncIo::FilterBasePtr m_filter;
242
243     FilterMatch(AsyncIo::FilterBasePtr filter)
244     : m_filter(filter)
245     {}
246
247     bool operator () (const AsyncIo::FilterBasePtr& item)
248     {
249         return item && item == m_filter;
250     }
251
252 }; // eo struct FilterMatch
253
254
255
256 } // eo anonymous namespace
257
258
259
260
261 namespace AsyncIo
262 {
263
264
265 /*
266  * implementation of TimerBase
267  */
268
269 /**
270  * constructor. Adds the object to the internal timer list.
271  */
272 TimerBase::TimerBase()
273 : m_active(false)
274 , m_marked(false)
275 {
276     internal_io::g_timer_list().add_item(this);
277 } // eo TimerBase::TimerBase
278
279
280 /**
281  * destructor. Removes the object from the internal timer list.
282  */
283 TimerBase::~TimerBase()
284 {
285     ODOUT("enter");
286     if (internal_io::TimerList::Instances())
287     {
288         ODOUT("remove from list");
289         internal_io::g_timer_list().remove_item(this);
290     }
291 } // eo TimerBase::~TimerBase
292
293
294 /**
295  * @brief returns the point in time when the time is executed in real time.
296  * @return the point in time when the timer is to be executed.
297  */
298 MilliTime TimerBase::getRealWhenTime() const
299 {
300     MilliTime mono_time;
301     MilliTime real_time;
302     get_current_monotonic_time(mono_time);
303     get_current_real_time(real_time);
304     MilliTime result= m_when - mono_time + real_time;
305     return result;
306 } // eo TimerBase::getRealWhenTime() const
307
308
309 /**
310  * sets the time when the event should be executed.
311  * @param sec the seconds part of the point in time.
312  * @param msec  the milliseconds part of the point in time.
313  */
314 void TimerBase::setWhenTime(long sec, long msec)
315 {
316     m_when.set(sec,msec);
317     m_marked= false;
318 } // eo TimerBase::setWhenTime
319
320
321 /**
322  * sets the time when the event should be executed.
323  * @param mt the point in time.
324  */
325 void TimerBase::setWhenTime(const MilliTime& mt)
326 {
327     m_when= mt;
328     m_marked= false;
329 } // eo TimerBase::setWhenTime
330
331
332 /**
333  * sets the time delta measured from current time when the event should be executed.
334  * @param sec the seconds of the time delta
335  * @param msec the milli seconds of the time delta
336  */
337 void TimerBase::setDeltaWhenTime(long sec, long msec)
338 {
339     setDeltaWhenTime( MilliTime(sec,msec) );
340 } // eo TimerBase::setWhenTime
341
342
343
344 /**
345  * sets the time delta measured from current time when the event should be executed.
346  * @param mt the time delta
347  */
348 void TimerBase::setDeltaWhenTime(const MilliTime& mt)
349 {
350     get_current_monotonic_time(m_when);
351     m_when+= mt;
352     m_marked= false;
353 } // eo TimerBase::setWhenTime
354
355
356 /**
357  * set the active state of the timer event.
358  * @param active determines if the object should be active (default: yes).
359  */
360 void TimerBase::activate(bool active)
361 {
362     m_active = active;
363     if (!active)
364     {
365         // clear the mark if we are not active.
366         m_marked= false;
367     }
368 } // eo TimerBase::activate
369
370
371 /** @fn void TimerBase::deactivate()
372  *  deactivates the event by clearing the active state.
373  */
374
375
376 /**
377  * called when the timer event occured.
378  */
379 void TimerBase::execute()
380 {
381 } // eo TimerBase::execute
382
383
384 /*
385  * implementation of FilterBase class
386  */
387
388
389 FilterBase::FilterBase()
390 : m_io(NULL)
391 {
392 } // eo FilterBase::FilterBase()
393
394
395 /**
396  * injects incoming data.
397  * @param data  the new data
398  */
399 void FilterBase::injectIncomingData(const std::string& data)
400 {
401     if (m_io)
402     {
403         FilterBasePtr ptr= get_ptr_as< FilterBase >();
404         if (ptr)
405         {
406             m_io->injectIncomingData(ptr,data);
407         }
408     }
409 } // FilterBase::injectIncomingData(const std::string&)
410
411
412 /**
413  * injects outgoing data.
414  * @param data the new data
415  */
416 void FilterBase::injectOutgoingData(const std::string& data)
417 {
418     if (m_io)
419     {
420         FilterBasePtr ptr= get_ptr_as< FilterBase >();
421         if (ptr)
422         {
423             m_io->injectOutgoingData(ptr,data);
424         }
425     }
426 } // eo FilterBase::injectOutgoingData(const std::string&)
427
428
429
430 /**
431  * called when EOF detected on incoming channel (or incoming channel closed)
432  */
433 void FilterBase::endOfIncomingData()
434 {
435 } // eo FilterBase::endOfIncomingData()
436
437 /**
438  * called when the filter should reset.
439  * This is used when a new channel is opened or when the filter is taken out of a filter chain.
440  */
441 void FilterBase::reset()
442 {
443 } // eo FilterBase::reset()
444
445
446 /*
447  * implementation of IOImplementation class
448  */
449
450
451 /**
452  * constructor for the base io class.
453  *
454  * Also adds the object to internal list of io objects (which is used by the backend).
455  *
456  * @param read_fd the file descriptor which should be used for reading (default -1 for no value)
457  * @param write_fd  the file descriptor which should be used for writing (default -1 for no value)
458  */
459 IOImplementation::IOImplementation(int read_fd, int write_fd)
460 : m_read_fd(-1)
461 , m_write_fd(-1)
462 , m_eof(false)
463 , m_not_writable(false)
464 , m_input_buffer()
465 , m_output_buffer()
466 , m_marked_for_reading(false)
467 , m_marked_for_writing(false)
468 {
469     internal_io::g_io_list().add_item(this);
470     if (read_fd >= 0)
471     {
472         setReadFd( read_fd );
473     }
474     if (write_fd >= 0)
475     {
476         setWriteFd( write_fd );
477     }
478 } // eo IOImplementation::IOImplementation
479
480
481 /**
482  * destructor of the base io class.
483  *
484  * Removes the object from the interal list of io objects.
485  */
486 IOImplementation::~IOImplementation()
487 {
488     close();
489     if (internal_io::IOList::Instances())
490     {
491         internal_io::g_io_list().remove_item(this);
492     }
493     // now clear the filters:
494     while (! m_filter_chain.empty() )
495     {
496         FilterChain::iterator it = m_filter_chain.begin();
497         (*it)->reset();
498         (*it)->m_io= NULL;
499         //TODO: signal the filter that it is removed ?!
500         m_filter_chain.erase(it);
501     }
502 } // eo IOImplementation::~IOImplementation
503
504
505 /**
506  * adds another filter to the filter chain.
507  * @param filter pointer to the new filter.
508  */
509 void IOImplementation::addFilter
510 (
511     FilterBasePtr filter
512 )
513 {
514     if (!filter)
515     {
516         return; // nothing to do
517     }
518     if (filter->m_io)
519     {
520         filter->m_io->removeFilter(filter);
521     }
522     m_filter_chain.push_back( filter );
523 } // eo IOImplementation::addFilter
524
525
526 /**
527  * removes a filter from the filter chain.
528  * @param filter the pointer to the filter which is removed.
529  * @note if the filter is removed the class gives away the ownership; i.,e. the caller is responsible for
530  * deleting the filter if it was dynamically allocated.
531  */
532 void IOImplementation::removeFilter
533 (
534     FilterBasePtr filter
535 )
536 {
537     FilterChain::iterator it =
538         std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(filter) );
539     if (it != m_filter_chain.end())
540     {
541         filter->reset();
542         filter->m_io= NULL;
543         //TODO: signal the filter that it is removed ?!
544         m_filter_chain.erase(it);
545     }
546 } // eo IOImplementation::removeFilter
547
548
549 /**
550  * closes the file descriptors (/ the connection).
551  *
552  * @param direction the direction which should be closed (default: @a Direction::both for all).
553  */
554 void IOImplementation::close(Direction direction)
555 {
556     bool had_read_fd= (m_read_fd >= 0);
557     m_errno= 0;
558     if (direction == Direction::unspecified) direction= Direction::both;
559     if (direction != Direction::both && m_read_fd==m_write_fd && m_read_fd>=0)
560     { // special case: half closed (socket) connections...
561         // NOTE: for file descriptors m_errno will set to ENOTSOCK, but since we "forget" the desired part
562         // (read_fd or write_fd) this class works as desired.
563         switch(direction)
564         {
565             case Direction::in:
566                 {
567                     int res= ::shutdown(m_read_fd, SHUT_RD);
568                     if (res<0) 
569                     {
570                         m_errno= errno;
571                     }
572                     m_read_fd= -1;
573                     if (!m_eof)
574                     {
575                         for(FilterChain::iterator it= m_filter_chain.begin();
576                             it != m_filter_chain.end();
577                             ++it)
578                         {
579                             (*it)->endOfIncomingData();
580                         }
581                     }
582                 }
583                 return;
584
585             case Direction::out:
586                 {
587                     int res= ::shutdown(m_write_fd, SHUT_WR);
588                     if (res<0)
589                     {
590                         m_errno= errno;
591                     }
592                     m_write_fd= -1;
593                     m_output_buffer.clear();
594                 }
595                 return;
596         }
597     }
598     if (m_write_fd >= 0 && (direction & Direction::out) )
599     {
600         int res1 = ::close(m_write_fd);
601         if (m_write_fd == m_read_fd)
602         {
603             m_read_fd= -1;
604         }
605         m_write_fd= -1;
606         m_output_buffer.clear();
607         if (res1<0) m_errno= errno;
608     }
609     if (m_read_fd >=0 && (direction & Direction::in) )
610     {
611         int res1 = ::close(m_read_fd);
612         m_read_fd= -1;
613         if (res1<0) m_errno= errno;
614     }
615     if (had_read_fd && !m_eof && (m_read_fd<0))
616     {
617         for(FilterChain::iterator it= m_filter_chain.begin();
618             it != m_filter_chain.end();
619             ++it)
620         {
621             (*it)->endOfIncomingData();
622         }
623     }
624 } // eo IOImplementation::close
625
626
627 /**
628  * determines if the io class wants to read data.
629  * Default implementation checks only for a valid file descriptor value.
630  *
631  * @return @a true if the objects wants to read data
632  */
633 bool IOImplementation::wantRead()
634 {
635     return (m_read_fd >= 0) && ! m_eof;
636 } // eo IOImplementation::wantRead
637
638
639 /**
640  * determines if the io class wants to write data.
641  * Default implementation checks for a valid file descriptor value and if the object
642  * cannot write data immediately.
643  *
644  * @return @a true if the objects wants to write data
645  */
646 bool IOImplementation::wantWrite()
647 {
648     return (m_write_fd >= 0) && ! m_marked_for_writing && ! m_not_writable;
649 } // eo IOImplementation::wantWrite
650
651
652 /**
653  * delivers if opened.
654  * The default returns @a true if at least one file descriptor (read or write) is valid.
655  * @return @a true if opened.
656  */
657 bool IOImplementation::opened() const
658 {
659     return (m_read_fd>=0) || (m_write_fd>=0);
660 } // eo IOImplementation::opened() const
661
662
663 /**
664  * returns if the read side detected an end of file (EOF).
665  * @return @a true if end of file was detected on read file descriptor (or read file descriptor isn't valid).
666  */
667 bool IOImplementation::eof() const
668 {
669     return (m_read_fd < 0) || m_eof;
670 } // eo IOImplementatio::eof() const
671
672
673 /**
674  * @brief returns of the write side didn't detect that it cannot write.
675  * @return @a true if we can write.
676  */
677 bool IOImplementation::writable() const
678 {
679     return (m_write_fd >=0 ) and not m_not_writable;
680 } // eo IOImplementation::writable() const
681
682
683 /**
684  * returns if the output buffer is empty.
685  * @return 
686  */
687 bool IOImplementation::empty() const
688 {
689     return m_output_buffer.empty();
690 } // eo IOImplementation::empty
691
692 /**
693  * puts data into the output buffer and sends it immediately if possible,
694  *
695  * The data is passed through the filter chain before it's stored in the output buffer
696  * (i.e. the output buffer contains data as it should be send directly to the descriptor).
697  * @param _data the data which should be send.
698  */
699 void IOImplementation::lowSend(const std::string& _data)
700 {
701     std::string data(_data);
702
703     for(FilterChain::reverse_iterator it_filter= m_filter_chain.rbegin();
704         it_filter!= m_filter_chain.rend();
705         ++it_filter)
706     {
707         data= (*it_filter)->filterOutgoingData(data);
708     }
709     m_output_buffer+= data;
710
711     // if we can send immediately, do it:
712     if (! m_output_buffer.empty() && m_marked_for_writing)
713     {
714         doWrite();
715     }
716 } // eo IOImplementation::lowSend
717
718
719 /**
720  * called by the backend when there is data to read for this object.
721  *
722  * Reads the data from the connection (read file descriptor) and passes the data through the filter chain.
723  * The final data is appended to the input buffer and the signal @a m_signal_read() is called.
724  *
725  * If EOF is detected (i,e, no data was received) then the signal @a m_signal_eof() is called.
726  *
727  * @note overload this method only when You know what You are doing!
728  * (overloading is necessary when handling server sockets.)
729  */
730 void IOImplementation::doRead()
731 {
732     // static read buffer; should be ok as long as we don't use threads
733     static char buffer[8*1024]; // 8 KiB
734
735     m_errno = 0;
736     if (m_read_fd<0 || !m_marked_for_reading)
737     {
738         ODOUT("exit0; read_fd="<<m_read_fd << " mark=" << m_marked_for_reading);
739         return;
740     }
741
742     // reset the mark:
743     m_marked_for_reading = false;
744
745     // now read the data:
746     ssize_t count;
747     count = ::read(m_read_fd, buffer, sizeof(buffer));
748
749     ODOUT("::read -> " << count);
750
751     // interpret what we got:
752     if (count < 0) // error
753     {
754         m_errno = errno;
755         int fd= m_read_fd;
756
757         switch(m_errno)
758         {
759             case EINVAL:
760             case EBADF:
761             case ECONNRESET:
762             case ENETRESET:
763                 if (fd == m_read_fd)
764                 {
765                     close( (m_read_fd == m_write_fd) ? Direction::both : Direction::in );
766                 }
767                 break;
768         }
769     }
770     else if (count==0) // EOF
771     {
772         // remember the read fd:
773         int fd = m_read_fd;
774         // remember the EOF:
775         m_eof= true;
776         // signal EOF
777         m_signal_eof();
778         // if the fd is still the same: close it.
779         if (fd == m_read_fd)
780         {
781             close( Direction::in );
782         }
783     }
784     else // we have valid data
785     {
786         std::string data(buffer,count);
787         ODOUT(" got \"" << data << "\"");
788         for(FilterChain::iterator it_filter= m_filter_chain.begin();
789             it_filter != m_filter_chain.end();
790             ++it_filter)
791         {
792             data= (*it_filter)->filterIncomingData(data);
793         }
794         m_input_buffer+= data;
795         m_signal_read();
796     }
797 } // eo IOImplementation::doRead
798
799
800 /**
801  * interface for filter classes to inject data into the filter chain (emulating new incoming data).
802  * @param from_filter the filter which injects the new data.
803  * @param _data  the new data.
804  */
805 void IOImplementation::injectIncomingData(FilterBasePtr from_filter, const std::string& _data)
806 {
807     FilterChain::iterator it_filter =
808         std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(from_filter) );
809     if (it_filter == m_filter_chain.end())
810     {
811         // dont accept data inject from a unknown filter
812         return;
813     }
814     // well: pass the data through the remaining filters:
815     // NOTE: processing is (nearly) the same as in IOImplementation::doRead()
816     std::string data(_data);
817     for(++it_filter;
818         it_filter != m_filter_chain.end();
819         ++it_filter)
820     {
821         data= (*it_filter)->filterIncomingData(data);
822     }
823     m_input_buffer+= data;
824     m_signal_read();
825 } // eo IOImplementation::injectIncomingData(FilterBase*,const std::string&)
826
827
828 /**
829  * interface for filter classes to inject data into the filter chain (emulating new outgoing data).
830  * @param from_filter the filter which injects the new data.
831  * @param _data  the new data.
832  */
833 void IOImplementation::injectOutgoingData(FilterBasePtr from_filter, const std::string& _data)
834 {
835     FilterChain::reverse_iterator it_filter =
836         std::find_if( m_filter_chain.rbegin(), m_filter_chain.rend(), FilterMatch(from_filter) );
837     if (it_filter == m_filter_chain.rend())
838     {
839         // dont accept data inject from a unknown filter
840         return;
841     }
842     // well: pass the data through the remaining filters:
843     // NOTE: processing is (nearly) the same as in IOImplementation::lowSend()
844     std::string data(_data);
845     for(++it_filter;
846         it_filter!= m_filter_chain.rend();
847         ++it_filter)
848     {
849         data= (*it_filter)->filterOutgoingData(data);
850     }
851     m_output_buffer+= data;
852
853     // if we can send immediately, do it:
854     if (! m_output_buffer.empty() && m_marked_for_writing)
855     {
856         doWrite();
857     }
858 } // eo IOImplementation::injectOutgoingData(FilterBase*,const std::string&)
859
860
861 /**
862  * set the read file descriptor.
863  * Although a derived class can also set the read fd directly; this method should be used
864  * for this task since it updates some flags on the fd for async operation.
865  * @param fd the new read file descriptor.
866  */
867 void IOImplementation::setReadFd(int fd)
868 {
869     // test if we already have a valid descriptor (and may have to close it):
870     if (m_read_fd >=0 )
871     {
872         if (m_read_fd == fd)
873         {
874             // fd was already right; consider it to be ok.
875             return;
876         }
877         close(Direction::in);
878     }
879     // reset our errno:
880     m_errno= 0;
881     // if the new descriptor looks valid, set some flags:
882     if (fd >= 0)
883     {
884         long flags= ::fcntl(fd, F_GETFL);
885         if (flags != -1)
886         {
887             // set the flags for non blocking, async operation
888             flags |= O_NONBLOCK|O_ASYNC;
889             ::fcntl(fd,F_SETFL, flags);
890         }
891         else if ( errno == EBADF )
892         {
893             // well, we seemed to be fed with an invalid descriptor...:
894             m_errno = errno;
895             fd= -1;
896         }
897     }
898     if (fd >= 0) // if still valid:
899     {
900         // set the close-on-exec flag
901         ::fcntl(fd,F_SETFD, FD_CLOEXEC);
902     }
903     m_read_fd= fd;
904     m_marked_for_reading= false;
905     m_eof= false;
906 } // eo IOImplementation::setReadFd(int)
907
908
909
910 /**
911  * set the write file descriptor.
912  * Although a derived class can also set the write fd directly; this method should be used
913  * for this task since it updates some flags on the fd for async operation.
914  * @param fd the new write file descriptor.
915  */
916 void IOImplementation::setWriteFd(int fd)
917 {
918     if (m_write_fd >=0 )
919     {
920         if (m_write_fd == fd)
921         {
922             // fd was already right; consider it to be ok.
923             return;
924         }
925         close(Direction::out);
926     }
927     // reset our errno:
928     m_errno= 0;
929     // if the new descriptor looks valid, set some flags:
930     if (fd >= 0)
931     {
932         long flags= ::fcntl(fd, F_GETFL);
933         if (flags != -1)
934         {
935             // set the flags for non blocking, async operation
936             flags |= O_NONBLOCK|O_ASYNC;
937             ::fcntl(fd,F_SETFL, flags);
938         }
939         else if (errno == EBADF)
940         {
941             // well, we seemed to be fed with an invalid descriptor...:
942             m_errno = errno;
943             fd= -1;
944         }
945     }
946     if (fd >= 0) // if still valid:
947     {
948         // set the close-on-exec flag
949         ::fcntl(fd,F_SETFD, FD_CLOEXEC);
950     }
951     m_write_fd = fd;
952     m_marked_for_writing= false;
953     m_not_writable= false;
954 } // eo IOImplementation::setWriteFd(int)
955
956
957
958 /**
959  * called by the backend when this object can write data.
960  *
961  * If some data was sended, the signal @a m_signal_write is called.
962  *
963  * @internal tries to write all buffered data to output; if this succeeds,
964  * the connection is assumed to be still able to accept more data.
965  * (i.e. the internal write mark is kept!)
966  *
967   * @note overload this method only when You know what You are doing!
968 */
969 void IOImplementation::doWrite()
970 {
971     m_errno = 0;
972     if ( m_write_fd<0 || !m_marked_for_writing || m_output_buffer.empty())
973     {
974         return;
975     }
976
977     ODOUT("doWrite, d=\"" << m_output_buffer << "\"");
978
979     //reset mark:
980     m_marked_for_writing= false;
981
982     // now write the data
983     ssize_t count= ::write( m_write_fd, m_output_buffer.data(), m_output_buffer.size());
984
985     ODOUT("::write -> " << count);
986
987     if (count < 0) // error
988     {
989         m_errno= errno;
990         int fd= m_write_fd;
991
992         switch(m_errno)
993         {
994             case EPIPE:
995                 m_not_writable= true;
996                 // emit a signal
997                 m_signal_not_writable();
998                 // fall through
999             case EINVAL:
1000             case EBADF:
1001             case ECONNRESET:
1002             case ENETRESET:
1003                 if (fd == m_write_fd)
1004                 {
1005                     close( (m_write_fd == m_read_fd) ? Direction::both : Direction::out );
1006                 }
1007                 break;
1008         }
1009     }
1010     else
1011     {
1012         m_output_buffer.erase(0, count);
1013         if (m_output_buffer.empty())
1014         {
1015             // special case: if we were able to send all the data, we keep the write mark:
1016             m_marked_for_writing= true;
1017         }
1018     }
1019     if (count > 0)
1020     {
1021         m_signal_write();
1022     }
1023 } // eo IOImplementation::doWrite
1024
1025
1026 /*
1027  * implementation of SimpleIO
1028  */
1029
1030
1031 SimpleIO::SimpleIO(int read_fd, int write_fd)
1032 : inherited(read_fd, write_fd)
1033 {
1034     m_signal_read.connect(boost::bind(&SimpleIO::slotReceived,this));
1035 } // eo SimpleIO::SimpleIO()
1036
1037
1038 SimpleIO::~SimpleIO()
1039 {
1040 } // eo SimpleIO::~SimpleIO()
1041
1042
1043 /**
1044  * sends a string.
1045  * @param data the string.
1046  */
1047 void SimpleIO::sendString(const std::string& data)
1048 {
1049     lowSend(data);
1050 } // eo SimpleIO::sendString(const std::string&)
1051
1052
1053 /**
1054  * emits the signal signalReceived with the received data.
1055  * This slot is connected to IOImplementation::m_signal_read.
1056  */
1057 void SimpleIO::slotReceived()
1058 {
1059     std::string data;
1060     data.swap(m_input_buffer);
1061     signal_received_string(data);
1062 } // eo SimpleIO::slotReceived()
1063
1064
1065
1066 /*
1067  * implementation of SimpleIO2
1068  */
1069
1070
1071 SimpleIO2::SimpleIO2(int read_fd, int write_fd)
1072 : inherited(read_fd, write_fd)
1073 {
1074     m_signal_read.connect(boost::bind(&SimpleIO2::slotReceived,this));
1075 } // eo SimpleIO2::SimpleIO2()
1076
1077
1078 SimpleIO2::~SimpleIO2()
1079 {
1080 } // eo SimpleIO2::~SimpleIO2()
1081
1082
1083 /**
1084  * sends a string.
1085  * @param data the string.
1086  */
1087 void SimpleIO2::sendString(const std::string& data)
1088 {
1089     lowSend(data);
1090 } // eo SimpleIO2::sendString(const std::string&)
1091
1092
1093 /**
1094  * emits the signal signalReceived with the received data.
1095  * This slot is connected to IOImplementation::m_signal_read.
1096  */
1097 void SimpleIO2::slotReceived()
1098 {
1099     std::string data;
1100     data.swap(m_input_buffer);
1101     signal_received_string(data);
1102 } // eo SimpleIO2::slotReceived()
1103
1104
1105
1106 /*
1107  * implementation of class Backend (singleton)
1108  */
1109
1110 Backend* Backend::g_backend= NULL;
1111
1112 int Backend::m_count_active_steps=0;
1113
1114
1115 Backend::Backend()
1116 : m_count_active_loops(0)
1117 , m_count_stop_requests(0)
1118 {
1119 #ifdef HAVE_LIBI2NCOMMON
1120     SystemTools::ignore_signal( SystemTools::Signal::PIPE );
1121 #else
1122     signal( SIGPIPE, SIG_IGN );
1123 #endif
1124 } // eo Backend::Backend
1125
1126
1127 Backend::~Backend()
1128 {
1129 #ifdef HAVE_LIBI2NCOMMON
1130     SystemTools::restore_signal_handler( SystemTools::Signal::PIPE );
1131 #else
1132     signal( SIGPIPE, SIG_DFL );
1133 #endif
1134 } // eo Backend::~Backend()
1135
1136 /**
1137  * delivers pointer to the current backend, instantiating a new backend if there was no current one.
1138  *
1139  * This should be the only way to access the backend which should be a singleton.
1140  *
1141  * @return the pointer to the current backend.
1142  */
1143 Backend* Backend::getBackend()
1144 {
1145     if (!g_backend)
1146     {
1147         g_backend = new Backend();
1148     }
1149     return g_backend;
1150 } // eo Backend::getBackend
1151
1152
1153
1154
1155 /**
1156  * performs one backend cycle.
1157  *
1158  * Collects all file descriptors from the active io objects which should be selected for reading and/or writing.
1159  * Also determines the timer events which become due and adjusts the timeout.
1160  * Constructs the necessary structures and calls poll().
1161  * Finally interprets the results from poll() (i.e. performs the reading/writing/timer events)
1162  *
1163  * @param timeout maximal wait value in milliseconds; negative value waits until at least one event occured.
1164  * @return @a true if there was at least one active object; otherwise @a false
1165  *
1166  * @note this method is a little beast.
1167  *
1168  * @internal 
1169  * The cycle is divided into four steps: collecting; poll; mark and execute.
1170  * The "mark" step is necessary to avoid some bad side effects when method calls in the execution stage
1171  * are calling @a Backup::doOneStep or open their own local backend loop.
1172  *
1173  * @todo handle some more error cases.
1174  * @todo provide a plugin interface for external handler.
1175  * (currently inclusion of external handler is possible by (ab)using timer classes)
1176  */
1177 bool Backend::doOneStep(int timeout)
1178 {
1179     ODOUT( "timeout=" << timeout );
1180     internal_io::PollDataCluster poll_data;
1181     bool had_active_object = false;
1182
1183     ++m_count_active_steps;
1184
1185     try {
1186
1187         FdSetType local_read_fds;
1188         FdSetType local_write_fds;
1189
1190         // step 1 ; collect
1191
1192         { // step 1.1: collect fds for read/write operations
1193             for(internal_io::IOList::iterator itIOList = internal_io::g_io_list().begin();
1194                 itIOList != internal_io::g_io_list().end();
1195                 ++itIOList)
1196             {
1197                 if (! *itIOList) continue; // skip NULL entries
1198                 int  read_fd  = (*itIOList)->m_read_fd;
1199                 int  write_fd = (*itIOList)->m_write_fd;
1200                 bool want_read  = (read_fd >= 0) and (*itIOList)->wantRead();
1201                 bool want_write = (write_fd >= 0) and (*itIOList)->wantWrite();
1202                 if (read_fd >= 0 )
1203                 {
1204                     local_read_fds.insert( read_fd );
1205                 }
1206                 if (write_fd >= 0)
1207                 {
1208                     local_write_fds.insert( write_fd );
1209                 }
1210                 if (!want_read && !want_write) continue;
1211                 if (want_read)
1212                 {
1213                     FODOUT( (*itIOList), "wants to read  (fd=" << read_fd << ")");
1214                     poll_data.add_read_fd(read_fd, *itIOList);
1215                 }
1216                 if (want_write)
1217                 {
1218                     FODOUT( (*itIOList), "wants to write  (fd=" << write_fd << ")");
1219                     poll_data.add_write_fd(write_fd, *itIOList);
1220                 }
1221                 had_active_object= true;
1222             }
1223         }
1224
1225         { // step 1.2: collect timer events
1226             MilliTime current_time;
1227             MilliTime min_event_time;
1228
1229             get_current_monotonic_time(current_time);
1230             bool min_event_time_set;
1231
1232             if (timeout >= 0)
1233             {
1234                 min_event_time = current_time + MilliTime(0,timeout);
1235                 min_event_time_set= true;
1236             }
1237             else
1238             {
1239                 min_event_time = current_time + MilliTime(86400,0);
1240                 min_event_time_set= false;
1241             }
1242             // TODO
1243
1244             for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1245                 it_timer != internal_io::g_timer_list().end() 
1246                 && (!had_active_object || !min_event_time_set || current_time < min_event_time);
1247                 ++ it_timer)
1248             {
1249                 if (! *it_timer) continue; // skip NULL entries
1250                 if (! (*it_timer)->m_active) continue; // skip if not enabled
1251                 if ( !min_event_time_set || (*it_timer)->m_when < min_event_time)
1252                 {
1253                     min_event_time = (*it_timer)->m_when;
1254                     min_event_time_set= true;
1255                 }
1256                 had_active_object= true;
1257             }
1258
1259             if (min_event_time_set)
1260             { // we have at a minimal event time, so (re)compute the timeout value:
1261                 MilliTime delta= (min_event_time - current_time);
1262                 long long delta_ms = std::min( delta.get_milliseconds(), 21600000LL); // max 6h
1263                 if (delta_ms <= 0L)
1264                 {
1265                     timeout= 0L;
1266                 }
1267                 else
1268                 {
1269                     timeout= delta_ms + (delta_ms<5 ? 1 : 3);
1270                 }
1271             }
1272         }
1273
1274         // step 2 : poll
1275         ODOUT(" poll timeout is " << timeout);
1276         {
1277             MilliTime current_time;
1278             get_current_monotonic_time(current_time);
1279             ODOUT("  current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1280         }
1281         int poll_result= ::poll(poll_data.get_pollfd_ptr(), poll_data.get_num_pollfds(), timeout);
1282
1283         ODOUT("poll -> " << poll_result);
1284         {
1285             MilliTime current_time;
1286             get_current_monotonic_time(current_time);
1287             ODOUT("  current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1288         }
1289
1290         if (poll_result < 0)
1291         {
1292             //TODO poll error handling (signals ?!)
1293         }
1294
1295         // step 3 : mark
1296
1297         // step 3.1: mark io objects (if necessary)
1298         if (poll_result > 0)
1299         {
1300             for(internal_io::PollVector::iterator itPollItem = poll_data.m_poll_vector.begin();
1301                 itPollItem != poll_data.m_poll_vector.end();
1302                 ++itPollItem)
1303             {
1304                 ODOUT("  fd=" << itPollItem->fd << ", events=" << itPollItem->events << ", revents=" << itPollItem->revents);
1305                 if ( 0 == (itPollItem->revents))
1306                 { // preliminary continuation if nothing is to handle for this item(/fd)...
1307                     continue;
1308                 }
1309                 if ( 0!= (itPollItem->revents & (POLLIN|POLLHUP)))
1310                 {
1311                     IOImplementation *io= poll_data.m_read_fd_io_map[ itPollItem->fd ];
1312                     if (io && io->m_read_fd==itPollItem->fd)
1313                     {
1314                         FODOUT(io,"marked for reading");
1315                         io->m_marked_for_reading= true;
1316                     }
1317                 }
1318                 if ( 0!= (itPollItem->revents & POLLOUT))
1319                 {
1320                     IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
1321                     if (io && io->m_write_fd==itPollItem->fd)
1322                     {
1323                         io->m_marked_for_writing= true;
1324                     }
1325                 }
1326                 if ( 0!= (itPollItem->revents & POLLERR))
1327                 {
1328                     IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
1329                     if (0!= (itPollItem->events & POLLOUT))
1330                     {
1331                         if (io && io->m_write_fd==itPollItem->fd)
1332                         {
1333                             io->m_marked_for_writing= false;
1334                             //io->close( Direction::out );
1335                         }
1336                     }
1337                 }
1338                 // TODO error handling (POLLERR, POLLHUP, POLLNVAL)
1339             }
1340         }
1341
1342         //Step 3.2: mark timer objects
1343         {
1344             MilliTime current_time;
1345
1346             get_current_monotonic_time(current_time);
1347             ODOUT("  current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1348
1349             for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1350                 it_timer != internal_io::g_timer_list().end();
1351                 ++ it_timer)
1352             {
1353                 ODOUT("  check timer " << *it_timer);
1354                 if (! *it_timer) continue; // skip NULL entries
1355                 if (! (*it_timer)->m_active) continue; // skip if not enabled
1356                 if ( (*it_timer)->m_when <= current_time)
1357                 {
1358                     ODOUT("   ==> MARK");
1359                     (*it_timer)->m_marked = true;
1360                 }
1361             }
1362         }
1363
1364
1365         // step 4 : execute
1366
1367         // step 4.1: execute io
1368         ODOUT("execute stage");
1369         for(internal_io::IOList::iterator it_io = internal_io::g_io_list().begin();
1370             it_io != internal_io::g_io_list().end();
1371             ++ it_io)
1372         {
1373             ODOUT("  check obj " << *it_io);
1374             if (NULL == *it_io) continue;
1375             if ((*it_io)->m_marked_for_writing)
1376             {
1377                 FODOUT((*it_io),"exec doWrite");
1378                 (*it_io)->doWrite();
1379                 if ((*it_io) == NULL) continue; // skip remaining if we lost the object
1380                 if ((*it_io)->m_errno)
1381                 {
1382                     continue;
1383                 }
1384             }
1385             if ((*it_io)->m_marked_for_reading)
1386             {
1387                 FODOUT((*it_io),"exec doRead");
1388                 (*it_io)->doRead();
1389                 if ((*it_io) == NULL) continue; // skip remaining if we lost the object
1390                 if ((*it_io)->m_errno)
1391                 {
1392                     continue;
1393                 }
1394             }
1395         }
1396
1397         // step 4.2: execute timer events
1398         {
1399             for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1400                 it_timer != internal_io::g_timer_list().end();
1401                 ++ it_timer)
1402             {
1403                 if (! *it_timer) continue; // skip NULL entries
1404                 if (! (*it_timer)->m_active) continue; // skip if not enabled
1405                 if (! (*it_timer)->m_marked) continue; // skip if not marked
1406
1407                 // reset the mark and deactivate object now since the execute() method might activate it again
1408                 (*it_timer)->m_marked= false;
1409                 (*it_timer)->m_active= false;
1410
1411                 // now execute the event:
1412                 (*it_timer)->execute();
1413             }
1414         }
1415
1416     } // eo try
1417     catch(...)
1418     {
1419         // clean up our counter
1420         --m_count_active_steps;
1421         // and forward the exception
1422         throw;
1423     }
1424
1425     if ( 0 == --m_count_active_steps)
1426     {
1427         internal_io::g_io_list().clean_list();
1428         internal_io::g_timer_list().clean_list();
1429     }
1430
1431     return had_active_object;
1432 } // eo Backend::doOneStep
1433
1434
1435 /**
1436  * enters a backend loop.
1437  *
1438  * Calls @a Backend::doOneStep within a loop until @a Backend::stop was called or there are no more
1439  * active objects (io objects or timer objects).
1440  */
1441 void Backend::run()
1442 {
1443     ++m_count_active_loops;
1444     do
1445     {
1446         try
1447         {
1448             if (!doOneStep(c_max_poll_wait))
1449             {
1450                 // stop if there are no more active objects.
1451                 stop();
1452             }
1453         }
1454         catch(...)
1455         {
1456             // clean up our counter
1457             --m_count_active_loops;
1458             // and forward the exception
1459             throw;
1460         }
1461     } 
1462     while (0 == m_count_stop_requests);
1463     --m_count_active_loops;
1464     --m_count_stop_requests;
1465 } // eo Backend::run
1466
1467
1468 /**
1469  * @brief stops the latest loop currently run by Backend::run().
1470  * @see Backend::run()
1471  */
1472 void Backend::stop()
1473 {
1474     if (m_count_active_loops)
1475     {
1476         ++m_count_stop_requests;
1477     }
1478 } // eo Backend::stop()
1479
1480
1481
1482 } // eo namespace AsyncIo