564f4e094b3d4d0e35ec8c88c6535cb99a0a6bde
[libasyncio] / asyncio / async_io.cpp
1 /*
2 The software in this package is distributed under the GNU General
3 Public License version 2 (with a special exception described below).
4
5 A copy of GNU General Public License (GPL) is included in this distribution,
6 in the file COPYING.GPL.
7
8 As a special exception, if other files instantiate templates or use macros
9 or inline functions from this file, or you compile this file and link it
10 with other works to produce a work based on this file, this file
11 does not by itself cause the resulting work to be covered
12 by the GNU General Public License.
13
14 However the source code for this file must still be made available
15 in accordance with section (3) of the GNU General Public License.
16
17 This exception does not invalidate any other reasons why a work based
18 on this file might be covered by the GNU General Public License.
19 */
20 /** @file
21  * @copyright Copyright © 2007-2008 by Intra2net AG
22  */
23
24 //#define NOISEDEBUG
25
26 #include "async_io.hpp"
27 #include <asyncio_config.hpp>
28
29 #include <list>
30 #include <vector>
31 #include <map>
32 #include <algorithm>
33 #include <utility>
34
35 #include <sys/poll.h>
36 #include <sys/time.h>
37 #include <sys/socket.h>
38 #include <unistd.h>
39 #include <errno.h>
40 #include <fcntl.h>
41
42 #include <boost/bind.hpp>
43
44 #ifdef HAVE_LIBI2NCOMMON
45 #  include <signalfunc.hpp>
46 //#  include <timefunc.hxx>
47 #else
48 #include <signal.h>
49 #endif
50
51 #include <iostream>
52
53 #ifdef NOISEDEBUG
54 #include <iostream>
55 #include <iomanip>
56 #define DOUT(msg) std::cout << msg << std::endl
57 #define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl
58 //#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl
59 #define ODOUT(msg) std::cout << __PRETTY_FUNCTION__ << "[" << this << "]:" << msg << std::endl
60 #else
61 #define DOUT(msg) do {} while (0)
62 #define FODOUT(obj,msg) do {} while (0)
63 #define ODOUT(msg) do {} while (0)
64 #endif
65
66
67 namespace
68 {
69
70 using namespace AsyncIo::Utils;
71
72 /*
73  * configuration:
74  */
75
76
77 const int c_max_poll_wait= 10*60*1000; // maximal poll wait (while in backend loop): 10 min
78
79
80 /**
81  * contains internal helper structs and functions for io handling.
82  */
83 namespace internal_io
84 {
85
86 /**
87  * extends struct pollfd with some convenience functions
88  */
89 struct PollFd : public ::pollfd
90 {
91     PollFd()
92     {
93         fd= 0;
94         events= revents= 0;
95     } // eo PollFd
96
97
98     /**
99      * initializes the struct with a given file descriptor and clears the event mask(s).
100      * @param _fd 
101      */
102     PollFd(int _fd)
103     {
104         fd= _fd;
105         events= revents= 0;
106     } // eo PollFd
107
108
109     /**
110      * set that we want to be notified about new incoming data
111      */
112     void setPOLLIN()  { events |= POLLIN; }
113
114     /**
115      * set that we want to be notified if we can send (more) data.
116      */
117     void setPOLLOUT() { events |= POLLOUT; }
118
119 }; // eo struct PollFd
120
121
122 typedef std::vector<PollFd> PollVector;
123 typedef std::map<int,PollVector::size_type> FdPollMap;
124 typedef std::map<int,AsyncIo::IOImplementation*> FdIOMap;
125
126
127 /**
128  * struct for interfacing our local structures with poll()
129  */
130 struct PollDataCluster
131 {
132     PollVector m_poll_vector;
133     FdPollMap  m_fd_poll_map;
134     FdIOMap    m_read_fd_io_map;
135     FdIOMap    m_write_fd_io_map;
136
137     void add_read_fd( int fd, AsyncIo::IOImplementation* io);
138     void add_write_fd( int fd, AsyncIo::IOImplementation* io);
139
140     pollfd* get_pollfd_ptr();
141     unsigned int get_num_pollfds() const;
142
143 }; // eo struct PollDataCluster
144
145
146 typedef PtrList< AsyncIo::IOImplementation, true > IOList;
147 typedef PtrList< AsyncIo::TimerBase, true >        TimerList;
148
149 template<> int IOList::GlobalCountType::InstanceCount= 0;
150 template<> int TimerList::GlobalCountType::InstanceCount= 0;
151
152
153 /**
154  * the (internal) global list of io objects (object pointers)
155  */
156 IOList& g_io_list()
157 {
158     static IOList _the_io_list;
159     return _the_io_list;
160 };
161
162
163 /**
164  * the (internal) global list of timer objects (object pointers)
165  */
166 TimerList& g_timer_list()
167 {
168     static TimerList _the_timer_list;
169     return _the_timer_list;
170 }
171
172 /*
173  * implementation of PollDataCluster
174  */
175
176
177 /**
178  * add a new file descriptor to the read list.
179  *
180  * @param fd the file descriptor.
181  * @param io the io object which uses the fd for reading.
182  */
183 void PollDataCluster::add_read_fd( int fd, AsyncIo::IOImplementation* io)
184 {
185     FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
186     if (itPollMap != m_fd_poll_map.end())
187     {
188         m_poll_vector[ itPollMap->second ].setPOLLIN();
189     }
190     else
191     {
192         PollFd item(fd);
193         item.setPOLLIN();
194         m_fd_poll_map[fd] = m_poll_vector.size();
195         m_poll_vector.push_back( item );
196     }
197     m_read_fd_io_map[fd]= io;
198 } // eo PollDataCluster::add_read_fd
199
200
201 /**
202  * add a new file descriptor to the write list.
203  *
204  * @param fd the file descriptor.
205  * @param io the io object which uses the fd for writing.
206  */
207 void PollDataCluster::add_write_fd( int fd, AsyncIo::IOImplementation* io)
208 {
209     FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
210     if (itPollMap != m_fd_poll_map.end())
211     {
212         m_poll_vector[ itPollMap->second ].setPOLLOUT();
213     }
214     else
215     {
216         PollFd item(fd);
217         item.setPOLLOUT();
218         m_fd_poll_map[fd] = m_poll_vector.size();
219         m_poll_vector.push_back( item );
220     }
221     m_write_fd_io_map[fd]= io;
222 } // eo PollDataCluster::add_write_fd
223
224
225 /**
226  * returns a pointer to a pollfd array; suitable for passing to poll()
227  *
228  * @return pointer to pollfd array
229  */
230 pollfd* PollDataCluster::get_pollfd_ptr()
231 {
232     return m_poll_vector.empty() ? NULL : &m_poll_vector.front();
233 } // eo get_pollfd_ptr
234
235
236 /**
237  * returns the number of entries in the pollfd array; suitable for passing to poll()
238  *
239  * @return the number of entries in the pollfd array
240  */
241 unsigned int PollDataCluster::get_num_pollfds() const
242 {
243     return m_poll_vector.size();
244 } // eo get_num_pollfds
245
246
247
248 } // eo namespace internal_io
249
250
251 /*
252  * some internal tool functions and structures
253  */
254
255 struct FilterMatch {
256     AsyncIo::FilterBasePtr m_filter;
257
258     FilterMatch(AsyncIo::FilterBasePtr filter)
259     : m_filter(filter)
260     {}
261
262     bool operator () (const AsyncIo::FilterBasePtr& item)
263     {
264         return item && item == m_filter;
265     }
266
267 }; // eo struct FilterMatch
268
269
270
271 } // eo anonymous namespace
272
273
274
275
276 namespace AsyncIo
277 {
278
279
280 /*
281  * implementation of TimerBase
282  */
283
284 /**
285  * constructor. Adds the object to the internal timer list.
286  */
287 TimerBase::TimerBase()
288 : m_active(false)
289 , m_marked(false)
290 {
291     internal_io::g_timer_list().add_item(this);
292 } // eo TimerBase::TimerBase
293
294
295 /**
296  * destructor. Removes the object from the internal timer list.
297  */
298 TimerBase::~TimerBase()
299 {
300     ODOUT("enter");
301     if (internal_io::TimerList::Instances())
302     {
303         ODOUT("remove from list");
304         internal_io::g_timer_list().remove_item(this);
305     }
306 } // eo TimerBase::~TimerBase
307
308
309 /**
310  * @brief returns the point in time when the time is executed in real time.
311  * @return the point in time when the timer is to be executed.
312  */
313 MilliTime TimerBase::getRealWhenTime() const
314 {
315     MilliTime mono_time;
316     MilliTime real_time;
317     get_current_monotonic_time(mono_time);
318     get_current_real_time(real_time);
319     MilliTime result= m_when - mono_time + real_time;
320     return result;
321 } // eo TimerBase::getRealWhenTime() const
322
323
324 /**
325  * sets the time when the event should be executed.
326  * @param sec the seconds part of the point in time.
327  * @param msec  the milliseconds part of the point in time.
328  */
329 void TimerBase::setWhenTime(long sec, long msec)
330 {
331     m_when.set(sec,msec);
332     m_marked= false;
333 } // eo TimerBase::setWhenTime
334
335
336 /**
337  * sets the time when the event should be executed.
338  * @param mt the point in time.
339  */
340 void TimerBase::setWhenTime(const MilliTime& mt)
341 {
342     m_when= mt;
343     m_marked= false;
344 } // eo TimerBase::setWhenTime
345
346
347 /**
348  * sets the time delta measured from current time when the event should be executed.
349  * @param sec the seconds of the time delta
350  * @param msec the milli seconds of the time delta
351  */
352 void TimerBase::setDeltaWhenTime(long sec, long msec)
353 {
354     setDeltaWhenTime( MilliTime(sec,msec) );
355 } // eo TimerBase::setWhenTime
356
357
358
359 /**
360  * sets the time delta measured from current time when the event should be executed.
361  * @param mt the time delta
362  */
363 void TimerBase::setDeltaWhenTime(const MilliTime& mt)
364 {
365     get_current_monotonic_time(m_when);
366     m_when+= mt;
367     m_marked= false;
368 } // eo TimerBase::setWhenTime
369
370
371 /**
372  * set the active state of the timer event.
373  * @param active determines if the object should be active (default: yes).
374  */
375 void TimerBase::activate(bool active)
376 {
377     m_active = active;
378     if (!active)
379     {
380         // clear the mark if we are not active.
381         m_marked= false;
382     }
383 } // eo TimerBase::activate
384
385
386 /** @fn void TimerBase::deactivate()
387  *  deactivates the event by clearing the active state.
388  */
389
390
391 /**
392  * called when the timer event occured.
393  */
394 void TimerBase::execute()
395 {
396 } // eo TimerBase::execute
397
398
399 /*
400  * implementation of FilterBase class
401  */
402
403
404 FilterBase::FilterBase()
405 : m_io(NULL)
406 {
407 } // eo FilterBase::FilterBase()
408
409
410 /**
411  * injects incoming data.
412  * @param data  the new data
413  */
414 void FilterBase::injectIncomingData(const std::string& data)
415 {
416     if (m_io)
417     {
418         FilterBasePtr ptr= get_ptr_as< FilterBase >();
419         if (ptr)
420         {
421             m_io->injectIncomingData(ptr,data);
422         }
423     }
424 } // FilterBase::injectIncomingData(const std::string&)
425
426
427 /**
428  * injects outgoing data.
429  * @param data the new data
430  */
431 void FilterBase::injectOutgoingData(const std::string& data)
432 {
433     if (m_io)
434     {
435         FilterBasePtr ptr= get_ptr_as< FilterBase >();
436         if (ptr)
437         {
438             m_io->injectOutgoingData(ptr,data);
439         }
440     }
441 } // eo FilterBase::injectOutgoingData(const std::string&)
442
443
444
445 /**
446  * called when EOF detected on incoming channel (or incoming channel closed)
447  */
448 void FilterBase::endOfIncomingData()
449 {
450 } // eo FilterBase::endOfIncomingData()
451
452 /**
453  * called when the filter should reset.
454  * This is used when a new channel is opened or when the filter is taken out of a filter chain.
455  */
456 void FilterBase::reset()
457 {
458 } // eo FilterBase::reset()
459
460
461 /*
462  * implementation of IOImplementation class
463  */
464
465
466 /**
467  * constructor for the base io class.
468  *
469  * Also adds the object to internal list of io objects (which is used by the backend).
470  *
471  * @param read_fd the file descriptor which should be used for reading (default -1 for no value)
472  * @param write_fd  the file descriptor which should be used for writing (default -1 for no value)
473  */
474 IOImplementation::IOImplementation(int read_fd, int write_fd)
475 : m_read_fd(-1)
476 , m_write_fd(-1)
477 , m_eof(false)
478 , m_not_writable(false)
479 , m_input_buffer()
480 , m_output_buffer()
481 , m_marked_for_reading(false)
482 , m_marked_for_writing(false)
483 {
484     internal_io::g_io_list().add_item(this);
485     if (read_fd >= 0)
486     {
487         setReadFd( read_fd );
488     }
489     if (write_fd >= 0)
490     {
491         setWriteFd( write_fd );
492     }
493 } // eo IOImplementation::IOImplementation
494
495
496 /**
497  * destructor of the base io class.
498  *
499  * Removes the object from the interal list of io objects.
500  */
501 IOImplementation::~IOImplementation()
502 {
503     close();
504     if (internal_io::IOList::Instances())
505     {
506         internal_io::g_io_list().remove_item(this);
507     }
508     // now clear the filters:
509     while (! m_filter_chain.empty() )
510     {
511         FilterChain::iterator it = m_filter_chain.begin();
512         (*it)->reset();
513         (*it)->m_io= NULL;
514         //TODO: signal the filter that it is removed ?!
515         m_filter_chain.erase(it);
516     }
517 } // eo IOImplementation::~IOImplementation
518
519
520 /**
521  * adds another filter to the filter chain.
522  * @param filter pointer to the new filter.
523  */
524 void IOImplementation::addFilter
525 (
526     FilterBasePtr filter
527 )
528 {
529     if (!filter)
530     {
531         return; // nothing to do
532     }
533     if (filter->m_io)
534     {
535         filter->m_io->removeFilter(filter);
536     }
537     m_filter_chain.push_back( filter );
538 } // eo IOImplementation::addFilter
539
540
541 /**
542  * removes a filter from the filter chain.
543  * @param filter the pointer to the filter which is removed.
544  * @note if the filter is removed the class gives away the ownership; i.,e. the caller is responsible for
545  * deleting the filter if it was dynamically allocated.
546  */
547 void IOImplementation::removeFilter
548 (
549     FilterBasePtr filter
550 )
551 {
552     FilterChain::iterator it =
553         std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(filter) );
554     if (it != m_filter_chain.end())
555     {
556         filter->reset();
557         filter->m_io= NULL;
558         //TODO: signal the filter that it is removed ?!
559         m_filter_chain.erase(it);
560     }
561 } // eo IOImplementation::removeFilter
562
563
564 /**
565  * closes the file descriptors (/ the connection).
566  *
567  * @param direction the direction which should be closed (default: @a Direction::both for all).
568  */
569 void IOImplementation::close(Direction direction)
570 {
571     bool had_read_fd= (m_read_fd >= 0);
572     m_errno= 0;
573     if (direction == Direction::unspecified) direction= Direction::both;
574     if (direction != Direction::both && m_read_fd==m_write_fd && m_read_fd>=0)
575     { // special case: half closed (socket) connections...
576         // NOTE: for file descriptors m_errno will set to ENOTSOCK, but since we "forget" the desired part
577         // (read_fd or write_fd) this class works as desired.
578         switch(direction)
579         {
580             case Direction::in:
581                 {
582                     int res= ::shutdown(m_read_fd, SHUT_RD);
583                     if (res<0) 
584                     {
585                         m_errno= errno;
586                     }
587                     m_read_fd= -1;
588                     if (!m_eof)
589                     {
590                         for(FilterChain::iterator it= m_filter_chain.begin();
591                             it != m_filter_chain.end();
592                             ++it)
593                         {
594                             (*it)->endOfIncomingData();
595                         }
596                     }
597                 }
598                 return;
599
600             case Direction::out:
601                 {
602                     int res= ::shutdown(m_write_fd, SHUT_WR);
603                     if (res<0)
604                     {
605                         m_errno= errno;
606                     }
607                     m_write_fd= -1;
608                     m_output_buffer.clear();
609                 }
610                 return;
611         }
612     }
613     if (m_write_fd >= 0 && (direction & Direction::out) )
614     {
615         int res1 = ::close(m_write_fd);
616         if (m_write_fd == m_read_fd)
617         {
618             m_read_fd= -1;
619         }
620         m_write_fd= -1;
621         m_output_buffer.clear();
622         if (res1<0) m_errno= errno;
623     }
624     if (m_read_fd >=0 && (direction & Direction::in) )
625     {
626         int res1 = ::close(m_read_fd);
627         m_read_fd= -1;
628         if (res1<0) m_errno= errno;
629     }
630     if (had_read_fd && !m_eof && (m_read_fd<0))
631     {
632         for(FilterChain::iterator it= m_filter_chain.begin();
633             it != m_filter_chain.end();
634             ++it)
635         {
636             (*it)->endOfIncomingData();
637         }
638     }
639 } // eo IOImplementation::close
640
641
642 /**
643  * determines if the io class wants to read data.
644  * Default implementation checks only for a valid file descriptor value.
645  *
646  * @return @a true if the objects wants to read data
647  */
648 bool IOImplementation::wantRead()
649 {
650     return (m_read_fd >= 0) && ! m_eof;
651 } // eo IOImplementation::wantRead
652
653
654 /**
655  * determines if the io class wants to write data.
656  * Default implementation checks for a valid file descriptor value and if the object
657  * cannot write data immediately.
658  *
659  * @return @a true if the objects wants to write data
660  */
661 bool IOImplementation::wantWrite()
662 {
663     return (m_write_fd >= 0) && ! m_marked_for_writing && ! m_not_writable;
664 } // eo IOImplementation::wantWrite
665
666
667 /**
668  * delivers if opened.
669  * The default returns @a true if at least one file descriptor (read or write) is valid.
670  * @return @a true if opened.
671  */
672 bool IOImplementation::opened() const
673 {
674     return (m_read_fd>=0) || (m_write_fd>=0);
675 } // eo IOImplementation::opened() const
676
677
678 /**
679  * returns if the read side detected an end of file (EOF).
680  * @return @a true if end of file was detected on read file descriptor (or read file descriptor isn't valid).
681  */
682 bool IOImplementation::eof() const
683 {
684     return (m_read_fd < 0) || m_eof;
685 } // eo IOImplementatio::eof() const
686
687
688 /**
689  * @brief returns of the write side didn't detect that it cannot write.
690  * @return @a true if we can write.
691  */
692 bool IOImplementation::writable() const
693 {
694     return (m_write_fd >=0 ) and not m_not_writable;
695 } // eo IOImplementation::writable() const
696
697
698 /**
699  * returns if the output buffer is empty.
700  * @return 
701  */
702 bool IOImplementation::empty() const
703 {
704     return m_output_buffer.empty();
705 } // eo IOImplementation::empty
706
707 /**
708  * puts data into the output buffer and sends it immediately if possible,
709  *
710  * The data is passed through the filter chain before it's stored in the output buffer
711  * (i.e. the output buffer contains data as it should be send directly to the descriptor).
712  * @param _data the data which should be send.
713  */
714 void IOImplementation::lowSend(const std::string& _data)
715 {
716     std::string data(_data);
717
718     for(FilterChain::reverse_iterator it_filter= m_filter_chain.rbegin();
719         it_filter!= m_filter_chain.rend();
720         ++it_filter)
721     {
722         data= (*it_filter)->filterOutgoingData(data);
723     }
724     m_output_buffer+= data;
725
726     // if we can send immediately, do it:
727     if (! m_output_buffer.empty() && m_marked_for_writing)
728     {
729         doWrite();
730     }
731 } // eo IOImplementation::lowSend
732
733
734 /**
735  * called by the backend when there is data to read for this object.
736  *
737  * Reads the data from the connection (read file descriptor) and passes the data through the filter chain.
738  * The final data is appended to the input buffer and the signal @a m_signal_read() is called.
739  *
740  * If EOF is detected (i,e, no data was received) then the signal @a m_signal_eof() is called.
741  *
742  * @note overload this method only when You know what You are doing!
743  * (overloading is necessary when handling server sockets.)
744  */
745 void IOImplementation::doRead()
746 {
747     // static read buffer; should be ok as long as we don't use threads
748     static char buffer[8*1024]; // 8 KiB
749
750     m_errno = 0;
751     if (m_read_fd<0 || !m_marked_for_reading)
752     {
753         ODOUT("exit0; read_fd="<<m_read_fd << " mark=" << m_marked_for_reading);
754         return;
755     }
756
757     // reset the mark:
758     m_marked_for_reading = false;
759
760     // now read the data:
761     ssize_t count;
762     count = ::read(m_read_fd, buffer, sizeof(buffer));
763
764     ODOUT("::read -> " << count);
765
766     // interpret what we got:
767     if (count < 0) // error
768     {
769         m_errno = errno;
770         int fd= m_read_fd;
771
772         switch(m_errno)
773         {
774             case EINVAL:
775             case EBADF:
776             case ECONNRESET:
777             case ENETRESET:
778                 if (fd == m_read_fd)
779                 {
780                     close( (m_read_fd == m_write_fd) ? Direction::both : Direction::in );
781                 }
782                 break;
783         }
784     }
785     else if (count==0) // EOF
786     {
787         // remember the read fd:
788         int fd = m_read_fd;
789         // remember the EOF:
790         m_eof= true;
791         // signal EOF
792         m_signal_eof();
793         // if the fd is still the same: close it.
794         if (fd == m_read_fd)
795         {
796             close( Direction::in );
797         }
798     }
799     else // we have valid data
800     {
801         std::string data(buffer,count);
802         ODOUT(" got \"" << data << "\"");
803         for(FilterChain::iterator it_filter= m_filter_chain.begin();
804             it_filter != m_filter_chain.end();
805             ++it_filter)
806         {
807             data= (*it_filter)->filterIncomingData(data);
808         }
809         m_input_buffer+= data;
810         m_signal_read();
811     }
812 } // eo IOImplementation::doRead
813
814
815 /**
816  * interface for filter classes to inject data into the filter chain (emulating new incoming data).
817  * @param from_filter the filter which injects the new data.
818  * @param _data  the new data.
819  */
820 void IOImplementation::injectIncomingData(FilterBasePtr from_filter, const std::string& _data)
821 {
822     FilterChain::iterator it_filter =
823         std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(from_filter) );
824     if (it_filter == m_filter_chain.end())
825     {
826         // dont accept data inject from a unknown filter
827         return;
828     }
829     // well: pass the data through the remaining filters:
830     // NOTE: processing is (nearly) the same as in IOImplementation::doRead()
831     std::string data(_data);
832     for(++it_filter;
833         it_filter != m_filter_chain.end();
834         ++it_filter)
835     {
836         data= (*it_filter)->filterIncomingData(data);
837     }
838     m_input_buffer+= data;
839     m_signal_read();
840 } // eo IOImplementation::injectIncomingData(FilterBase*,const std::string&)
841
842
843 /**
844  * interface for filter classes to inject data into the filter chain (emulating new outgoing data).
845  * @param from_filter the filter which injects the new data.
846  * @param _data  the new data.
847  */
848 void IOImplementation::injectOutgoingData(FilterBasePtr from_filter, const std::string& _data)
849 {
850     FilterChain::reverse_iterator it_filter =
851         std::find_if( m_filter_chain.rbegin(), m_filter_chain.rend(), FilterMatch(from_filter) );
852     if (it_filter == m_filter_chain.rend())
853     {
854         // dont accept data inject from a unknown filter
855         return;
856     }
857     // well: pass the data through the remaining filters:
858     // NOTE: processing is (nearly) the same as in IOImplementation::lowSend()
859     std::string data(_data);
860     for(++it_filter;
861         it_filter!= m_filter_chain.rend();
862         ++it_filter)
863     {
864         data= (*it_filter)->filterOutgoingData(data);
865     }
866     m_output_buffer+= data;
867
868     // if we can send immediately, do it:
869     if (! m_output_buffer.empty() && m_marked_for_writing)
870     {
871         doWrite();
872     }
873 } // eo IOImplementation::injectOutgoingData(FilterBase*,const std::string&)
874
875
876 /**
877  * set the read file descriptor.
878  * Although a derived class can also set the read fd directly; this method should be used
879  * for this task since it updates some flags on the fd for async operation.
880  * @param fd the new read file descriptor.
881  */
882 void IOImplementation::setReadFd(int fd)
883 {
884     // test if we already have a valid descriptor (and may have to close it):
885     if (m_read_fd >=0 )
886     {
887         if (m_read_fd == fd)
888         {
889             // fd was already right; consider it to be ok.
890             return;
891         }
892         close(Direction::in);
893     }
894     // reset our errno:
895     m_errno= 0;
896     // if the new descriptor looks valid, set some flags:
897     if (fd >= 0)
898     {
899         long flags= ::fcntl(fd, F_GETFL);
900         if (flags != -1)
901         {
902             // set the flags for non blocking, async operation
903             flags |= O_NONBLOCK|O_ASYNC;
904             ::fcntl(fd,F_SETFL, flags);
905         }
906         else if ( errno == EBADF )
907         {
908             // well, we seemed to be fed with an invalid descriptor...:
909             m_errno = errno;
910             fd= -1;
911         }
912     }
913     if (fd >= 0) // if still valid:
914     {
915         // set the close-on-exec flag
916         ::fcntl(fd,F_SETFD, FD_CLOEXEC);
917     }
918     m_read_fd= fd;
919     m_marked_for_reading= false;
920     m_eof= false;
921 } // eo IOImplementation::setReadFd(int)
922
923
924
925 /**
926  * set the write file descriptor.
927  * Although a derived class can also set the write fd directly; this method should be used
928  * for this task since it updates some flags on the fd for async operation.
929  * @param fd the new write file descriptor.
930  */
931 void IOImplementation::setWriteFd(int fd)
932 {
933     if (m_write_fd >=0 )
934     {
935         if (m_write_fd == fd)
936         {
937             // fd was already right; consider it to be ok.
938             return;
939         }
940         close(Direction::out);
941     }
942     // reset our errno:
943     m_errno= 0;
944     // if the new descriptor looks valid, set some flags:
945     if (fd >= 0)
946     {
947         long flags= ::fcntl(fd, F_GETFL);
948         if (flags != -1)
949         {
950             // set the flags for non blocking, async operation
951             flags |= O_NONBLOCK|O_ASYNC;
952             ::fcntl(fd,F_SETFL, flags);
953         }
954         else if (errno == EBADF)
955         {
956             // well, we seemed to be fed with an invalid descriptor...:
957             m_errno = errno;
958             fd= -1;
959         }
960     }
961     if (fd >= 0) // if still valid:
962     {
963         // set the close-on-exec flag
964         ::fcntl(fd,F_SETFD, FD_CLOEXEC);
965     }
966     m_write_fd = fd;
967     m_marked_for_writing= false;
968     m_not_writable= false;
969 } // eo IOImplementation::setWriteFd(int)
970
971
972
973 /**
974  * called by the backend when this object can write data.
975  *
976  * If some data was sended, the signal @a m_signal_write is called.
977  *
978  * @internal tries to write all buffered data to output; if this succeeds,
979  * the connection is assumed to be still able to accept more data.
980  * (i.e. the internal write mark is kept!)
981  *
982   * @note overload this method only when You know what You are doing!
983 */
984 void IOImplementation::doWrite()
985 {
986     m_errno = 0;
987     if ( m_write_fd<0 || !m_marked_for_writing || m_output_buffer.empty())
988     {
989         return;
990     }
991
992     ODOUT("doWrite, d=\"" << m_output_buffer << "\"");
993
994     //reset mark:
995     m_marked_for_writing= false;
996
997     // now write the data
998     ssize_t count= ::write( m_write_fd, m_output_buffer.data(), m_output_buffer.size());
999
1000     ODOUT("::write -> " << count);
1001
1002     if (count < 0) // error
1003     {
1004         m_errno= errno;
1005         int fd= m_write_fd;
1006
1007         switch(m_errno)
1008         {
1009             case EPIPE:
1010                 m_not_writable= true;
1011                 // emit a signal
1012                 m_signal_not_writable();
1013                 // fall through
1014             case EINVAL:
1015             case EBADF:
1016             case ECONNRESET:
1017             case ENETRESET:
1018                 if (fd == m_write_fd)
1019                 {
1020                     close( (m_write_fd == m_read_fd) ? Direction::both : Direction::out );
1021                 }
1022                 break;
1023         }
1024     }
1025     else
1026     {
1027         m_output_buffer.erase(0, count);
1028         if (m_output_buffer.empty())
1029         {
1030             // special case: if we were able to send all the data, we keep the write mark:
1031             m_marked_for_writing= true;
1032         }
1033     }
1034     if (count > 0)
1035     {
1036         m_signal_write();
1037     }
1038 } // eo IOImplementation::doWrite
1039
1040
1041 /*
1042  * implementation of SimpleIO
1043  */
1044
1045
1046 SimpleIO::SimpleIO(int read_fd, int write_fd)
1047 : inherited(read_fd, write_fd)
1048 {
1049     m_signal_read.connect(boost::bind(&SimpleIO::slotReceived,this));
1050 } // eo SimpleIO::SimpleIO()
1051
1052
1053 SimpleIO::~SimpleIO()
1054 {
1055 } // eo SimpleIO::~SimpleIO()
1056
1057
1058 /**
1059  * sends a string.
1060  * @param data the string.
1061  */
1062 void SimpleIO::sendString(const std::string& data)
1063 {
1064     lowSend(data);
1065 } // eo SimpleIO::sendString(const std::string&)
1066
1067
1068 /**
1069  * emits the signal signalReceived with the received data.
1070  * This slot is connected to IOImplementation::m_signal_read.
1071  */
1072 void SimpleIO::slotReceived()
1073 {
1074     std::string data;
1075     data.swap(m_input_buffer);
1076     signal_received_string(data);
1077 } // eo SimpleIO::slotReceived()
1078
1079
1080
1081 /*
1082  * implementation of SimpleIO2
1083  */
1084
1085
1086 SimpleIO2::SimpleIO2(int read_fd, int write_fd)
1087 : inherited(read_fd, write_fd)
1088 {
1089     m_signal_read.connect(boost::bind(&SimpleIO2::slotReceived,this));
1090 } // eo SimpleIO2::SimpleIO2()
1091
1092
1093 SimpleIO2::~SimpleIO2()
1094 {
1095 } // eo SimpleIO2::~SimpleIO2()
1096
1097
1098 /**
1099  * sends a string.
1100  * @param data the string.
1101  */
1102 void SimpleIO2::sendString(const std::string& data)
1103 {
1104     lowSend(data);
1105 } // eo SimpleIO2::sendString(const std::string&)
1106
1107
1108 /**
1109  * emits the signal signalReceived with the received data.
1110  * This slot is connected to IOImplementation::m_signal_read.
1111  */
1112 void SimpleIO2::slotReceived()
1113 {
1114     std::string data;
1115     data.swap(m_input_buffer);
1116     signal_received_string(data);
1117 } // eo SimpleIO2::slotReceived()
1118
1119
1120
1121 /*
1122  * implementation of class Backend (singleton)
1123  */
1124
1125 Backend* Backend::g_backend= NULL;
1126
1127 int Backend::m_count_active_steps=0;
1128
1129
1130 Backend::Backend()
1131 : m_count_active_loops(0)
1132 , m_count_stop_requests(0)
1133 {
1134 #ifdef HAVE_LIBI2NCOMMON
1135     SystemTools::ignore_signal( SystemTools::Signal::PIPE );
1136 #else
1137     signal( SIGPIPE, SIG_IGN );
1138 #endif
1139 } // eo Backend::Backend
1140
1141
1142 Backend::~Backend()
1143 {
1144 #ifdef HAVE_LIBI2NCOMMON
1145     SystemTools::restore_signal_handler( SystemTools::Signal::PIPE );
1146 #else
1147     signal( SIGPIPE, SIG_DFL );
1148 #endif
1149 } // eo Backend::~Backend()
1150
1151 /**
1152  * delivers pointer to the current backend, instantiating a new backend if there was no current one.
1153  *
1154  * This should be the only way to access the backend which should be a singleton.
1155  *
1156  * @return the pointer to the current backend.
1157  */
1158 Backend* Backend::getBackend()
1159 {
1160     if (!g_backend)
1161     {
1162         g_backend = new Backend();
1163     }
1164     return g_backend;
1165 } // eo Backend::getBackend
1166
1167
1168
1169
1170 /**
1171  * performs one backend cycle.
1172  *
1173  * Collects all file descriptors from the active io objects which should be selected for reading and/or writing.
1174  * Also determines the timer events which become due and adjusts the timeout.
1175  * Constructs the necessary structures and calls poll().
1176  * Finally interprets the results from poll() (i.e. performs the reading/writing/timer events)
1177  *
1178  * @param timeout maximal wait value in milliseconds; negative value waits until at least one event occured.
1179  * @return @a true if there was at least one active object; otherwise @a false
1180  *
1181  * @note this method is a little beast.
1182  *
1183  * @internal 
1184  * The cycle is divided into four steps: collecting; poll; mark and execute.
1185  * The "mark" step is necessary to avoid some bad side effects when method calls in the execution stage
1186  * are calling @a Backup::doOneStep or open their own local backend loop.
1187  *
1188  * @todo handle some more error cases.
1189  * @todo provide a plugin interface for external handler.
1190  * (currently inclusion of external handler is possible by (ab)using timer classes)
1191  */
1192 bool Backend::doOneStep(int timeout)
1193 {
1194     ODOUT( "timeout=" << timeout );
1195     internal_io::PollDataCluster poll_data;
1196     bool had_active_object = false;
1197
1198     ++m_count_active_steps;
1199
1200     try {
1201
1202         FdSetType local_read_fds;
1203         FdSetType local_write_fds;
1204
1205         // step 1 ; collect
1206
1207         { // step 1.1: collect fds for read/write operations
1208             for(internal_io::IOList::iterator itIOList = internal_io::g_io_list().begin();
1209                 itIOList != internal_io::g_io_list().end();
1210                 ++itIOList)
1211             {
1212                 if (! *itIOList) continue; // skip NULL entries
1213                 int  read_fd  = (*itIOList)->m_read_fd;
1214                 int  write_fd = (*itIOList)->m_write_fd;
1215                 bool want_read  = (read_fd >= 0) and (*itIOList)->wantRead();
1216                 bool want_write = (write_fd >= 0) and (*itIOList)->wantWrite();
1217                 if (read_fd >= 0 )
1218                 {
1219                     local_read_fds.insert( read_fd );
1220                 }
1221                 if (write_fd >= 0)
1222                 {
1223                     local_write_fds.insert( write_fd );
1224                 }
1225                 if (!want_read && !want_write) continue;
1226                 if (want_read)
1227                 {
1228                     FODOUT( (*itIOList), "wants to read  (fd=" << read_fd << ")");
1229                     poll_data.add_read_fd(read_fd, *itIOList);
1230                 }
1231                 if (want_write)
1232                 {
1233                     FODOUT( (*itIOList), "wants to write  (fd=" << write_fd << ")");
1234                     poll_data.add_write_fd(write_fd, *itIOList);
1235                 }
1236                 had_active_object= true;
1237             }
1238         }
1239
1240         { // step 1.2: collect timer events
1241             MilliTime current_time;
1242             MilliTime min_event_time;
1243
1244             get_current_monotonic_time(current_time);
1245             bool min_event_time_set;
1246
1247             if (timeout >= 0)
1248             {
1249                 min_event_time = current_time + MilliTime(0,timeout);
1250                 min_event_time_set= true;
1251             }
1252             else
1253             {
1254                 min_event_time = current_time + MilliTime(86400,0);
1255                 min_event_time_set= false;
1256             }
1257             // TODO
1258
1259             for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1260                 it_timer != internal_io::g_timer_list().end() 
1261                 && (!had_active_object || !min_event_time_set || current_time < min_event_time);
1262                 ++ it_timer)
1263             {
1264                 if (! *it_timer) continue; // skip NULL entries
1265                 if (! (*it_timer)->m_active) continue; // skip if not enabled
1266                 if ( !min_event_time_set || (*it_timer)->m_when < min_event_time)
1267                 {
1268                     min_event_time = (*it_timer)->m_when;
1269                     min_event_time_set= true;
1270                 }
1271                 had_active_object= true;
1272             }
1273
1274             if (min_event_time_set)
1275             { // we have at a minimal event time, so (re)compute the timeout value:
1276                 MilliTime delta= (min_event_time - current_time);
1277                 long long delta_ms = std::min( delta.get_milliseconds(), 21600000LL); // max 6h
1278                 if (delta_ms <= 0L)
1279                 {
1280                     timeout= 0L;
1281                 }
1282                 else
1283                 {
1284                     timeout= delta_ms + (delta_ms<5 ? 1 : 3);
1285                 }
1286             }
1287         }
1288
1289         // step 2 : poll
1290         ODOUT(" poll timeout is " << timeout);
1291         {
1292             MilliTime current_time;
1293             get_current_monotonic_time(current_time);
1294             ODOUT("  current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1295         }
1296         int poll_result= ::poll(poll_data.get_pollfd_ptr(), poll_data.get_num_pollfds(), timeout);
1297
1298         ODOUT("poll -> " << poll_result);
1299         {
1300             MilliTime current_time;
1301             get_current_monotonic_time(current_time);
1302             ODOUT("  current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1303         }
1304
1305         if (poll_result < 0)
1306         {
1307             //TODO poll error handling (signals ?!)
1308         }
1309
1310         // step 3 : mark
1311
1312         // step 3.1: mark io objects (if necessary)
1313         if (poll_result > 0)
1314         {
1315             for(internal_io::PollVector::iterator itPollItem = poll_data.m_poll_vector.begin();
1316                 itPollItem != poll_data.m_poll_vector.end();
1317                 ++itPollItem)
1318             {
1319                 ODOUT("  fd=" << itPollItem->fd << ", events=" << itPollItem->events << ", revents=" << itPollItem->revents);
1320                 if ( 0 == (itPollItem->revents))
1321                 { // preliminary continuation if nothing is to handle for this item(/fd)...
1322                     continue;
1323                 }
1324                 if ( 0!= (itPollItem->revents & (POLLIN|POLLHUP)))
1325                 {
1326                     IOImplementation *io= poll_data.m_read_fd_io_map[ itPollItem->fd ];
1327                     if (io && io->m_read_fd==itPollItem->fd)
1328                     {
1329                         FODOUT(io,"marked for reading");
1330                         io->m_marked_for_reading= true;
1331                     }
1332                 }
1333                 if ( 0!= (itPollItem->revents & POLLOUT))
1334                 {
1335                     IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
1336                     if (io && io->m_write_fd==itPollItem->fd)
1337                     {
1338                         io->m_marked_for_writing= true;
1339                     }
1340                 }
1341                 if ( 0!= (itPollItem->revents & POLLERR))
1342                 {
1343                     IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
1344                     if (0!= (itPollItem->events & POLLOUT))
1345                     {
1346                         if (io && io->m_write_fd==itPollItem->fd)
1347                         {
1348                             io->m_marked_for_writing= false;
1349                             //io->close( Direction::out );
1350                         }
1351                     }
1352                 }
1353                 // TODO error handling (POLLERR, POLLHUP, POLLNVAL)
1354             }
1355         }
1356
1357         //Step 3.2: mark timer objects
1358         {
1359             MilliTime current_time;
1360
1361             get_current_monotonic_time(current_time);
1362             ODOUT("  current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1363
1364             for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1365                 it_timer != internal_io::g_timer_list().end();
1366                 ++ it_timer)
1367             {
1368                 ODOUT("  check timer " << *it_timer);
1369                 if (! *it_timer) continue; // skip NULL entries
1370                 if (! (*it_timer)->m_active) continue; // skip if not enabled
1371                 if ( (*it_timer)->m_when <= current_time)
1372                 {
1373                     ODOUT("   ==> MARK");
1374                     (*it_timer)->m_marked = true;
1375                 }
1376             }
1377         }
1378
1379
1380         // step 4 : execute
1381
1382         // step 4.1: execute io
1383         ODOUT("execute stage");
1384         for(internal_io::IOList::iterator it_io = internal_io::g_io_list().begin();
1385             it_io != internal_io::g_io_list().end();
1386             ++ it_io)
1387         {
1388             ODOUT("  check obj " << *it_io);
1389             if (NULL == *it_io) continue;
1390             if ((*it_io)->m_marked_for_writing)
1391             {
1392                 FODOUT((*it_io),"exec doWrite");
1393                 (*it_io)->doWrite();
1394                 if ((*it_io) == NULL) continue; // skip remaining if we lost the object
1395                 if ((*it_io)->m_errno)
1396                 {
1397                     continue;
1398                 }
1399             }
1400             if ((*it_io)->m_marked_for_reading)
1401             {
1402                 FODOUT((*it_io),"exec doRead");
1403                 (*it_io)->doRead();
1404                 if ((*it_io) == NULL) continue; // skip remaining if we lost the object
1405                 if ((*it_io)->m_errno)
1406                 {
1407                     continue;
1408                 }
1409             }
1410         }
1411
1412         // step 4.2: execute timer events
1413         {
1414             for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1415                 it_timer != internal_io::g_timer_list().end();
1416                 ++ it_timer)
1417             {
1418                 if (! *it_timer) continue; // skip NULL entries
1419                 if (! (*it_timer)->m_active) continue; // skip if not enabled
1420                 if (! (*it_timer)->m_marked) continue; // skip if not marked
1421
1422                 // reset the mark and deactivate object now since the execute() method might activate it again
1423                 (*it_timer)->m_marked= false;
1424                 (*it_timer)->m_active= false;
1425
1426                 // now execute the event:
1427                 (*it_timer)->execute();
1428             }
1429         }
1430
1431     } // eo try
1432     catch(...)
1433     {
1434         // clean up our counter
1435         --m_count_active_steps;
1436         // and forward the exception
1437         throw;
1438     }
1439
1440     if ( 0 == --m_count_active_steps)
1441     {
1442         internal_io::g_io_list().clean_list();
1443         internal_io::g_timer_list().clean_list();
1444     }
1445
1446     return had_active_object;
1447 } // eo Backend::doOneStep
1448
1449
1450 /**
1451  * enters a backend loop.
1452  *
1453  * Calls @a Backend::doOneStep within a loop until @a Backend::stop was called or there are no more
1454  * active objects (io objects or timer objects).
1455  */
1456 void Backend::run()
1457 {
1458     ++m_count_active_loops;
1459     do
1460     {
1461         try
1462         {
1463             if (!doOneStep(c_max_poll_wait))
1464             {
1465                 // stop if there are no more active objects.
1466                 stop();
1467             }
1468         }
1469         catch(...)
1470         {
1471             // clean up our counter
1472             --m_count_active_loops;
1473             // and forward the exception
1474             throw;
1475         }
1476     } 
1477     while (0 == m_count_stop_requests);
1478     --m_count_active_loops;
1479     --m_count_stop_requests;
1480 } // eo Backend::run
1481
1482
1483 /**
1484  * @brief stops the latest loop currently run by Backend::run().
1485  * @see Backend::run()
1486  */
1487 void Backend::stop()
1488 {
1489     if (m_count_active_loops)
1490     {
1491         ++m_count_stop_requests;
1492     }
1493 } // eo Backend::stop()
1494
1495
1496
1497 } // eo namespace AsyncIo