c85edb577bd3c79a66a2cf15c88c016ae22122c4
[libasyncio] / asyncio / async_io.cpp
1 /*
2 The software in this package is distributed under the GNU General
3 Public License version 2 (with a special exception described below).
4
5 A copy of GNU General Public License (GPL) is included in this distribution,
6 in the file COPYING.GPL.
7
8 As a special exception, if other files instantiate templates or use macros
9 or inline functions from this file, or you compile this file and link it
10 with other works to produce a work based on this file, this file
11 does not by itself cause the resulting work to be covered
12 by the GNU General Public License.
13
14 However the source code for this file must still be made available
15 in accordance with section (3) of the GNU General Public License.
16
17 This exception does not invalidate any other reasons why a work based
18 on this file might be covered by the GNU General Public License.
19 */
20 /** @file
21  * @copyright Copyright © 2007-2008 by Intra2net AG
22  */
23
24 //#define NOISEDEBUG
25
26 #include "async_io.hpp"
27 #include <asyncio_config.hpp>
28
29 #include <list>
30 #include <vector>
31 #include <map>
32 #include <algorithm>
33 #include <utility>
34
35 #include <sys/poll.h>
36 #include <sys/time.h>
37 #include <sys/socket.h>
38 #include <unistd.h>
39 #include <errno.h>
40 #include <fcntl.h>
41
42 #include <boost/bind.hpp>
43
44 #include <asyncio_signalfunc.hpp>
45
46 #include <iostream>
47
48 #ifdef NOISEDEBUG
49 #include <iostream>
50 #include <iomanip>
51 #define DOUT(msg) std::cout << msg << std::endl
52 #define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl
53 //#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl
54 #define ODOUT(msg) std::cout << __PRETTY_FUNCTION__ << "[" << this << "]:" << msg << std::endl
55 #else
56 #define DOUT(msg) do {} while (0)
57 #define FODOUT(obj,msg) do {} while (0)
58 #define ODOUT(msg) do {} while (0)
59 #endif
60
61
62 namespace
63 {
64
65 using namespace AsyncIo::Utils;
66
67 /*
68  * configuration:
69  */
70
71
72 const int c_max_poll_wait= 10*60*1000; // maximal poll wait (while in backend loop): 10 min
73
74
75 /**
76  * contains internal helper structs and functions for io handling.
77  */
78 namespace internal_io
79 {
80
81 /**
82  * extends struct pollfd with some convenience functions
83  */
84 struct PollFd : public ::pollfd
85 {
86     PollFd()
87     {
88         fd= 0;
89         events= revents= 0;
90     } // eo PollFd
91
92
93     /**
94      * initializes the struct with a given file descriptor and clears the event mask(s).
95      * @param _fd
96      */
97     PollFd(int _fd)
98     {
99         fd= _fd;
100         events= revents= 0;
101     } // eo PollFd
102
103
104     /**
105      * set that we want to be notified about new incoming data
106      */
107     void setPOLLIN()  { events |= POLLIN; }
108
109     /**
110      * set that we want to be notified if we can send (more) data.
111      */
112     void setPOLLOUT() { events |= POLLOUT; }
113
114 }; // eo struct PollFd
115
116
117 typedef std::vector<PollFd> PollVector;
118 typedef std::map<int,PollVector::size_type> FdPollMap;
119 typedef std::map<int,AsyncIo::IOImplementation*> FdIOMap;
120
121
122 /**
123  * struct for interfacing our local structures with poll()
124  */
125 struct PollDataCluster
126 {
127     PollVector m_poll_vector;
128     FdPollMap  m_fd_poll_map;
129     FdIOMap    m_read_fd_io_map;
130     FdIOMap    m_write_fd_io_map;
131
132     void add_read_fd( int fd, AsyncIo::IOImplementation* io);
133     void add_write_fd( int fd, AsyncIo::IOImplementation* io);
134
135     pollfd* get_pollfd_ptr();
136     unsigned int get_num_pollfds() const;
137
138 }; // eo struct PollDataCluster
139
140
141 typedef PtrList< AsyncIo::IOImplementation, true > IOList;
142 typedef PtrList< AsyncIo::TimerBase, true >        TimerList;
143
144 template<> int IOList::GlobalCountType::InstanceCount= 0;
145 template<> int TimerList::GlobalCountType::InstanceCount= 0;
146
147
148 /**
149  * the (internal) global list of io objects (object pointers)
150  */
151 IOList& g_io_list()
152 {
153     static IOList _the_io_list;
154     return _the_io_list;
155 };
156
157
158 /**
159  * the (internal) global list of timer objects (object pointers)
160  */
161 TimerList& g_timer_list()
162 {
163     static TimerList _the_timer_list;
164     return _the_timer_list;
165 }
166
167 /*
168  * implementation of PollDataCluster
169  */
170
171
172 /**
173  * add a new file descriptor to the read list.
174  *
175  * @param fd the file descriptor.
176  * @param io the io object which uses the fd for reading.
177  */
178 void PollDataCluster::add_read_fd( int fd, AsyncIo::IOImplementation* io)
179 {
180     FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
181     if (itPollMap != m_fd_poll_map.end())
182     {
183         m_poll_vector[ itPollMap->second ].setPOLLIN();
184     }
185     else
186     {
187         PollFd item(fd);
188         item.setPOLLIN();
189         m_fd_poll_map[fd] = m_poll_vector.size();
190         m_poll_vector.push_back( item );
191     }
192     m_read_fd_io_map[fd]= io;
193 } // eo PollDataCluster::add_read_fd
194
195
196 /**
197  * add a new file descriptor to the write list.
198  *
199  * @param fd the file descriptor.
200  * @param io the io object which uses the fd for writing.
201  */
202 void PollDataCluster::add_write_fd( int fd, AsyncIo::IOImplementation* io)
203 {
204     FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
205     if (itPollMap != m_fd_poll_map.end())
206     {
207         m_poll_vector[ itPollMap->second ].setPOLLOUT();
208     }
209     else
210     {
211         PollFd item(fd);
212         item.setPOLLOUT();
213         m_fd_poll_map[fd] = m_poll_vector.size();
214         m_poll_vector.push_back( item );
215     }
216     m_write_fd_io_map[fd]= io;
217 } // eo PollDataCluster::add_write_fd
218
219
220 /**
221  * returns a pointer to a pollfd array; suitable for passing to poll()
222  *
223  * @return pointer to pollfd array
224  */
225 pollfd* PollDataCluster::get_pollfd_ptr()
226 {
227     return m_poll_vector.empty() ? NULL : &m_poll_vector.front();
228 } // eo get_pollfd_ptr
229
230
231 /**
232  * returns the number of entries in the pollfd array; suitable for passing to poll()
233  *
234  * @return the number of entries in the pollfd array
235  */
236 unsigned int PollDataCluster::get_num_pollfds() const
237 {
238     return m_poll_vector.size();
239 } // eo get_num_pollfds
240
241
242
243 } // eo namespace internal_io
244
245
246 /*
247  * some internal tool functions and structures
248  */
249
250 struct FilterMatch {
251     AsyncIo::FilterBasePtr m_filter;
252
253     FilterMatch(AsyncIo::FilterBasePtr filter)
254     : m_filter(filter)
255     {}
256
257     bool operator () (const AsyncIo::FilterBasePtr& item)
258     {
259         return item && item == m_filter;
260     }
261
262 }; // eo struct FilterMatch
263
264
265
266 } // eo anonymous namespace
267
268
269
270
271 namespace AsyncIo
272 {
273
274
275 /*
276  * implementation of TimerBase
277  */
278
279 /**
280  * constructor. Adds the object to the internal timer list.
281  */
282 TimerBase::TimerBase()
283 : m_active(false)
284 , m_marked(false)
285 {
286     internal_io::g_timer_list().add_item(this);
287 } // eo TimerBase::TimerBase
288
289
290 /**
291  * destructor. Removes the object from the internal timer list.
292  */
293 TimerBase::~TimerBase()
294 {
295     ODOUT("enter");
296     if (internal_io::TimerList::Instances())
297     {
298         ODOUT("remove from list");
299         internal_io::g_timer_list().remove_item(this);
300     }
301 } // eo TimerBase::~TimerBase
302
303
304 /**
305  * @brief returns the point in time when the time is executed in real time.
306  * @return the point in time when the timer is to be executed.
307  */
308 MilliTime TimerBase::getRealWhenTime() const
309 {
310     MilliTime mono_time;
311     MilliTime real_time;
312     get_current_monotonic_time(mono_time);
313     get_current_real_time(real_time);
314     MilliTime result= m_when - mono_time + real_time;
315     return result;
316 } // eo TimerBase::getRealWhenTime() const
317
318
319 /**
320  * sets the time when the event should be executed.
321  * @param sec the seconds part of the point in time.
322  * @param msec  the milliseconds part of the point in time.
323  */
324 void TimerBase::setWhenTime(long sec, long msec)
325 {
326     m_when.set(sec,msec);
327     m_marked= false;
328 } // eo TimerBase::setWhenTime
329
330
331 /**
332  * sets the time when the event should be executed.
333  * @param mt the point in time.
334  */
335 void TimerBase::setWhenTime(const MilliTime& mt)
336 {
337     m_when= mt;
338     m_marked= false;
339 } // eo TimerBase::setWhenTime
340
341
342 /**
343  * sets the time delta measured from current time when the event should be executed.
344  * @param sec the seconds of the time delta
345  * @param msec the milli seconds of the time delta
346  */
347 void TimerBase::setDeltaWhenTime(long sec, long msec)
348 {
349     setDeltaWhenTime( MilliTime(sec,msec) );
350 } // eo TimerBase::setWhenTime
351
352
353
354 /**
355  * sets the time delta measured from current time when the event should be executed.
356  * @param mt the time delta
357  */
358 void TimerBase::setDeltaWhenTime(const MilliTime& mt)
359 {
360     get_current_monotonic_time(m_when);
361     m_when+= mt;
362     m_marked= false;
363 } // eo TimerBase::setWhenTime
364
365
366 /**
367  * set the active state of the timer event.
368  * @param active determines if the object should be active (default: yes).
369  */
370 void TimerBase::activate(bool active)
371 {
372     m_active = active;
373     if (!active)
374     {
375         // clear the mark if we are not active.
376         m_marked= false;
377     }
378 } // eo TimerBase::activate
379
380
381 /** @fn void TimerBase::deactivate()
382  *  deactivates the event by clearing the active state.
383  */
384
385
386 /**
387  * called when the timer event occured.
388  */
389 void TimerBase::execute()
390 {
391 } // eo TimerBase::execute
392
393
394 /*
395  * implementation of FilterBase class
396  */
397
398
399 FilterBase::FilterBase()
400 : m_io(NULL)
401 {
402 } // eo FilterBase::FilterBase()
403
404
405 /**
406  * injects incoming data.
407  * @param data  the new data
408  */
409 void FilterBase::injectIncomingData(const std::string& data)
410 {
411     if (m_io)
412     {
413         FilterBasePtr ptr= get_ptr_as< FilterBase >();
414         if (ptr)
415         {
416             m_io->injectIncomingData(ptr,data);
417         }
418     }
419 } // FilterBase::injectIncomingData(const std::string&)
420
421
422 /**
423  * injects outgoing data.
424  * @param data the new data
425  */
426 void FilterBase::injectOutgoingData(const std::string& data)
427 {
428     if (m_io)
429     {
430         FilterBasePtr ptr= get_ptr_as< FilterBase >();
431         if (ptr)
432         {
433             m_io->injectOutgoingData(ptr,data);
434         }
435     }
436 } // eo FilterBase::injectOutgoingData(const std::string&)
437
438
439
440 /**
441  * called when EOF detected on incoming channel (or incoming channel closed)
442  */
443 void FilterBase::endOfIncomingData()
444 {
445 } // eo FilterBase::endOfIncomingData()
446
447 /**
448  * called when the filter should reset.
449  * This is used when a new channel is opened or when the filter is taken out of a filter chain.
450  */
451 void FilterBase::reset()
452 {
453 } // eo FilterBase::reset()
454
455
456 /*
457  * implementation of IOImplementation class
458  */
459
460
461 /**
462  * constructor for the base io class.
463  *
464  * Also adds the object to internal list of io objects (which is used by the backend).
465  *
466  * @param read_fd the file descriptor which should be used for reading (default -1 for no value)
467  * @param write_fd  the file descriptor which should be used for writing (default -1 for no value)
468  */
469 IOImplementation::IOImplementation(int read_fd, int write_fd)
470 : m_read_fd(-1)
471 , m_write_fd(-1)
472 , m_eof(false)
473 , m_not_writable(false)
474 , m_input_buffer()
475 , m_output_buffer()
476 , m_marked_for_reading(false)
477 , m_marked_for_writing(false)
478 {
479     internal_io::g_io_list().add_item(this);
480     if (read_fd >= 0)
481     {
482         setReadFd( read_fd );
483     }
484     if (write_fd >= 0)
485     {
486         setWriteFd( write_fd );
487     }
488 } // eo IOImplementation::IOImplementation
489
490
491 /**
492  * destructor of the base io class.
493  *
494  * Removes the object from the interal list of io objects.
495  */
496 IOImplementation::~IOImplementation()
497 {
498     close();
499     if (internal_io::IOList::Instances())
500     {
501         internal_io::g_io_list().remove_item(this);
502     }
503     // now clear the filters:
504     while (! m_filter_chain.empty() )
505     {
506         FilterChain::iterator it = m_filter_chain.begin();
507         (*it)->reset();
508         (*it)->m_io= NULL;
509         //TODO: signal the filter that it is removed ?!
510         m_filter_chain.erase(it);
511     }
512 } // eo IOImplementation::~IOImplementation
513
514
515 /**
516  * adds another filter to the filter chain.
517  * @param filter pointer to the new filter.
518  */
519 void IOImplementation::addFilter
520 (
521     FilterBasePtr filter
522 )
523 {
524     if (!filter)
525     {
526         return; // nothing to do
527     }
528     if (filter->m_io)
529     {
530         filter->m_io->removeFilter(filter);
531     }
532     m_filter_chain.push_back( filter );
533 } // eo IOImplementation::addFilter
534
535
536 /**
537  * removes a filter from the filter chain.
538  * @param filter the pointer to the filter which is removed.
539  * @note if the filter is removed the class gives away the ownership; i.,e. the caller is responsible for
540  * deleting the filter if it was dynamically allocated.
541  */
542 void IOImplementation::removeFilter
543 (
544     FilterBasePtr filter
545 )
546 {
547     FilterChain::iterator it =
548         std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(filter) );
549     if (it != m_filter_chain.end())
550     {
551         filter->reset();
552         filter->m_io= NULL;
553         //TODO: signal the filter that it is removed ?!
554         m_filter_chain.erase(it);
555     }
556 } // eo IOImplementation::removeFilter
557
558
559 /**
560  * closes the file descriptors (/ the connection).
561  *
562  * @param direction the direction which should be closed (default: @a Direction::both for all).
563  */
564 void IOImplementation::close(Direction direction)
565 {
566     bool had_read_fd= (m_read_fd >= 0);
567     m_errno= 0;
568     if (direction == Direction::unspecified) direction= Direction::both;
569     if (direction != Direction::both && m_read_fd==m_write_fd && m_read_fd>=0)
570     { // special case: half closed (socket) connections...
571         // NOTE: for file descriptors m_errno will set to ENOTSOCK, but since we "forget" the desired part
572         // (read_fd or write_fd) this class works as desired.
573         switch(direction)
574         {
575             case Direction::in:
576                 {
577                     int res= ::shutdown(m_read_fd, SHUT_RD);
578                     if (res<0)
579                     {
580                         m_errno= errno;
581                     }
582                     m_read_fd= -1;
583                     if (!m_eof)
584                     {
585                         for(FilterChain::iterator it= m_filter_chain.begin();
586                             it != m_filter_chain.end();
587                             ++it)
588                         {
589                             (*it)->endOfIncomingData();
590                         }
591                     }
592                 }
593                 return;
594
595             case Direction::out:
596                 {
597                     int res= ::shutdown(m_write_fd, SHUT_WR);
598                     if (res<0)
599                     {
600                         m_errno= errno;
601                     }
602                     m_write_fd= -1;
603                     m_output_buffer.clear();
604                 }
605                 return;
606         }
607     }
608     if (m_write_fd >= 0 && (direction & Direction::out) )
609     {
610         int res1 = ::close(m_write_fd);
611         if (m_write_fd == m_read_fd)
612         {
613             m_read_fd= -1;
614         }
615         m_write_fd= -1;
616         m_output_buffer.clear();
617         if (res1<0) m_errno= errno;
618     }
619     if (m_read_fd >=0 && (direction & Direction::in) )
620     {
621         int res1 = ::close(m_read_fd);
622         m_read_fd= -1;
623         if (res1<0) m_errno= errno;
624     }
625     if (had_read_fd && !m_eof && (m_read_fd<0))
626     {
627         for(FilterChain::iterator it= m_filter_chain.begin();
628             it != m_filter_chain.end();
629             ++it)
630         {
631             (*it)->endOfIncomingData();
632         }
633     }
634 } // eo IOImplementation::close
635
636
637 /**
638  * determines if the io class wants to read data.
639  * Default implementation checks only for a valid file descriptor value.
640  *
641  * @return @a true if the objects wants to read data
642  */
643 bool IOImplementation::wantRead()
644 {
645     return (m_read_fd >= 0) && ! m_eof;
646 } // eo IOImplementation::wantRead
647
648
649 /**
650  * determines if the io class wants to write data.
651  * Default implementation checks for a valid file descriptor value and if the object
652  * cannot write data immediately.
653  *
654  * @return @a true if the objects wants to write data
655  */
656 bool IOImplementation::wantWrite()
657 {
658     return (m_write_fd >= 0) && ! m_marked_for_writing && ! m_not_writable;
659 } // eo IOImplementation::wantWrite
660
661
662 /**
663  * delivers if opened.
664  * The default returns @a true if at least one file descriptor (read or write) is valid.
665  * @return @a true if opened.
666  */
667 bool IOImplementation::opened() const
668 {
669     return (m_read_fd>=0) || (m_write_fd>=0);
670 } // eo IOImplementation::opened() const
671
672
673 /**
674  * returns if the read side detected an end of file (EOF).
675  * @return @a true if end of file was detected on read file descriptor (or read file descriptor isn't valid).
676  */
677 bool IOImplementation::eof() const
678 {
679     return (m_read_fd < 0) || m_eof;
680 } // eo IOImplementatio::eof() const
681
682
683 /**
684  * @brief returns of the write side didn't detect that it cannot write.
685  * @return @a true if we can write.
686  */
687 bool IOImplementation::writable() const
688 {
689     return (m_write_fd >=0 ) and not m_not_writable;
690 } // eo IOImplementation::writable() const
691
692
693 /**
694  * returns if the output buffer is empty.
695  * @return
696  */
697 bool IOImplementation::empty() const
698 {
699     return m_output_buffer.empty();
700 } // eo IOImplementation::empty
701
702 /**
703  * puts data into the output buffer and sends it immediately if possible,
704  *
705  * The data is passed through the filter chain before it's stored in the output buffer
706  * (i.e. the output buffer contains data as it should be send directly to the descriptor).
707  * @param _data the data which should be send.
708  */
709 void IOImplementation::lowSend(const std::string& _data)
710 {
711     std::string data(_data);
712
713     for(FilterChain::reverse_iterator it_filter= m_filter_chain.rbegin();
714         it_filter!= m_filter_chain.rend();
715         ++it_filter)
716     {
717         data= (*it_filter)->filterOutgoingData(data);
718     }
719     m_output_buffer+= data;
720
721     // if we can send immediately, do it:
722     if (! m_output_buffer.empty() && m_marked_for_writing)
723     {
724         doWrite();
725     }
726 } // eo IOImplementation::lowSend
727
728
729 /**
730  * called by the backend when there is data to read for this object.
731  *
732  * Reads the data from the connection (read file descriptor) and passes the data through the filter chain.
733  * The final data is appended to the input buffer and the signal @a m_signal_read() is called.
734  *
735  * If EOF is detected (i,e, no data was received) then the signal @a m_signal_eof() is called.
736  *
737  * @note overload this method only when You know what You are doing!
738  * (overloading is necessary when handling server sockets.)
739  */
740 void IOImplementation::doRead()
741 {
742     // static read buffer; should be ok as long as we don't use threads
743     static char buffer[8*1024]; // 8 KiB
744
745     m_errno = 0;
746     if (m_read_fd<0 || !m_marked_for_reading)
747     {
748         ODOUT("exit0; read_fd="<<m_read_fd << " mark=" << m_marked_for_reading);
749         return;
750     }
751
752     // reset the mark:
753     m_marked_for_reading = false;
754
755     // now read the data:
756     ssize_t count;
757     count = ::read(m_read_fd, buffer, sizeof(buffer));
758
759     ODOUT("::read -> " << count);
760
761     // interpret what we got:
762     if (count < 0) // error
763     {
764         m_errno = errno;
765         int fd= m_read_fd;
766
767         switch(m_errno)
768         {
769             case EINVAL:
770             case EBADF:
771             case ECONNRESET:
772             case ENETRESET:
773                 if (fd == m_read_fd)
774                 {
775                     close( (m_read_fd == m_write_fd) ? Direction::both : Direction::in );
776                 }
777                 break;
778         }
779     }
780     else if (count==0) // EOF
781     {
782         // remember the read fd:
783         int fd = m_read_fd;
784         // remember the EOF:
785         m_eof= true;
786         // signal EOF
787         m_signal_eof();
788         // if the fd is still the same: close it.
789         if (fd == m_read_fd)
790         {
791             close( Direction::in );
792         }
793     }
794     else // we have valid data
795     {
796         std::string data(buffer,count);
797         ODOUT(" got \"" << data << "\"");
798         for(FilterChain::iterator it_filter= m_filter_chain.begin();
799             it_filter != m_filter_chain.end();
800             ++it_filter)
801         {
802             data= (*it_filter)->filterIncomingData(data);
803         }
804         m_input_buffer+= data;
805         m_signal_read();
806     }
807 } // eo IOImplementation::doRead
808
809
810 /**
811  * interface for filter classes to inject data into the filter chain (emulating new incoming data).
812  * @param from_filter the filter which injects the new data.
813  * @param _data  the new data.
814  */
815 void IOImplementation::injectIncomingData(FilterBasePtr from_filter, const std::string& _data)
816 {
817     FilterChain::iterator it_filter =
818         std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(from_filter) );
819     if (it_filter == m_filter_chain.end())
820     {
821         // dont accept data inject from a unknown filter
822         return;
823     }
824     // well: pass the data through the remaining filters:
825     // NOTE: processing is (nearly) the same as in IOImplementation::doRead()
826     std::string data(_data);
827     for(++it_filter;
828         it_filter != m_filter_chain.end();
829         ++it_filter)
830     {
831         data= (*it_filter)->filterIncomingData(data);
832     }
833     m_input_buffer+= data;
834     m_signal_read();
835 } // eo IOImplementation::injectIncomingData(FilterBase*,const std::string&)
836
837
838 /**
839  * interface for filter classes to inject data into the filter chain (emulating new outgoing data).
840  * @param from_filter the filter which injects the new data.
841  * @param _data  the new data.
842  */
843 void IOImplementation::injectOutgoingData(FilterBasePtr from_filter, const std::string& _data)
844 {
845     FilterChain::reverse_iterator it_filter =
846         std::find_if( m_filter_chain.rbegin(), m_filter_chain.rend(), FilterMatch(from_filter) );
847     if (it_filter == m_filter_chain.rend())
848     {
849         // dont accept data inject from a unknown filter
850         return;
851     }
852     // well: pass the data through the remaining filters:
853     // NOTE: processing is (nearly) the same as in IOImplementation::lowSend()
854     std::string data(_data);
855     for(++it_filter;
856         it_filter!= m_filter_chain.rend();
857         ++it_filter)
858     {
859         data= (*it_filter)->filterOutgoingData(data);
860     }
861     m_output_buffer+= data;
862
863     // if we can send immediately, do it:
864     if (! m_output_buffer.empty() && m_marked_for_writing)
865     {
866         doWrite();
867     }
868 } // eo IOImplementation::injectOutgoingData(FilterBase*,const std::string&)
869
870
871 /**
872  * set the read file descriptor.
873  * Although a derived class can also set the read fd directly; this method should be used
874  * for this task since it updates some flags on the fd for async operation.
875  * @param fd the new read file descriptor.
876  */
877 void IOImplementation::setReadFd(int fd)
878 {
879     // test if we already have a valid descriptor (and may have to close it):
880     if (m_read_fd >=0 )
881     {
882         if (m_read_fd == fd)
883         {
884             // fd was already right; consider it to be ok.
885             return;
886         }
887         close(Direction::in);
888     }
889     // reset our errno:
890     m_errno= 0;
891     // if the new descriptor looks valid, set some flags:
892     if (fd >= 0)
893     {
894         long flags= ::fcntl(fd, F_GETFL);
895         if (flags != -1)
896         {
897             // set the flags for non blocking, async operation
898             flags |= O_NONBLOCK|O_ASYNC;
899             ::fcntl(fd,F_SETFL, flags);
900         }
901         else if ( errno == EBADF )
902         {
903             // well, we seemed to be fed with an invalid descriptor...:
904             m_errno = errno;
905             fd= -1;
906         }
907     }
908     if (fd >= 0) // if still valid:
909     {
910         // set the close-on-exec flag
911         ::fcntl(fd,F_SETFD, FD_CLOEXEC);
912     }
913     m_read_fd= fd;
914     m_marked_for_reading= false;
915     m_eof= false;
916 } // eo IOImplementation::setReadFd(int)
917
918
919
920 /**
921  * set the write file descriptor.
922  * Although a derived class can also set the write fd directly; this method should be used
923  * for this task since it updates some flags on the fd for async operation.
924  * @param fd the new write file descriptor.
925  */
926 void IOImplementation::setWriteFd(int fd)
927 {
928     if (m_write_fd >=0 )
929     {
930         if (m_write_fd == fd)
931         {
932             // fd was already right; consider it to be ok.
933             return;
934         }
935         close(Direction::out);
936     }
937     // reset our errno:
938     m_errno= 0;
939     // if the new descriptor looks valid, set some flags:
940     if (fd >= 0)
941     {
942         long flags= ::fcntl(fd, F_GETFL);
943         if (flags != -1)
944         {
945             // set the flags for non blocking, async operation
946             flags |= O_NONBLOCK|O_ASYNC;
947             ::fcntl(fd,F_SETFL, flags);
948         }
949         else if (errno == EBADF)
950         {
951             // well, we seemed to be fed with an invalid descriptor...:
952             m_errno = errno;
953             fd= -1;
954         }
955     }
956     if (fd >= 0) // if still valid:
957     {
958         // set the close-on-exec flag
959         ::fcntl(fd,F_SETFD, FD_CLOEXEC);
960     }
961     m_write_fd = fd;
962     m_marked_for_writing= false;
963     m_not_writable= false;
964 } // eo IOImplementation::setWriteFd(int)
965
966
967
968 /**
969  * called by the backend when this object can write data.
970  *
971  * If some data was sended, the signal @a m_signal_write is called.
972  *
973  * @internal tries to write all buffered data to output; if this succeeds,
974  * the connection is assumed to be still able to accept more data.
975  * (i.e. the internal write mark is kept!)
976  *
977   * @note overload this method only when You know what You are doing!
978 */
979 void IOImplementation::doWrite()
980 {
981     m_errno = 0;
982     if ( m_write_fd<0 || !m_marked_for_writing || m_output_buffer.empty())
983     {
984         return;
985     }
986
987     ODOUT("doWrite, d=\"" << m_output_buffer << "\"");
988
989     //reset mark:
990     m_marked_for_writing= false;
991
992     // now write the data
993     ssize_t count= ::write( m_write_fd, m_output_buffer.data(), m_output_buffer.size());
994
995     ODOUT("::write -> " << count);
996
997     if (count < 0) // error
998     {
999         m_errno= errno;
1000         int fd= m_write_fd;
1001
1002         switch(m_errno)
1003         {
1004             case EPIPE:
1005                 m_not_writable= true;
1006                 // emit a signal
1007                 m_signal_not_writable();
1008                 // fall through
1009             case EINVAL:
1010             case EBADF:
1011             case ECONNRESET:
1012             case ENETRESET:
1013                 if (fd == m_write_fd)
1014                 {
1015                     close( (m_write_fd == m_read_fd) ? Direction::both : Direction::out );
1016                 }
1017                 break;
1018         }
1019     }
1020     else
1021     {
1022         m_output_buffer.erase(0, count);
1023         if (m_output_buffer.empty())
1024         {
1025             // special case: if we were able to send all the data, we keep the write mark:
1026             m_marked_for_writing= true;
1027         }
1028     }
1029     if (count > 0)
1030     {
1031         m_signal_write();
1032     }
1033 } // eo IOImplementation::doWrite
1034
1035
1036 /*
1037  * implementation of SimpleIO
1038  */
1039
1040
1041 SimpleIO::SimpleIO(int read_fd, int write_fd)
1042 : inherited(read_fd, write_fd)
1043 {
1044     m_signal_read.connect(boost::bind(&SimpleIO::slotReceived,this));
1045 } // eo SimpleIO::SimpleIO()
1046
1047
1048 SimpleIO::~SimpleIO()
1049 {
1050 } // eo SimpleIO::~SimpleIO()
1051
1052
1053 /**
1054  * sends a string.
1055  * @param data the string.
1056  */
1057 void SimpleIO::sendString(const std::string& data)
1058 {
1059     lowSend(data);
1060 } // eo SimpleIO::sendString(const std::string&)
1061
1062
1063 /**
1064  * emits the signal signalReceived with the received data.
1065  * This slot is connected to IOImplementation::m_signal_read.
1066  */
1067 void SimpleIO::slotReceived()
1068 {
1069     std::string data;
1070     data.swap(m_input_buffer);
1071     signal_received_string(data);
1072 } // eo SimpleIO::slotReceived()
1073
1074
1075
1076 /*
1077  * implementation of SimpleIO2
1078  */
1079
1080
1081 SimpleIO2::SimpleIO2(int read_fd, int write_fd)
1082 : inherited(read_fd, write_fd)
1083 {
1084     m_signal_read.connect(boost::bind(&SimpleIO2::slotReceived,this));
1085 } // eo SimpleIO2::SimpleIO2()
1086
1087
1088 SimpleIO2::~SimpleIO2()
1089 {
1090 } // eo SimpleIO2::~SimpleIO2()
1091
1092
1093 /**
1094  * sends a string.
1095  * @param data the string.
1096  */
1097 void SimpleIO2::sendString(const std::string& data)
1098 {
1099     lowSend(data);
1100 } // eo SimpleIO2::sendString(const std::string&)
1101
1102
1103 /**
1104  * emits the signal signalReceived with the received data.
1105  * This slot is connected to IOImplementation::m_signal_read.
1106  */
1107 void SimpleIO2::slotReceived()
1108 {
1109     std::string data;
1110     data.swap(m_input_buffer);
1111     signal_received_string(data);
1112 } // eo SimpleIO2::slotReceived()
1113
1114
1115
1116 /*
1117  * implementation of class Backend (singleton)
1118  */
1119
1120 Backend* Backend::g_backend= NULL;
1121
1122 int Backend::m_count_active_steps=0;
1123
1124
1125 Backend::Backend()
1126 : m_count_active_loops(0)
1127 , m_count_stop_requests(0)
1128 {
1129         SystemTools::ignore_signal( SystemTools::Signal::PIPE );
1130 } // eo Backend::Backend
1131
1132
1133 Backend::~Backend()
1134 {
1135     SystemTools::restore_signal_handler( SystemTools::Signal::PIPE );
1136 } // eo Backend::~Backend()
1137
1138 /**
1139  * delivers pointer to the current backend, instantiating a new backend if there was no current one.
1140  *
1141  * This should be the only way to access the backend which should be a singleton.
1142  *
1143  * @return the pointer to the current backend.
1144  */
1145 Backend* Backend::getBackend()
1146 {
1147     if (!g_backend)
1148     {
1149         g_backend = new Backend();
1150     }
1151     return g_backend;
1152 } // eo Backend::getBackend
1153
1154
1155
1156
1157 /**
1158  * performs one backend cycle.
1159  *
1160  * Collects all file descriptors from the active io objects which should be selected for reading and/or writing.
1161  * Also determines the timer events which become due and adjusts the timeout.
1162  * Constructs the necessary structures and calls poll().
1163  * Finally interprets the results from poll() (i.e. performs the reading/writing/timer events)
1164  *
1165  * @param timeout maximal wait value in milliseconds; negative value waits until at least one event occured.
1166  * @return @a true if there was at least one active object; otherwise @a false
1167  *
1168  * @note this method is a little beast.
1169  *
1170  * @internal
1171  * The cycle is divided into four steps: collecting; poll; mark and execute.
1172  * The "mark" step is necessary to avoid some bad side effects when method calls in the execution stage
1173  * are calling @a Backup::doOneStep or open their own local backend loop.
1174  *
1175  * @todo handle some more error cases.
1176  * @todo provide a plugin interface for external handler.
1177  * (currently inclusion of external handler is possible by (ab)using timer classes)
1178  */
1179 bool Backend::doOneStep(int timeout)
1180 {
1181     ODOUT( "timeout=" << timeout );
1182     internal_io::PollDataCluster poll_data;
1183     bool had_active_object = false;
1184
1185     ++m_count_active_steps;
1186
1187     try {
1188
1189         FdSetType local_read_fds;
1190         FdSetType local_write_fds;
1191
1192         // step 1 ; collect
1193
1194         { // step 1.1: collect fds for read/write operations
1195             for(internal_io::IOList::iterator itIOList = internal_io::g_io_list().begin();
1196                 itIOList != internal_io::g_io_list().end();
1197                 ++itIOList)
1198             {
1199                 if (! *itIOList) continue; // skip NULL entries
1200                 int  read_fd  = (*itIOList)->m_read_fd;
1201                 int  write_fd = (*itIOList)->m_write_fd;
1202                 bool want_read  = (read_fd >= 0) and (*itIOList)->wantRead();
1203                 bool want_write = (write_fd >= 0) and (*itIOList)->wantWrite();
1204                 if (read_fd >= 0 )
1205                 {
1206                     local_read_fds.insert( read_fd );
1207                 }
1208                 if (write_fd >= 0)
1209                 {
1210                     local_write_fds.insert( write_fd );
1211                 }
1212                 if (!want_read && !want_write) continue;
1213                 if (want_read)
1214                 {
1215                     FODOUT( (*itIOList), "wants to read  (fd=" << read_fd << ")");
1216                     poll_data.add_read_fd(read_fd, *itIOList);
1217                 }
1218                 if (want_write)
1219                 {
1220                     FODOUT( (*itIOList), "wants to write  (fd=" << write_fd << ")");
1221                     poll_data.add_write_fd(write_fd, *itIOList);
1222                 }
1223                 had_active_object= true;
1224             }
1225         }
1226
1227         { // step 1.2: collect timer events
1228             MilliTime current_time;
1229             MilliTime min_event_time;
1230
1231             get_current_monotonic_time(current_time);
1232             bool min_event_time_set;
1233
1234             if (timeout >= 0)
1235             {
1236                 min_event_time = current_time + MilliTime(0,timeout);
1237                 min_event_time_set= true;
1238             }
1239             else
1240             {
1241                 min_event_time = current_time + MilliTime(86400,0);
1242                 min_event_time_set= false;
1243             }
1244             // TODO
1245
1246             for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1247                 it_timer != internal_io::g_timer_list().end()
1248                 && (!had_active_object || !min_event_time_set || current_time < min_event_time);
1249                 ++ it_timer)
1250             {
1251                 if (! *it_timer) continue; // skip NULL entries
1252                 if (! (*it_timer)->m_active) continue; // skip if not enabled
1253                 if ( !min_event_time_set || (*it_timer)->m_when < min_event_time)
1254                 {
1255                     min_event_time = (*it_timer)->m_when;
1256                     min_event_time_set= true;
1257                 }
1258                 had_active_object= true;
1259             }
1260
1261             if (min_event_time_set)
1262             { // we have at a minimal event time, so (re)compute the timeout value:
1263                 MilliTime delta= (min_event_time - current_time);
1264                 long long delta_ms = std::min( delta.get_milliseconds(), 21600000LL); // max 6h
1265                 if (delta_ms <= 0L)
1266                 {
1267                     timeout= 0L;
1268                 }
1269                 else
1270                 {
1271                     timeout= delta_ms + (delta_ms<5 ? 1 : 3);
1272                 }
1273             }
1274         }
1275
1276         // step 2 : poll
1277         ODOUT(" poll timeout is " << timeout);
1278         {
1279             MilliTime current_time;
1280             get_current_monotonic_time(current_time);
1281             ODOUT("  current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1282         }
1283         int poll_result= ::poll(poll_data.get_pollfd_ptr(), poll_data.get_num_pollfds(), timeout);
1284
1285         ODOUT("poll -> " << poll_result);
1286         {
1287             MilliTime current_time;
1288             get_current_monotonic_time(current_time);
1289             ODOUT("  current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1290         }
1291
1292         if (poll_result < 0)
1293         {
1294             //TODO poll error handling (signals ?!)
1295         }
1296
1297         // step 3 : mark
1298
1299         // step 3.1: mark io objects (if necessary)
1300         if (poll_result > 0)
1301         {
1302             for(internal_io::PollVector::iterator itPollItem = poll_data.m_poll_vector.begin();
1303                 itPollItem != poll_data.m_poll_vector.end();
1304                 ++itPollItem)
1305             {
1306                 ODOUT("  fd=" << itPollItem->fd << ", events=" << itPollItem->events << ", revents=" << itPollItem->revents);
1307                 if ( 0 == (itPollItem->revents))
1308                 { // preliminary continuation if nothing is to handle for this item(/fd)...
1309                     continue;
1310                 }
1311                 if ( 0!= (itPollItem->revents & (POLLIN|POLLHUP)))
1312                 {
1313                     IOImplementation *io= poll_data.m_read_fd_io_map[ itPollItem->fd ];
1314                     if (io && io->m_read_fd==itPollItem->fd)
1315                     {
1316                         FODOUT(io,"marked for reading");
1317                         io->m_marked_for_reading= true;
1318                     }
1319                 }
1320                 if ( 0!= (itPollItem->revents & POLLOUT))
1321                 {
1322                     IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
1323                     if (io && io->m_write_fd==itPollItem->fd)
1324                     {
1325                         io->m_marked_for_writing= true;
1326                     }
1327                 }
1328                 if ( 0!= (itPollItem->revents & POLLERR))
1329                 {
1330                     IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
1331                     if (0!= (itPollItem->events & POLLOUT))
1332                     {
1333                         if (io && io->m_write_fd==itPollItem->fd)
1334                         {
1335                             io->m_marked_for_writing= false;
1336                             //io->close( Direction::out );
1337                         }
1338                     }
1339                 }
1340                 // TODO error handling (POLLERR, POLLHUP, POLLNVAL)
1341             }
1342         }
1343
1344         //Step 3.2: mark timer objects
1345         {
1346             MilliTime current_time;
1347
1348             get_current_monotonic_time(current_time);
1349             ODOUT("  current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1350
1351             for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1352                 it_timer != internal_io::g_timer_list().end();
1353                 ++ it_timer)
1354             {
1355                 ODOUT("  check timer " << *it_timer);
1356                 if (! *it_timer) continue; // skip NULL entries
1357                 if (! (*it_timer)->m_active) continue; // skip if not enabled
1358                 if ( (*it_timer)->m_when <= current_time)
1359                 {
1360                     ODOUT("   ==> MARK");
1361                     (*it_timer)->m_marked = true;
1362                 }
1363             }
1364         }
1365
1366
1367         // step 4 : execute
1368
1369         // step 4.1: execute io
1370         ODOUT("execute stage");
1371         for(internal_io::IOList::iterator it_io = internal_io::g_io_list().begin();
1372             it_io != internal_io::g_io_list().end();
1373             ++ it_io)
1374         {
1375             ODOUT("  check obj " << *it_io);
1376             if (NULL == *it_io) continue;
1377             if ((*it_io)->m_marked_for_writing)
1378             {
1379                 FODOUT((*it_io),"exec doWrite");
1380                 (*it_io)->doWrite();
1381                 if ((*it_io) == NULL) continue; // skip remaining if we lost the object
1382                 if ((*it_io)->m_errno)
1383                 {
1384                     continue;
1385                 }
1386             }
1387             if ((*it_io)->m_marked_for_reading)
1388             {
1389                 FODOUT((*it_io),"exec doRead");
1390                 (*it_io)->doRead();
1391                 if ((*it_io) == NULL) continue; // skip remaining if we lost the object
1392                 if ((*it_io)->m_errno)
1393                 {
1394                     continue;
1395                 }
1396             }
1397         }
1398
1399         // step 4.2: execute timer events
1400         {
1401             for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1402                 it_timer != internal_io::g_timer_list().end();
1403                 ++ it_timer)
1404             {
1405                 if (! *it_timer) continue; // skip NULL entries
1406                 if (! (*it_timer)->m_active) continue; // skip if not enabled
1407                 if (! (*it_timer)->m_marked) continue; // skip if not marked
1408
1409                 // reset the mark and deactivate object now since the execute() method might activate it again
1410                 (*it_timer)->m_marked= false;
1411                 (*it_timer)->m_active= false;
1412
1413                 // now execute the event:
1414                 (*it_timer)->execute();
1415             }
1416         }
1417
1418     } // eo try
1419     catch(...)
1420     {
1421         // clean up our counter
1422         --m_count_active_steps;
1423         // and forward the exception
1424         throw;
1425     }
1426
1427     if ( 0 == --m_count_active_steps)
1428     {
1429         internal_io::g_io_list().clean_list();
1430         internal_io::g_timer_list().clean_list();
1431     }
1432
1433     return had_active_object;
1434 } // eo Backend::doOneStep
1435
1436
1437 /**
1438  * enters a backend loop.
1439  *
1440  * Calls @a Backend::doOneStep within a loop until @a Backend::stop was called or there are no more
1441  * active objects (io objects or timer objects).
1442  */
1443 void Backend::run()
1444 {
1445     ++m_count_active_loops;
1446     do
1447     {
1448         try
1449         {
1450             if (!doOneStep(c_max_poll_wait))
1451             {
1452                 // stop if there are no more active objects.
1453                 stop();
1454             }
1455         }
1456         catch(...)
1457         {
1458             // clean up our counter
1459             --m_count_active_loops;
1460             // and forward the exception
1461             throw;
1462         }
1463     }
1464     while (0 == m_count_stop_requests);
1465     --m_count_active_loops;
1466     --m_count_stop_requests;
1467 } // eo Backend::run
1468
1469
1470 /**
1471  * @brief stops the latest loop currently run by Backend::run().
1472  * @see Backend::run()
1473  */
1474 void Backend::stop()
1475 {
1476     if (m_count_active_loops)
1477     {
1478         ++m_count_stop_requests;
1479     }
1480 } // eo Backend::stop()
1481
1482
1483
1484 } // eo namespace AsyncIo