95d916b3a56dcffa11e28cedb859e009607d5915
[libasyncio] / asyncio / async_io.cpp
1 /** @file
2  *
3  *
4  * @copyright Copyright © 2007-2008 by Intra2net AG
5  * @license LGPL
6  * @contact info@intra2net.com
7  */
8
9 //#define NOISEDEBUG
10
11 #include "async_io.hpp"
12
13 #include <list>
14 #include <vector>
15 #include <map>
16 #include <algorithm>
17 #include <utility>
18
19 #include <sys/poll.h>
20 #include <sys/time.h>
21 #include <sys/socket.h>
22 #include <unistd.h>
23 #include <errno.h>
24 #include <fcntl.h>
25
26 #include <boost/bind.hpp>
27
28 #include <signalfunc.hpp>
29 #include <timefunc.hxx>
30
31 #include <iostream>
32
33 #ifdef NOISEDEBUG
34 #include <iostream>
35 #include <iomanip>
36 #define DOUT(msg) std::cout << msg << std::endl
37 #define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl
38 //#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl
39 #define ODOUT(msg) std::cout << __PRETTY_FUNCTION__ << "[" << this << "]:" << msg << std::endl
40 #else
41 #define DOUT(msg) do {} while (0)
42 #define FODOUT(obj,msg) do {} while (0)
43 #define ODOUT(msg) do {} while (0)
44 #endif
45
46
47 namespace
48 {
49
50 using namespace AsyncIo::Utils;
51
52 /*
53  * configuration:
54  */
55
56
57 const int c_max_poll_wait= 10*60*1000; // maximal poll wait (while in backend loop): 10 min
58
59
60 /**
61  * contains internal helper structs and functions for io handling.
62  */
63 namespace internal_io
64 {
65
66 /**
67  * extends struct pollfd with some convenience functions
68  */
69 struct PollFd : public ::pollfd
70 {
71     PollFd()
72     {
73         fd= 0;
74         events= revents= 0;
75     } // eo PollFd
76
77
78     /**
79      * initializes the struct with a given file descriptor and clears the event mask(s).
80      * @param _fd 
81      */
82     PollFd(int _fd)
83     {
84         fd= _fd;
85         events= revents= 0;
86     } // eo PollFd
87
88
89     /**
90      * set that we want to be notified about new incoming data
91      */
92     void setPOLLIN()  { events |= POLLIN; }
93
94     /**
95      * set that we want to be notified if we can send (more) data.
96      */
97     void setPOLLOUT() { events |= POLLOUT; }
98
99 }; // eo struct PollFd
100
101
102 typedef std::vector<PollFd> PollVector;
103 typedef std::map<int,PollVector::size_type> FdPollMap;
104 typedef std::map<int,AsyncIo::IOImplementation*> FdIOMap;
105
106
107 /**
108  * struct for interfacing our local structures with poll()
109  */
110 struct PollDataCluster
111 {
112     PollVector m_poll_vector;
113     FdPollMap  m_fd_poll_map;
114     FdIOMap    m_read_fd_io_map;
115     FdIOMap    m_write_fd_io_map;
116
117     void add_read_fd( int fd, AsyncIo::IOImplementation* io);
118     void add_write_fd( int fd, AsyncIo::IOImplementation* io);
119
120     pollfd* get_pollfd_ptr();
121     unsigned int get_num_pollfds() const;
122
123 }; // eo struct PollDataCluster
124
125
126 typedef PtrList< AsyncIo::IOImplementation, true > IOList;
127 typedef PtrList< AsyncIo::TimerBase, true >        TimerList;
128
129 template<> int IOList::GlobalCountType::InstanceCount= 0;
130 template<> int TimerList::GlobalCountType::InstanceCount= 0;
131
132
133 /**
134  * the (internal) global list of io objects (object pointers)
135  */
136 IOList& g_io_list()
137 {
138     static IOList _the_io_list;
139     return _the_io_list;
140 };
141
142
143 /**
144  * the (internal) global list of timer objects (object pointers)
145  */
146 TimerList& g_timer_list()
147 {
148     static TimerList _the_timer_list;
149     return _the_timer_list;
150 }
151
152 /*
153  * implementation of PollDataCluster
154  */
155
156
157 /**
158  * add a new file descriptor to the read list.
159  *
160  * @param fd the file descriptor.
161  * @param io the io object which uses the fd for reading.
162  */
163 void PollDataCluster::add_read_fd( int fd, AsyncIo::IOImplementation* io)
164 {
165     FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
166     if (itPollMap != m_fd_poll_map.end())
167     {
168         m_poll_vector[ itPollMap->second ].setPOLLIN();
169     }
170     else
171     {
172         PollFd item(fd);
173         item.setPOLLIN();
174         m_fd_poll_map[fd] = m_poll_vector.size();
175         m_poll_vector.push_back( item );
176     }
177     m_read_fd_io_map[fd]= io;
178 } // eo PollDataCluster::add_read_fd
179
180
181 /**
182  * add a new file descriptor to the write list.
183  *
184  * @param fd the file descriptor.
185  * @param io the io object which uses the fd for writing.
186  */
187 void PollDataCluster::add_write_fd( int fd, AsyncIo::IOImplementation* io)
188 {
189     FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
190     if (itPollMap != m_fd_poll_map.end())
191     {
192         m_poll_vector[ itPollMap->second ].setPOLLOUT();
193     }
194     else
195     {
196         PollFd item(fd);
197         item.setPOLLOUT();
198         m_fd_poll_map[fd] = m_poll_vector.size();
199         m_poll_vector.push_back( item );
200     }
201     m_write_fd_io_map[fd]= io;
202 } // eo PollDataCluster::add_write_fd
203
204
205 /**
206  * returns a pointer to a pollfd array; suitable for passing to poll()
207  *
208  * @return pointer to pollfd array
209  */
210 pollfd* PollDataCluster::get_pollfd_ptr()
211 {
212     return m_poll_vector.empty() ? NULL : &m_poll_vector.front();
213 } // eo get_pollfd_ptr
214
215
216 /**
217  * returns the number of entries in the pollfd array; suitable for passing to poll()
218  *
219  * @return the number of entries in the pollfd array
220  */
221 unsigned int PollDataCluster::get_num_pollfds() const
222 {
223     return m_poll_vector.size();
224 } // eo get_num_pollfds
225
226
227
228 } // eo namespace internal_io
229
230
231 /*
232  * some internal tool functions and structures
233  */
234
235 struct FilterMatch {
236     AsyncIo::FilterBasePtr m_filter;
237
238     FilterMatch(AsyncIo::FilterBasePtr filter)
239     : m_filter(filter)
240     {}
241
242     bool operator () (const AsyncIo::FilterBasePtr& item)
243     {
244         return item && item == m_filter;
245     }
246
247 }; // eo struct FilterMatch
248
249
250
251 } // eo anonymous namespace
252
253
254
255
256 namespace AsyncIo
257 {
258
259
260 /*
261  * implementation of TimerBase
262  */
263
264 /**
265  * constructor. Adds the object to the internal timer list.
266  */
267 TimerBase::TimerBase()
268 : m_active(false)
269 , m_marked(false)
270 {
271     internal_io::g_timer_list().add_item(this);
272 } // eo TimerBase::TimerBase
273
274
275 /**
276  * destructor. Removes the object from the internal timer list.
277  */
278 TimerBase::~TimerBase()
279 {
280     ODOUT("enter");
281     if (internal_io::TimerList::Instances())
282     {
283         ODOUT("remove from list");
284         internal_io::g_timer_list().remove_item(this);
285     }
286 } // eo TimerBase::~TimerBase
287
288
289 /**
290  * @brief returns the point in time when the time is executed in real time.
291  * @return the point in time when the timer is to be executed.
292  */
293 MilliTime TimerBase::getRealWhenTime() const
294 {
295     MilliTime mono_time;
296     MilliTime real_time;
297     get_current_monotonic_time(mono_time);
298     get_current_real_time(real_time);
299     MilliTime result= m_when - mono_time + real_time;
300     return result;
301 } // eo TimerBase::getRealWhenTime() const
302
303
304 /**
305  * sets the time when the event should be executed.
306  * @param sec the seconds part of the point in time.
307  * @param msec  the milliseconds part of the point in time.
308  */
309 void TimerBase::setWhenTime(long sec, long msec)
310 {
311     m_when.set(sec,msec);
312     m_marked= false;
313 } // eo TimerBase::setWhenTime
314
315
316 /**
317  * sets the time when the event should be executed.
318  * @param mt the point in time.
319  */
320 void TimerBase::setWhenTime(const MilliTime& mt)
321 {
322     m_when= mt;
323     m_marked= false;
324 } // eo TimerBase::setWhenTime
325
326
327 /**
328  * sets the time delta measured from current time when the event should be executed.
329  * @param sec the seconds of the time delta
330  * @param msec the milli seconds of the time delta
331  */
332 void TimerBase::setDeltaWhenTime(long sec, long msec)
333 {
334     setDeltaWhenTime( MilliTime(sec,msec) );
335 } // eo TimerBase::setWhenTime
336
337
338
339 /**
340  * sets the time delta measured from current time when the event should be executed.
341  * @param mt the time delta
342  */
343 void TimerBase::setDeltaWhenTime(const MilliTime& mt)
344 {
345     get_current_monotonic_time(m_when);
346     m_when+= mt;
347     m_marked= false;
348 } // eo TimerBase::setWhenTime
349
350
351 /**
352  * set the active state of the timer event.
353  * @param active determines if the object should be active (default: yes).
354  */
355 void TimerBase::activate(bool active)
356 {
357     m_active = active;
358     if (!active)
359     {
360         // clear the mark if we are not active.
361         m_marked= false;
362     }
363 } // eo TimerBase::activate
364
365
366 /** @fn void TimerBase::deactivate()
367  *  deactivates the event by clearing the active state.
368  */
369
370
371 /**
372  * called when the timer event occured.
373  */
374 void TimerBase::execute()
375 {
376 } // eo TimerBase::execute
377
378
379 /*
380  * implementation of FilterBase class
381  */
382
383
384 FilterBase::FilterBase()
385 : m_io(NULL)
386 {
387 } // eo FilterBase::FilterBase()
388
389
390 /**
391  * injects incoming data.
392  * @param data  the new data
393  */
394 void FilterBase::injectIncomingData(const std::string& data)
395 {
396     if (m_io)
397     {
398         FilterBasePtr ptr= get_ptr_as< FilterBase >();
399         if (ptr)
400         {
401             m_io->injectIncomingData(ptr,data);
402         }
403     }
404 } // FilterBase::injectIncomingData(const std::string&)
405
406
407 /**
408  * injects outgoing data.
409  * @param data the new data
410  */
411 void FilterBase::injectOutgoingData(const std::string& data)
412 {
413     if (m_io)
414     {
415         FilterBasePtr ptr= get_ptr_as< FilterBase >();
416         if (ptr)
417         {
418             m_io->injectOutgoingData(ptr,data);
419         }
420     }
421 } // eo FilterBase::injectOutgoingData(const std::string&)
422
423
424
425 /**
426  * called when EOF detected on incoming channel (or incoming channel closed)
427  */
428 void FilterBase::endOfIncomingData()
429 {
430 } // eo FilterBase::endOfIncomingData()
431
432 /**
433  * called when the filter should reset.
434  * This is used when a new channel is opened or when the filter is taken out of a filter chain.
435  */
436 void FilterBase::reset()
437 {
438 } // eo FilterBase::reset()
439
440
441 /*
442  * implementation of IOImplementation class
443  */
444
445
446 /**
447  * constructor for the base io class.
448  *
449  * Also adds the object to internal list of io objects (which is used by the backend).
450  *
451  * @param read_fd the file descriptor which should be used for reading (default -1 for no value)
452  * @param write_fd  the file descriptor which should be used for writing (default -1 for no value)
453  */
454 IOImplementation::IOImplementation(int read_fd, int write_fd)
455 : m_read_fd(-1)
456 , m_write_fd(-1)
457 , m_eof(false)
458 , m_not_writable(false)
459 , m_input_buffer()
460 , m_output_buffer()
461 , m_marked_for_reading(false)
462 , m_marked_for_writing(false)
463 {
464     internal_io::g_io_list().add_item(this);
465     if (read_fd >= 0)
466     {
467         setReadFd( read_fd );
468     }
469     if (write_fd >= 0)
470     {
471         setWriteFd( write_fd );
472     }
473 } // eo IOImplementation::IOImplementation
474
475
476 /**
477  * destructor of the base io class.
478  *
479  * Removes the object from the interal list of io objects.
480  */
481 IOImplementation::~IOImplementation()
482 {
483     close();
484     if (internal_io::IOList::Instances())
485     {
486         internal_io::g_io_list().remove_item(this);
487     }
488     // now clear the filters:
489     while (! m_filter_chain.empty() )
490     {
491         FilterChain::iterator it = m_filter_chain.begin();
492         (*it)->reset();
493         (*it)->m_io= NULL;
494         //TODO: signal the filter that it is removed ?!
495         m_filter_chain.erase(it);
496     }
497 } // eo IOImplementation::~IOImplementation
498
499
500 /**
501  * adds another filter to the filter chain.
502  * @param filter pointer to the new filter.
503  */
504 void IOImplementation::addFilter
505 (
506     FilterBasePtr filter
507 )
508 {
509     if (!filter)
510     {
511         return; // nothing to do
512     }
513     if (filter->m_io)
514     {
515         filter->m_io->removeFilter(filter);
516     }
517     m_filter_chain.push_back( filter );
518 } // eo IOImplementation::addFilter
519
520
521 /**
522  * removes a filter from the filter chain.
523  * @param filter the pointer to the filter which is removed.
524  * @note if the filter is removed the class gives away the ownership; i.,e. the caller is responsible for
525  * deleting the filter if it was dynamically allocated.
526  */
527 void IOImplementation::removeFilter
528 (
529     FilterBasePtr filter
530 )
531 {
532     FilterChain::iterator it =
533         std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(filter) );
534     if (it != m_filter_chain.end())
535     {
536         filter->reset();
537         filter->m_io= NULL;
538         //TODO: signal the filter that it is removed ?!
539         m_filter_chain.erase(it);
540     }
541 } // eo IOImplementation::removeFilter
542
543
544 /**
545  * closes the file descriptors (/ the connection).
546  *
547  * @param direction the direction which should be closed (default: @a Direction::both for all).
548  */
549 void IOImplementation::close(Direction direction)
550 {
551     bool had_read_fd= (m_read_fd >= 0);
552     m_errno= 0;
553     if (direction == Direction::unspecified) direction= Direction::both;
554     if (direction != Direction::both && m_read_fd==m_write_fd && m_read_fd>=0)
555     { // special case: half closed (socket) connections...
556         // NOTE: for file descriptors m_errno will set to ENOTSOCK, but since we "forget" the desired part
557         // (read_fd or write_fd) this class works as desired.
558         switch(direction)
559         {
560             case Direction::in:
561                 {
562                     int res= ::shutdown(m_read_fd, SHUT_RD);
563                     if (res<0) 
564                     {
565                         m_errno= errno;
566                     }
567                     m_read_fd= -1;
568                     if (!m_eof)
569                     {
570                         for(FilterChain::iterator it= m_filter_chain.begin();
571                             it != m_filter_chain.end();
572                             ++it)
573                         {
574                             (*it)->endOfIncomingData();
575                         }
576                     }
577                 }
578                 return;
579
580             case Direction::out:
581                 {
582                     int res= ::shutdown(m_write_fd, SHUT_WR);
583                     if (res<0)
584                     {
585                         m_errno= errno;
586                     }
587                     m_write_fd= -1;
588                     m_output_buffer.clear();
589                 }
590                 return;
591         }
592     }
593     if (m_write_fd >= 0 && (direction & Direction::out) )
594     {
595         int res1 = ::close(m_write_fd);
596         if (m_write_fd == m_read_fd)
597         {
598             m_read_fd= -1;
599         }
600         m_write_fd= -1;
601         m_output_buffer.clear();
602         if (res1<0) m_errno= errno;
603     }
604     if (m_read_fd >=0 && (direction & Direction::in) )
605     {
606         int res1 = ::close(m_read_fd);
607         m_read_fd= -1;
608         if (res1<0) m_errno= errno;
609     }
610     if (had_read_fd && !m_eof && (m_read_fd<0))
611     {
612         for(FilterChain::iterator it= m_filter_chain.begin();
613             it != m_filter_chain.end();
614             ++it)
615         {
616             (*it)->endOfIncomingData();
617         }
618     }
619 } // eo IOImplementation::close
620
621
622 /**
623  * determines if the io class wants to read data.
624  * Default implementation checks only for a valid file descriptor value.
625  *
626  * @return @a true if the objects wants to read data
627  */
628 bool IOImplementation::wantRead()
629 {
630     return (m_read_fd >= 0) && ! m_eof;
631 } // eo IOImplementation::wantRead
632
633
634 /**
635  * determines if the io class wants to write data.
636  * Default implementation checks for a valid file descriptor value and if the object
637  * cannot write data immediately.
638  *
639  * @return @a true if the objects wants to write data
640  */
641 bool IOImplementation::wantWrite()
642 {
643     return (m_write_fd >= 0) && ! m_marked_for_writing && ! m_not_writable;
644 } // eo IOImplementation::wantWrite
645
646
647 /**
648  * delivers if opened.
649  * The default returns @a true if at least one file descriptor (read or write) is valid.
650  * @return @a true if opened.
651  */
652 bool IOImplementation::opened() const
653 {
654     return (m_read_fd>=0) || (m_write_fd>=0);
655 } // eo IOImplementation::opened() const
656
657
658 /**
659  * returns if the read side detected an end of file (EOF).
660  * @return @a true if end of file was detected on read file descriptor (or read file descriptor isn't valid).
661  */
662 bool IOImplementation::eof() const
663 {
664     return (m_read_fd < 0) || m_eof;
665 } // eo IOImplementatio::eof() const
666
667
668 /**
669  * @brief returns of the write side didn't detect that it cannot write.
670  * @return @a true if we can write.
671  */
672 bool IOImplementation::writable() const
673 {
674     return (m_write_fd >=0 ) and not m_not_writable;
675 } // eo IOImplementation::writable() const
676
677
678 /**
679  * returns if the output buffer is empty.
680  * @return 
681  */
682 bool IOImplementation::empty() const
683 {
684     return m_output_buffer.empty();
685 } // eo IOImplementation::empty
686
687 /**
688  * puts data into the output buffer and sends it immediately if possible,
689  *
690  * The data is passed through the filter chain before it's stored in the output buffer
691  * (i.e. the output buffer contains data as it should be send directly to the descriptor).
692  * @param _data the data which should be send.
693  */
694 void IOImplementation::lowSend(const std::string& _data)
695 {
696     std::string data(_data);
697
698     for(FilterChain::reverse_iterator it_filter= m_filter_chain.rbegin();
699         it_filter!= m_filter_chain.rend();
700         ++it_filter)
701     {
702         data= (*it_filter)->filterOutgoingData(data);
703     }
704     m_output_buffer+= data;
705
706     // if we can send immediately, do it:
707     if (! m_output_buffer.empty() && m_marked_for_writing)
708     {
709         doWrite();
710     }
711 } // eo IOImplementation::lowSend
712
713
714 /**
715  * called by the backend when there is data to read for this object.
716  *
717  * Reads the data from the connection (read file descriptor) and passes the data through the filter chain.
718  * The final data is appended to the input buffer and the signal @a m_signal_read() is called.
719  *
720  * If EOF is detected (i,e, no data was received) then the signal @a m_signal_eof() is called.
721  *
722  * @note overload this method only when You know what You are doing!
723  * (overloading is necessary when handling server sockets.)
724  */
725 void IOImplementation::doRead()
726 {
727     // static read buffer; should be ok as long as we don't use threads
728     static char buffer[8*1024]; // 8 KiB
729
730     m_errno = 0;
731     if (m_read_fd<0 || !m_marked_for_reading)
732     {
733         ODOUT("exit0; read_fd="<<m_read_fd << " mark=" << m_marked_for_reading);
734         return;
735     }
736
737     // reset the mark:
738     m_marked_for_reading = false;
739
740     // now read the data:
741     ssize_t count;
742     count = ::read(m_read_fd, buffer, sizeof(buffer));
743
744     ODOUT("::read -> " << count);
745
746     // interpret what we got:
747     if (count < 0) // error
748     {
749         m_errno = errno;
750         int fd= m_read_fd;
751
752         switch(m_errno)
753         {
754             case EINVAL:
755             case EBADF:
756             case ECONNRESET:
757             case ENETRESET:
758                 if (fd == m_read_fd)
759                 {
760                     close( (m_read_fd == m_write_fd) ? Direction::both : Direction::in );
761                 }
762                 break;
763         }
764     }
765     else if (count==0) // EOF
766     {
767         // remember the read fd:
768         int fd = m_read_fd;
769         // remember the EOF:
770         m_eof= true;
771         // signal EOF
772         m_signal_eof();
773         // if the fd is still the same: close it.
774         if (fd == m_read_fd)
775         {
776             close( Direction::in );
777         }
778     }
779     else // we have valid data
780     {
781         std::string data(buffer,count);
782         ODOUT(" got \"" << data << "\"");
783         for(FilterChain::iterator it_filter= m_filter_chain.begin();
784             it_filter != m_filter_chain.end();
785             ++it_filter)
786         {
787             data= (*it_filter)->filterIncomingData(data);
788         }
789         m_input_buffer+= data;
790         m_signal_read();
791     }
792 } // eo IOImplementation::doRead
793
794
795 /**
796  * interface for filter classes to inject data into the filter chain (emulating new incoming data).
797  * @param from_filter the filter which injects the new data.
798  * @param _data  the new data.
799  */
800 void IOImplementation::injectIncomingData(FilterBasePtr from_filter, const std::string& _data)
801 {
802     FilterChain::iterator it_filter =
803         std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(from_filter) );
804     if (it_filter == m_filter_chain.end())
805     {
806         // dont accept data inject from a unknown filter
807         return;
808     }
809     // well: pass the data through the remaining filters:
810     // NOTE: processing is (nearly) the same as in IOImplementation::doRead()
811     std::string data(_data);
812     for(++it_filter;
813         it_filter != m_filter_chain.end();
814         ++it_filter)
815     {
816         data= (*it_filter)->filterIncomingData(data);
817     }
818     m_input_buffer+= data;
819     m_signal_read();
820 } // eo IOImplementation::injectIncomingData(FilterBase*,const std::string&)
821
822
823 /**
824  * interface for filter classes to inject data into the filter chain (emulating new outgoing data).
825  * @param from_filter the filter which injects the new data.
826  * @param _data  the new data.
827  */
828 void IOImplementation::injectOutgoingData(FilterBasePtr from_filter, const std::string& _data)
829 {
830     FilterChain::reverse_iterator it_filter =
831         std::find_if( m_filter_chain.rbegin(), m_filter_chain.rend(), FilterMatch(from_filter) );
832     if (it_filter == m_filter_chain.rend())
833     {
834         // dont accept data inject from a unknown filter
835         return;
836     }
837     // well: pass the data through the remaining filters:
838     // NOTE: processing is (nearly) the same as in IOImplementation::lowSend()
839     std::string data(_data);
840     for(++it_filter;
841         it_filter!= m_filter_chain.rend();
842         ++it_filter)
843     {
844         data= (*it_filter)->filterOutgoingData(data);
845     }
846     m_output_buffer+= data;
847
848     // if we can send immediately, do it:
849     if (! m_output_buffer.empty() && m_marked_for_writing)
850     {
851         doWrite();
852     }
853 } // eo IOImplementation::injectOutgoingData(FilterBase*,const std::string&)
854
855
856 /**
857  * set the read file descriptor.
858  * Although a derived class can also set the read fd directly; this method should be used
859  * for this task since it updates some flags on the fd for async operation.
860  * @param fd the new read file descriptor.
861  */
862 void IOImplementation::setReadFd(int fd)
863 {
864     // test if we already have a valid descriptor (and may have to close it):
865     if (m_read_fd >=0 )
866     {
867         if (m_read_fd == fd)
868         {
869             // fd was already right; consider it to be ok.
870             return;
871         }
872         close(Direction::in);
873     }
874     // reset our errno:
875     m_errno= 0;
876     // if the new descriptor looks valid, set some flags:
877     if (fd >= 0)
878     {
879         long flags= ::fcntl(fd, F_GETFL);
880         if (flags != -1)
881         {
882             // set the flags for non blocking, async operation
883             flags |= O_NONBLOCK|O_ASYNC;
884             ::fcntl(fd,F_SETFL, flags);
885         }
886         else if ( errno == EBADF )
887         {
888             // well, we seemed to be fed with an invalid descriptor...:
889             m_errno = errno;
890             fd= -1;
891         }
892     }
893     if (fd >= 0) // if still valid:
894     {
895         // set the close-on-exec flag
896         ::fcntl(fd,F_SETFD, FD_CLOEXEC);
897     }
898     m_read_fd= fd;
899     m_marked_for_reading= false;
900     m_eof= false;
901 } // eo IOImplementation::setReadFd(int)
902
903
904
905 /**
906  * set the write file descriptor.
907  * Although a derived class can also set the write fd directly; this method should be used
908  * for this task since it updates some flags on the fd for async operation.
909  * @param fd the new write file descriptor.
910  */
911 void IOImplementation::setWriteFd(int fd)
912 {
913     if (m_write_fd >=0 )
914     {
915         if (m_write_fd == fd)
916         {
917             // fd was already right; consider it to be ok.
918             return;
919         }
920         close(Direction::out);
921     }
922     // reset our errno:
923     m_errno= 0;
924     // if the new descriptor looks valid, set some flags:
925     if (fd >= 0)
926     {
927         long flags= ::fcntl(fd, F_GETFL);
928         if (flags != -1)
929         {
930             // set the flags for non blocking, async operation
931             flags |= O_NONBLOCK|O_ASYNC;
932             ::fcntl(fd,F_SETFL, flags);
933         }
934         else if (errno == EBADF)
935         {
936             // well, we seemed to be fed with an invalid descriptor...:
937             m_errno = errno;
938             fd= -1;
939         }
940     }
941     if (fd >= 0) // if still valid:
942     {
943         // set the close-on-exec flag
944         ::fcntl(fd,F_SETFD, FD_CLOEXEC);
945     }
946     m_write_fd = fd;
947     m_marked_for_writing= false;
948     m_not_writable= false;
949 } // eo IOImplementation::setWriteFd(int)
950
951
952
953 /**
954  * called by the backend when this object can write data.
955  *
956  * If some data was sended, the signal @a m_signal_write is called.
957  *
958  * @internal tries to write all buffered data to output; if this succeeds,
959  * the connection is assumed to be still able to accept more data.
960  * (i.e. the internal write mark is kept!)
961  *
962   * @note overload this method only when You know what You are doing!
963 */
964 void IOImplementation::doWrite()
965 {
966     m_errno = 0;
967     if ( m_write_fd<0 || !m_marked_for_writing || m_output_buffer.empty())
968     {
969         return;
970     }
971
972     ODOUT("doWrite, d=\"" << m_output_buffer << "\"");
973
974     //reset mark:
975     m_marked_for_writing= false;
976
977     // now write the data
978     ssize_t count= ::write( m_write_fd, m_output_buffer.data(), m_output_buffer.size());
979
980     ODOUT("::write -> " << count);
981
982     if (count < 0) // error
983     {
984         m_errno= errno;
985         int fd= m_write_fd;
986
987         switch(m_errno)
988         {
989             case EPIPE:
990                 m_not_writable= true;
991                 // emit a signal
992                 m_signal_not_writable();
993                 // fall through
994             case EINVAL:
995             case EBADF:
996             case ECONNRESET:
997             case ENETRESET:
998                 if (fd == m_write_fd)
999                 {
1000                     close( (m_write_fd == m_read_fd) ? Direction::both : Direction::out );
1001                 }
1002                 break;
1003         }
1004     }
1005     else
1006     {
1007         m_output_buffer.erase(0, count);
1008         if (m_output_buffer.empty())
1009         {
1010             // special case: if we were able to send all the data, we keep the write mark:
1011             m_marked_for_writing= true;
1012         }
1013     }
1014     if (count > 0)
1015     {
1016         m_signal_write();
1017     }
1018 } // eo IOImplementation::doWrite
1019
1020
1021 /*
1022  * implementation of SimpleIO
1023  */
1024
1025
1026 SimpleIO::SimpleIO(int read_fd, int write_fd)
1027 : inherited(read_fd, write_fd)
1028 {
1029     m_signal_read.connect(boost::bind(&SimpleIO::slotReceived,this));
1030 } // eo SimpleIO::SimpleIO()
1031
1032
1033 SimpleIO::~SimpleIO()
1034 {
1035 } // eo SimpleIO::~SimpleIO()
1036
1037
1038 /**
1039  * sends a string.
1040  * @param data the string.
1041  */
1042 void SimpleIO::sendString(const std::string& data)
1043 {
1044     lowSend(data);
1045 } // eo SimpleIO::sendString(const std::string&)
1046
1047
1048 /**
1049  * emits the signal signalReceived with the received data.
1050  * This slot is connected to IOImplementation::m_signal_read.
1051  */
1052 void SimpleIO::slotReceived()
1053 {
1054     std::string data;
1055     data.swap(m_input_buffer);
1056     signal_received_string(data);
1057 } // eo SimpleIO::slotReceived()
1058
1059
1060
1061 /*
1062  * implementation of SimpleIO2
1063  */
1064
1065
1066 SimpleIO2::SimpleIO2(int read_fd, int write_fd)
1067 : inherited(read_fd, write_fd)
1068 {
1069     m_signal_read.connect(boost::bind(&SimpleIO2::slotReceived,this));
1070 } // eo SimpleIO2::SimpleIO2()
1071
1072
1073 SimpleIO2::~SimpleIO2()
1074 {
1075 } // eo SimpleIO2::~SimpleIO2()
1076
1077
1078 /**
1079  * sends a string.
1080  * @param data the string.
1081  */
1082 void SimpleIO2::sendString(const std::string& data)
1083 {
1084     lowSend(data);
1085 } // eo SimpleIO2::sendString(const std::string&)
1086
1087
1088 /**
1089  * emits the signal signalReceived with the received data.
1090  * This slot is connected to IOImplementation::m_signal_read.
1091  */
1092 void SimpleIO2::slotReceived()
1093 {
1094     std::string data;
1095     data.swap(m_input_buffer);
1096     signal_received_string(data);
1097 } // eo SimpleIO2::slotReceived()
1098
1099
1100
1101 /*
1102  * implementation of class Backend (singleton)
1103  */
1104
1105 Backend* Backend::g_backend= NULL;
1106
1107 int Backend::m_count_active_steps=0;
1108
1109
1110 Backend::Backend()
1111 : m_count_active_loops(0)
1112 , m_count_stop_requests(0)
1113 {
1114     SystemTools::ignore_signal( SystemTools::Signal::PIPE );
1115 } // eo Backend::Backend
1116
1117
1118 Backend::~Backend()
1119 {
1120     SystemTools::restore_signal_handler( SystemTools::Signal::PIPE );
1121 } // eo Backend::~Backend()
1122
1123 /**
1124  * delivers pointer to the current backend, instantiating a new backend if there was no current one.
1125  *
1126  * This should be the only way to access the backend which should be a singleton.
1127  *
1128  * @return the pointer to the current backend.
1129  */
1130 Backend* Backend::getBackend()
1131 {
1132     if (!g_backend)
1133     {
1134         g_backend = new Backend();
1135     }
1136     return g_backend;
1137 } // eo Backend::getBackend
1138
1139
1140
1141
1142 /**
1143  * performs one backend cycle.
1144  *
1145  * Collects all file descriptors from the active io objects which should be selected for reading and/or writing.
1146  * Also determines the timer events which become due and adjusts the timeout.
1147  * Constructs the necessary structures and calls poll().
1148  * Finally interprets the results from poll() (i.e. performs the reading/writing/timer events)
1149  *
1150  * @param timeout maximal wait value in milliseconds; negative value waits until at least one event occured.
1151  * @return @a true if there was at least one active object; otherwise @a false
1152  *
1153  * @note this method is a little beast.
1154  *
1155  * @internal 
1156  * The cycle is divided into four steps: collecting; poll; mark and execute.
1157  * The "mark" step is necessary to avoid some bad side effects when method calls in the execution stage
1158  * are calling @a Backup::doOneStep or open their own local backend loop.
1159  *
1160  * @todo handle some more error cases.
1161  * @todo provide a plugin interface for external handler.
1162  * (currently inclusion of external handler is possible by (ab)using timer classes)
1163  */
1164 bool Backend::doOneStep(int timeout)
1165 {
1166     ODOUT( "timeout=" << timeout );
1167     internal_io::PollDataCluster poll_data;
1168     bool had_active_object = false;
1169
1170     ++m_count_active_steps;
1171
1172     try {
1173
1174         FdSetType local_read_fds;
1175         FdSetType local_write_fds;
1176
1177         // step 1 ; collect
1178
1179         { // step 1.1: collect fds for read/write operations
1180             for(internal_io::IOList::iterator itIOList = internal_io::g_io_list().begin();
1181                 itIOList != internal_io::g_io_list().end();
1182                 ++itIOList)
1183             {
1184                 if (! *itIOList) continue; // skip NULL entries
1185                 int  read_fd  = (*itIOList)->m_read_fd;
1186                 int  write_fd = (*itIOList)->m_write_fd;
1187                 bool want_read  = (read_fd >= 0) and (*itIOList)->wantRead();
1188                 bool want_write = (write_fd >= 0) and (*itIOList)->wantWrite();
1189                 if (read_fd >= 0 )
1190                 {
1191                     local_read_fds.insert( read_fd );
1192                 }
1193                 if (write_fd >= 0)
1194                 {
1195                     local_write_fds.insert( write_fd );
1196                 }
1197                 if (!want_read && !want_write) continue;
1198                 if (want_read)
1199                 {
1200                     FODOUT( (*itIOList), "wants to read  (fd=" << read_fd << ")");
1201                     poll_data.add_read_fd(read_fd, *itIOList);
1202                 }
1203                 if (want_write)
1204                 {
1205                     FODOUT( (*itIOList), "wants to write  (fd=" << write_fd << ")");
1206                     poll_data.add_write_fd(write_fd, *itIOList);
1207                 }
1208                 had_active_object= true;
1209             }
1210         }
1211
1212         { // step 1.2: collect timer events
1213             MilliTime current_time;
1214             MilliTime min_event_time;
1215
1216             get_current_monotonic_time(current_time);
1217             bool min_event_time_set;
1218
1219             if (timeout >= 0)
1220             {
1221                 min_event_time = current_time + MilliTime(0,timeout);
1222                 min_event_time_set= true;
1223             }
1224             else
1225             {
1226                 min_event_time = current_time + MilliTime(86400,0);
1227                 min_event_time_set= false;
1228             }
1229             // TODO
1230
1231             for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1232                 it_timer != internal_io::g_timer_list().end() 
1233                 && (!had_active_object || !min_event_time_set || current_time < min_event_time);
1234                 ++ it_timer)
1235             {
1236                 if (! *it_timer) continue; // skip NULL entries
1237                 if (! (*it_timer)->m_active) continue; // skip if not enabled
1238                 if ( !min_event_time_set || (*it_timer)->m_when < min_event_time)
1239                 {
1240                     min_event_time = (*it_timer)->m_when;
1241                     min_event_time_set= true;
1242                 }
1243                 had_active_object= true;
1244             }
1245
1246             if (min_event_time_set)
1247             { // we have at a minimal event time, so (re)compute the timeout value:
1248                 MilliTime delta= (min_event_time - current_time);
1249                 long long delta_ms = std::min( delta.get_milliseconds(), 21600000LL); // max 6h
1250                 if (delta_ms <= 0L)
1251                 {
1252                     timeout= 0L;
1253                 }
1254                 else
1255                 {
1256                     timeout= delta_ms + (delta_ms<5 ? 1 : 3);
1257                 }
1258             }
1259         }
1260
1261         // step 2 : poll
1262         ODOUT(" poll timeout is " << timeout);
1263         {
1264             MilliTime current_time;
1265             get_current_monotonic_time(current_time);
1266             ODOUT("  current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1267         }
1268         int poll_result= ::poll(poll_data.get_pollfd_ptr(), poll_data.get_num_pollfds(), timeout);
1269
1270         ODOUT("poll -> " << poll_result);
1271         {
1272             MilliTime current_time;
1273             get_current_monotonic_time(current_time);
1274             ODOUT("  current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1275         }
1276
1277         if (poll_result < 0)
1278         {
1279             //TODO poll error handling (signals ?!)
1280         }
1281
1282         // step 3 : mark
1283
1284         // step 3.1: mark io objects (if necessary)
1285         if (poll_result > 0)
1286         {
1287             for(internal_io::PollVector::iterator itPollItem = poll_data.m_poll_vector.begin();
1288                 itPollItem != poll_data.m_poll_vector.end();
1289                 ++itPollItem)
1290             {
1291                 ODOUT("  fd=" << itPollItem->fd << ", events=" << itPollItem->events << ", revents=" << itPollItem->revents);
1292                 if ( 0 == (itPollItem->revents))
1293                 { // preliminary continuation if nothing is to handle for this item(/fd)...
1294                     continue;
1295                 }
1296                 if ( 0!= (itPollItem->revents & (POLLIN|POLLHUP)))
1297                 {
1298                     IOImplementation *io= poll_data.m_read_fd_io_map[ itPollItem->fd ];
1299                     if (io && io->m_read_fd==itPollItem->fd)
1300                     {
1301                         FODOUT(io,"marked for reading");
1302                         io->m_marked_for_reading= true;
1303                     }
1304                 }
1305                 if ( 0!= (itPollItem->revents & POLLOUT))
1306                 {
1307                     IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
1308                     if (io && io->m_write_fd==itPollItem->fd)
1309                     {
1310                         io->m_marked_for_writing= true;
1311                     }
1312                 }
1313                 if ( 0!= (itPollItem->revents & POLLERR))
1314                 {
1315                     IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
1316                     if (0!= (itPollItem->events & POLLOUT))
1317                     {
1318                         if (io && io->m_write_fd==itPollItem->fd)
1319                         {
1320                             io->m_marked_for_writing= false;
1321                             //io->close( Direction::out );
1322                         }
1323                     }
1324                 }
1325                 // TODO error handling (POLLERR, POLLHUP, POLLNVAL)
1326             }
1327         }
1328
1329         //Step 3.2: mark timer objects
1330         {
1331             MilliTime current_time;
1332
1333             get_current_monotonic_time(current_time);
1334             ODOUT("  current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1335
1336             for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1337                 it_timer != internal_io::g_timer_list().end();
1338                 ++ it_timer)
1339             {
1340                 ODOUT("  check timer " << *it_timer);
1341                 if (! *it_timer) continue; // skip NULL entries
1342                 if (! (*it_timer)->m_active) continue; // skip if not enabled
1343                 if ( (*it_timer)->m_when <= current_time)
1344                 {
1345                     ODOUT("   ==> MARK");
1346                     (*it_timer)->m_marked = true;
1347                 }
1348             }
1349         }
1350
1351
1352         // step 4 : execute
1353
1354         // step 4.1: execute io
1355         ODOUT("execute stage");
1356         for(internal_io::IOList::iterator it_io = internal_io::g_io_list().begin();
1357             it_io != internal_io::g_io_list().end();
1358             ++ it_io)
1359         {
1360             ODOUT("  check obj " << *it_io);
1361             if (NULL == *it_io) continue;
1362             if ((*it_io)->m_marked_for_writing)
1363             {
1364                 FODOUT((*it_io),"exec doWrite");
1365                 (*it_io)->doWrite();
1366                 if ((*it_io) == NULL) continue; // skip remaining if we lost the object
1367                 if ((*it_io)->m_errno)
1368                 {
1369                     continue;
1370                 }
1371             }
1372             if ((*it_io)->m_marked_for_reading)
1373             {
1374                 FODOUT((*it_io),"exec doRead");
1375                 (*it_io)->doRead();
1376                 if ((*it_io) == NULL) continue; // skip remaining if we lost the object
1377                 if ((*it_io)->m_errno)
1378                 {
1379                     continue;
1380                 }
1381             }
1382         }
1383
1384         // step 4.2: execute timer events
1385         {
1386             for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1387                 it_timer != internal_io::g_timer_list().end();
1388                 ++ it_timer)
1389             {
1390                 if (! *it_timer) continue; // skip NULL entries
1391                 if (! (*it_timer)->m_active) continue; // skip if not enabled
1392                 if (! (*it_timer)->m_marked) continue; // skip if not marked
1393
1394                 // reset the mark and deactivate object now since the execute() method might activate it again
1395                 (*it_timer)->m_marked= false;
1396                 (*it_timer)->m_active= false;
1397
1398                 // now execute the event:
1399                 (*it_timer)->execute();
1400             }
1401         }
1402
1403     } // eo try
1404     catch(...)
1405     {
1406         // clean up our counter
1407         --m_count_active_steps;
1408         // and forward the exception
1409         throw;
1410     }
1411
1412     if ( 0 == --m_count_active_steps)
1413     {
1414         internal_io::g_io_list().clean_list();
1415         internal_io::g_timer_list().clean_list();
1416     }
1417
1418     return had_active_object;
1419 } // eo Backend::doOneStep
1420
1421
1422 /**
1423  * enters a backend loop.
1424  *
1425  * Calls @a Backend::doOneStep within a loop until @a Backend::stop was called or there are no more
1426  * active objects (io objects or timer objects).
1427  */
1428 void Backend::run()
1429 {
1430     ++m_count_active_loops;
1431     do
1432     {
1433         try
1434         {
1435             if (!doOneStep(c_max_poll_wait))
1436             {
1437                 // stop if there are no more active objects.
1438                 stop();
1439             }
1440         }
1441         catch(...)
1442         {
1443             // clean up our counter
1444             --m_count_active_loops;
1445             // and forward the exception
1446             throw;
1447         }
1448     } 
1449     while (0 == m_count_stop_requests);
1450     --m_count_active_loops;
1451     --m_count_stop_requests;
1452 } // eo Backend::run
1453
1454
1455 /**
1456  * @brief stops the latest loop currently run by Backend::run().
1457  * @see Backend::run()
1458  */
1459 void Backend::stop()
1460 {
1461     if (m_count_active_loops)
1462     {
1463         ++m_count_stop_requests;
1464     }
1465 } // eo Backend::stop()
1466
1467
1468
1469 } // eo namespace AsyncIo