Implement getline() for IOImplementation.
[libasyncio] / asyncio / async_io.cpp
1 /*
2 The software in this package is distributed under the GNU General
3 Public License version 2 (with a special exception described below).
4
5 A copy of GNU General Public License (GPL) is included in this distribution,
6 in the file COPYING.GPL.
7
8 As a special exception, if other files instantiate templates or use macros
9 or inline functions from this file, or you compile this file and link it
10 with other works to produce a work based on this file, this file
11 does not by itself cause the resulting work to be covered
12 by the GNU General Public License.
13
14 However the source code for this file must still be made available
15 in accordance with section (3) of the GNU General Public License.
16
17 This exception does not invalidate any other reasons why a work based
18 on this file might be covered by the GNU General Public License.
19 */
20 /** @file
21  * @copyright Copyright © 2007-2008 by Intra2net AG
22  */
23
24 //#define NOISEDEBUG
25
26 #include "async_io.hpp"
27 #include <asyncio_config.hpp>
28
29 #include <list>
30 #include <vector>
31 #include <map>
32 #include <algorithm>
33 #include <utility>
34
35 #include <sys/poll.h>
36 #include <sys/time.h>
37 #include <sys/socket.h>
38 #include <unistd.h>
39 #include <errno.h>
40 #include <fcntl.h>
41
42 #include <boost/bind.hpp>
43
44 #include <asyncio_signalfunc.hpp>
45
46 #include <iostream>
47
48 #ifdef NOISEDEBUG
49 #include <iostream>
50 #include <iomanip>
51 #define DOUT(msg) std::cout << msg << std::endl
52 #define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl
53 //#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl
54 #define ODOUT(msg) std::cout << __PRETTY_FUNCTION__ << "[" << this << "]:" << msg << std::endl
55 #else
56 #define DOUT(msg) do {} while (0)
57 #define FODOUT(obj,msg) do {} while (0)
58 #define ODOUT(msg) do {} while (0)
59 #endif
60
61
62 namespace
63 {
64
65 using namespace AsyncIo::Utils;
66
67 /*
68  * configuration:
69  */
70
71
72 const int c_max_poll_wait= 10*60*1000; // maximal poll wait (while in backend loop): 10 min
73
74
75 /**
76  * contains internal helper structs and functions for io handling.
77  */
78 namespace internal_io
79 {
80
81 /**
82  * extends struct pollfd with some convenience functions
83  */
84 struct PollFd : public ::pollfd
85 {
86     PollFd()
87     {
88         fd= 0;
89         events= revents= 0;
90     } // eo PollFd
91
92
93     /**
94      * initializes the struct with a given file descriptor and clears the event mask(s).
95      * @param _fd
96      */
97     PollFd(int _fd)
98     {
99         fd= _fd;
100         events= revents= 0;
101     } // eo PollFd
102
103
104     /**
105      * set that we want to be notified about new incoming data
106      */
107     void setPOLLIN()  { events |= POLLIN; }
108
109     /**
110      * set that we want to be notified if we can send (more) data.
111      */
112     void setPOLLOUT() { events |= POLLOUT; }
113
114 }; // eo struct PollFd
115
116
117 typedef std::vector<PollFd> PollVector;
118 typedef std::map<int,PollVector::size_type> FdPollMap;
119 typedef std::map<int,AsyncIo::IOImplementation*> FdIOMap;
120
121
122 /**
123  * struct for interfacing our local structures with poll()
124  */
125 struct PollDataCluster
126 {
127     PollVector m_poll_vector;
128     FdPollMap  m_fd_poll_map;
129     FdIOMap    m_read_fd_io_map;
130     FdIOMap    m_write_fd_io_map;
131
132     void add_read_fd( int fd, AsyncIo::IOImplementation* io);
133     void add_write_fd( int fd, AsyncIo::IOImplementation* io);
134
135     pollfd* get_pollfd_ptr();
136     unsigned int get_num_pollfds() const;
137
138 }; // eo struct PollDataCluster
139
140
141 typedef PtrList< AsyncIo::IOImplementation, true > IOList;
142 typedef PtrList< AsyncIo::TimerBase, true >        TimerList;
143
144 template<> int IOList::GlobalCountType::InstanceCount= 0;
145 template<> int TimerList::GlobalCountType::InstanceCount= 0;
146
147
148 /**
149  * the (internal) global list of io objects (object pointers)
150  */
151 IOList& g_io_list()
152 {
153     static IOList _the_io_list;
154     return _the_io_list;
155 };
156
157
158 /**
159  * the (internal) global list of timer objects (object pointers)
160  */
161 TimerList& g_timer_list()
162 {
163     static TimerList _the_timer_list;
164     return _the_timer_list;
165 }
166
167 /*
168  * implementation of PollDataCluster
169  */
170
171
172 /**
173  * add a new file descriptor to the read list.
174  *
175  * @param fd the file descriptor.
176  * @param io the io object which uses the fd for reading.
177  */
178 void PollDataCluster::add_read_fd( int fd, AsyncIo::IOImplementation* io)
179 {
180     FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
181     if (itPollMap != m_fd_poll_map.end())
182     {
183         m_poll_vector[ itPollMap->second ].setPOLLIN();
184     }
185     else
186     {
187         PollFd item(fd);
188         item.setPOLLIN();
189         m_fd_poll_map[fd] = m_poll_vector.size();
190         m_poll_vector.push_back( item );
191     }
192     m_read_fd_io_map[fd]= io;
193 } // eo PollDataCluster::add_read_fd
194
195
196 /**
197  * add a new file descriptor to the write list.
198  *
199  * @param fd the file descriptor.
200  * @param io the io object which uses the fd for writing.
201  */
202 void PollDataCluster::add_write_fd( int fd, AsyncIo::IOImplementation* io)
203 {
204     FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
205     if (itPollMap != m_fd_poll_map.end())
206     {
207         m_poll_vector[ itPollMap->second ].setPOLLOUT();
208     }
209     else
210     {
211         PollFd item(fd);
212         item.setPOLLOUT();
213         m_fd_poll_map[fd] = m_poll_vector.size();
214         m_poll_vector.push_back( item );
215     }
216     m_write_fd_io_map[fd]= io;
217 } // eo PollDataCluster::add_write_fd
218
219
220 /**
221  * returns a pointer to a pollfd array; suitable for passing to poll()
222  *
223  * @return pointer to pollfd array
224  */
225 pollfd* PollDataCluster::get_pollfd_ptr()
226 {
227     return m_poll_vector.empty() ? NULL : &m_poll_vector.front();
228 } // eo get_pollfd_ptr
229
230
231 /**
232  * returns the number of entries in the pollfd array; suitable for passing to poll()
233  *
234  * @return the number of entries in the pollfd array
235  */
236 unsigned int PollDataCluster::get_num_pollfds() const
237 {
238     return m_poll_vector.size();
239 } // eo get_num_pollfds
240
241
242
243 } // eo namespace internal_io
244
245
246 /*
247  * some internal tool functions and structures
248  */
249
250 struct FilterMatch {
251     AsyncIo::FilterBasePtr m_filter;
252
253     FilterMatch(AsyncIo::FilterBasePtr filter)
254     : m_filter(filter)
255     {}
256
257     bool operator () (const AsyncIo::FilterBasePtr& item)
258     {
259         return item && item == m_filter;
260     }
261
262 }; // eo struct FilterMatch
263
264
265
266 } // eo anonymous namespace
267
268
269
270
271 namespace AsyncIo
272 {
273
274
275 /*
276  * implementation of TimerBase
277  */
278
279 /**
280  * constructor. Adds the object to the internal timer list.
281  */
282 TimerBase::TimerBase()
283 : m_active(false)
284 , m_marked(false)
285 {
286     internal_io::g_timer_list().add_item(this);
287 } // eo TimerBase::TimerBase
288
289
290 /**
291  * destructor. Removes the object from the internal timer list.
292  */
293 TimerBase::~TimerBase()
294 {
295     ODOUT("enter");
296     if (internal_io::TimerList::Instances())
297     {
298         ODOUT("remove from list");
299         internal_io::g_timer_list().remove_item(this);
300     }
301 } // eo TimerBase::~TimerBase
302
303
304 /**
305  * @brief returns the point in time when the time is executed in real time.
306  * @return the point in time when the timer is to be executed.
307  */
308 MilliTime TimerBase::getRealWhenTime() const
309 {
310     MilliTime mono_time;
311     MilliTime real_time;
312     get_current_monotonic_time(mono_time);
313     get_current_real_time(real_time);
314     MilliTime result= m_when - mono_time + real_time;
315     return result;
316 } // eo TimerBase::getRealWhenTime() const
317
318
319 /**
320  * sets the time when the event should be executed.
321  * @param sec the seconds part of the point in time.
322  * @param msec  the milliseconds part of the point in time.
323  */
324 void TimerBase::setWhenTime(long sec, long msec)
325 {
326     m_when.set(sec,msec);
327     m_marked= false;
328 } // eo TimerBase::setWhenTime
329
330
331 /**
332  * sets the time when the event should be executed.
333  * @param mt the point in time.
334  */
335 void TimerBase::setWhenTime(const MilliTime& mt)
336 {
337     m_when= mt;
338     m_marked= false;
339 } // eo TimerBase::setWhenTime
340
341
342 /**
343  * sets the time delta measured from current time when the event should be executed.
344  * @param sec the seconds of the time delta
345  * @param msec the milli seconds of the time delta
346  */
347 void TimerBase::setDeltaWhenTime(long sec, long msec)
348 {
349     setDeltaWhenTime( MilliTime(sec,msec) );
350 } // eo TimerBase::setWhenTime
351
352
353
354 /**
355  * sets the time delta measured from current time when the event should be executed.
356  * @param mt the time delta
357  */
358 void TimerBase::setDeltaWhenTime(const MilliTime& mt)
359 {
360     get_current_monotonic_time(m_when);
361     m_when+= mt;
362     m_marked= false;
363 } // eo TimerBase::setWhenTime
364
365
366 /**
367  * set the active state of the timer event.
368  * @param active determines if the object should be active (default: yes).
369  */
370 void TimerBase::activate(bool active)
371 {
372     m_active = active;
373     if (!active)
374     {
375         // clear the mark if we are not active.
376         m_marked= false;
377     }
378 } // eo TimerBase::activate
379
380
381 /** @fn void TimerBase::deactivate()
382  *  deactivates the event by clearing the active state.
383  */
384
385
386 /**
387  * called when the timer event occured.
388  */
389 void TimerBase::execute()
390 {
391 } // eo TimerBase::execute
392
393
394 /*
395  * implementation of FilterBase class
396  */
397
398
399 FilterBase::FilterBase()
400 : m_io(NULL)
401 {
402 } // eo FilterBase::FilterBase()
403
404
405 /**
406  * injects incoming data.
407  * @param data  the new data
408  */
409 void FilterBase::injectIncomingData(const std::string& data)
410 {
411     if (m_io)
412     {
413         FilterBasePtr ptr= get_ptr_as< FilterBase >();
414         if (ptr)
415         {
416             m_io->injectIncomingData(ptr,data);
417         }
418     }
419 } // FilterBase::injectIncomingData(const std::string&)
420
421
422 /**
423  * injects outgoing data.
424  * @param data the new data
425  */
426 void FilterBase::injectOutgoingData(const std::string& data)
427 {
428     if (m_io)
429     {
430         FilterBasePtr ptr= get_ptr_as< FilterBase >();
431         if (ptr)
432         {
433             m_io->injectOutgoingData(ptr,data);
434         }
435     }
436 } // eo FilterBase::injectOutgoingData(const std::string&)
437
438
439
440 /**
441  * called when EOF detected on incoming channel (or incoming channel closed)
442  */
443 void FilterBase::endOfIncomingData()
444 {
445 } // eo FilterBase::endOfIncomingData()
446
447 /**
448  * called when the filter should reset.
449  * This is used when a new channel is opened or when the filter is taken out of a filter chain.
450  */
451 void FilterBase::reset()
452 {
453 } // eo FilterBase::reset()
454
455
456 /*
457  * implementation of IOImplementation class
458  */
459
460
461 /**
462  * constructor for the base io class.
463  *
464  * Also adds the object to internal list of io objects (which is used by the backend).
465  *
466  * @param read_fd the file descriptor which should be used for reading (default -1 for no value)
467  * @param write_fd  the file descriptor which should be used for writing (default -1 for no value)
468  */
469 IOImplementation::IOImplementation(int read_fd, int write_fd)
470 : m_read_fd(-1)
471 , m_write_fd(-1)
472 , m_eof(false)
473 , m_not_writable(false)
474 , m_input_buffer()
475 , m_output_buffer()
476 , m_marked_for_reading(false)
477 , m_marked_for_writing(false)
478 {
479     internal_io::g_io_list().add_item(this);
480     if (read_fd >= 0)
481     {
482         setReadFd( read_fd );
483     }
484     if (write_fd >= 0)
485     {
486         setWriteFd( write_fd );
487     }
488 } // eo IOImplementation::IOImplementation
489
490
491 /**
492  * destructor of the base io class.
493  *
494  * Removes the object from the interal list of io objects.
495  */
496 IOImplementation::~IOImplementation()
497 {
498     close();
499     if (internal_io::IOList::Instances())
500     {
501         internal_io::g_io_list().remove_item(this);
502     }
503     // now clear the filters:
504     while (! m_filter_chain.empty() )
505     {
506         FilterChain::iterator it = m_filter_chain.begin();
507         (*it)->reset();
508         (*it)->m_io= NULL;
509         //TODO: signal the filter that it is removed ?!
510         m_filter_chain.erase(it);
511     }
512 } // eo IOImplementation::~IOImplementation
513
514
515 /**
516  * adds another filter to the filter chain.
517  * @param filter pointer to the new filter.
518  */
519 void IOImplementation::addFilter
520 (
521     FilterBasePtr filter
522 )
523 {
524     if (!filter)
525     {
526         return; // nothing to do
527     }
528     if (filter->m_io)
529     {
530         filter->m_io->removeFilter(filter);
531     }
532     m_filter_chain.push_back( filter );
533 } // eo IOImplementation::addFilter
534
535
536 /**
537  * removes a filter from the filter chain.
538  * @param filter the pointer to the filter which is removed.
539  * @note if the filter is removed the class gives away the ownership; i.,e. the caller is responsible for
540  * deleting the filter if it was dynamically allocated.
541  */
542 void IOImplementation::removeFilter
543 (
544     FilterBasePtr filter
545 )
546 {
547     FilterChain::iterator it =
548         std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(filter) );
549     if (it != m_filter_chain.end())
550     {
551         filter->reset();
552         filter->m_io= NULL;
553         //TODO: signal the filter that it is removed ?!
554         m_filter_chain.erase(it);
555     }
556 } // eo IOImplementation::removeFilter
557
558
559 /**
560  * closes the file descriptors (/ the connection).
561  *
562  * @param direction the direction which should be closed (default: @a Direction::both for all).
563  */
564 void IOImplementation::close(Direction direction)
565 {
566     bool had_read_fd= (m_read_fd >= 0);
567     m_errno= 0;
568     if (direction == Direction::unspecified) direction= Direction::both;
569     if (direction != Direction::both && m_read_fd==m_write_fd && m_read_fd>=0)
570     { // special case: half closed (socket) connections...
571         // NOTE: for file descriptors m_errno will set to ENOTSOCK, but since we "forget" the desired part
572         // (read_fd or write_fd) this class works as desired.
573         switch(direction)
574         {
575             case Direction::in:
576                 {
577                     int res= ::shutdown(m_read_fd, SHUT_RD);
578                     if (res<0)
579                     {
580                         m_errno= errno;
581                     }
582                     m_read_fd= -1;
583                     if (!m_eof)
584                     {
585                         for(FilterChain::iterator it= m_filter_chain.begin();
586                             it != m_filter_chain.end();
587                             ++it)
588                         {
589                             (*it)->endOfIncomingData();
590                         }
591                     }
592                 }
593                 return;
594
595             case Direction::out:
596                 {
597                     int res= ::shutdown(m_write_fd, SHUT_WR);
598                     if (res<0)
599                     {
600                         m_errno= errno;
601                     }
602                     m_write_fd= -1;
603                     m_output_buffer.clear();
604                 }
605                 return;
606         }
607     }
608     if (m_write_fd >= 0 && (direction & Direction::out) )
609     {
610         int res1 = ::close(m_write_fd);
611         if (m_write_fd == m_read_fd)
612         {
613             m_read_fd= -1;
614         }
615         m_write_fd= -1;
616         m_output_buffer.clear();
617         if (res1<0) m_errno= errno;
618     }
619     if (m_read_fd >=0 && (direction & Direction::in) )
620     {
621         int res1 = ::close(m_read_fd);
622         m_read_fd= -1;
623         if (res1<0) m_errno= errno;
624     }
625     if (had_read_fd && !m_eof && (m_read_fd<0))
626     {
627         for(FilterChain::iterator it= m_filter_chain.begin();
628             it != m_filter_chain.end();
629             ++it)
630         {
631             (*it)->endOfIncomingData();
632         }
633     }
634 } // eo IOImplementation::close
635
636
637 /**
638  * determines if the io class wants to read data.
639  * Default implementation checks only for a valid file descriptor value.
640  *
641  * @return @a true if the objects wants to read data
642  */
643 bool IOImplementation::wantRead()
644 {
645     return (m_read_fd >= 0) && ! m_eof;
646 } // eo IOImplementation::wantRead
647
648
649 /**
650  * determines if the io class wants to write data.
651  * Default implementation checks for a valid file descriptor value and if the object
652  * cannot write data immediately.
653  *
654  * @return @a true if the objects wants to write data
655  */
656 bool IOImplementation::wantWrite()
657 {
658     return (m_write_fd >= 0) && ! m_marked_for_writing && ! m_not_writable;
659 } // eo IOImplementation::wantWrite
660
661
662 /**
663  * delivers if opened.
664  * The default returns @a true if at least one file descriptor (read or write) is valid.
665  * @return @a true if opened.
666  */
667 bool IOImplementation::opened() const
668 {
669     return (m_read_fd>=0) || (m_write_fd>=0);
670 } // eo IOImplementation::opened() const
671
672
673 /**
674  * returns if the read side detected an end of file (EOF).
675  * @return @a true if end of file was detected on read file descriptor (or read file descriptor isn't valid).
676  */
677 bool IOImplementation::eof() const
678 {
679     return (m_read_fd < 0) || m_eof;
680 } // eo IOImplementatio::eof() const
681
682
683 /**
684  * @brief returns of the write side didn't detect that it cannot write.
685  * @return @a true if we can write.
686  */
687 bool IOImplementation::writable() const
688 {
689     return (m_write_fd >=0 ) and not m_not_writable;
690 } // eo IOImplementation::writable() const
691
692
693 /**
694  * returns if the output buffer is empty.
695  * @return
696  */
697 bool IOImplementation::empty() const
698 {
699     return m_output_buffer.empty();
700 } // eo IOImplementation::empty
701
702 /**
703  * puts data into the output buffer and sends it immediately if possible,
704  *
705  * The data is passed through the filter chain before it's stored in the output buffer
706  * (i.e. the output buffer contains data as it should be send directly to the descriptor).
707  * @param _data the data which should be send.
708  */
709 void IOImplementation::lowSend(const std::string& _data)
710 {
711     std::string data(_data);
712
713     for(FilterChain::reverse_iterator it_filter= m_filter_chain.rbegin();
714         it_filter!= m_filter_chain.rend();
715         ++it_filter)
716     {
717         data= (*it_filter)->filterOutgoingData(data);
718     }
719     m_output_buffer+= data;
720
721     // if we can send immediately, do it:
722     if (! m_output_buffer.empty() && m_marked_for_writing)
723     {
724         doWrite();
725     }
726 } // eo IOImplementation::lowSend
727
728
729 /**
730  * called by the backend when there is data to read for this object.
731  *
732  * Reads the data from the connection (read file descriptor) and passes the data through the filter chain.
733  * The final data is appended to the input buffer and the signal @a m_signal_read() is called.
734  *
735  * If EOF is detected (i,e, no data was received) then the signal @a m_signal_eof() is called.
736  *
737  * @note overload this method only when You know what You are doing!
738  * (overloading is necessary when handling server sockets.)
739  */
740 void IOImplementation::doRead()
741 {
742     // static read buffer; should be ok as long as we don't use threads
743     static char buffer[8*1024]; // 8 KiB
744
745     m_errno = 0;
746     if (m_read_fd<0 || !m_marked_for_reading)
747     {
748         ODOUT("exit0; read_fd="<<m_read_fd << " mark=" << m_marked_for_reading);
749         return;
750     }
751
752     // reset the mark:
753     m_marked_for_reading = false;
754
755     // now read the data:
756     ssize_t count;
757     count = ::read(m_read_fd, buffer, sizeof(buffer));
758
759     ODOUT("::read -> " << count);
760
761     // interpret what we got:
762     if (count < 0) // error
763     {
764         m_errno = errno;
765         int fd= m_read_fd;
766
767         switch(m_errno)
768         {
769             case EINVAL:
770             case EBADF:
771             case ECONNRESET:
772             case ENETRESET:
773                 if (fd == m_read_fd)
774                 {
775                     close( (m_read_fd == m_write_fd) ? Direction::both : Direction::in );
776                 }
777                 break;
778         }
779     }
780     else if (count==0) // EOF
781     {
782         // remember the read fd:
783         int fd = m_read_fd;
784         // remember the EOF:
785         m_eof= true;
786         // signal EOF
787         m_signal_eof();
788         // if the fd is still the same: close it.
789         if (fd == m_read_fd)
790         {
791             close( Direction::in );
792         }
793     }
794     else // we have valid data
795     {
796         std::string data(buffer,count);
797         ODOUT(" got \"" << data << "\"");
798         for(FilterChain::iterator it_filter= m_filter_chain.begin();
799             it_filter != m_filter_chain.end();
800             ++it_filter)
801         {
802             data= (*it_filter)->filterIncomingData(data);
803         }
804         m_input_buffer+= data;
805         m_signal_read();
806     }
807 } // eo IOImplementation::doRead
808
809
810 /**
811  * interface for filter classes to inject data into the filter chain (emulating new incoming data).
812  * @param from_filter the filter which injects the new data.
813  * @param _data  the new data.
814  */
815 void IOImplementation::injectIncomingData(FilterBasePtr from_filter, const std::string& _data)
816 {
817     FilterChain::iterator it_filter =
818         std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(from_filter) );
819     if (it_filter == m_filter_chain.end())
820     {
821         // dont accept data inject from a unknown filter
822         return;
823     }
824     // well: pass the data through the remaining filters:
825     // NOTE: processing is (nearly) the same as in IOImplementation::doRead()
826     std::string data(_data);
827     for(++it_filter;
828         it_filter != m_filter_chain.end();
829         ++it_filter)
830     {
831         data= (*it_filter)->filterIncomingData(data);
832     }
833     m_input_buffer+= data;
834     m_signal_read();
835 } // eo IOImplementation::injectIncomingData(FilterBase*,const std::string&)
836
837
838 /**
839  * interface for filter classes to inject data into the filter chain (emulating new outgoing data).
840  * @param from_filter the filter which injects the new data.
841  * @param _data  the new data.
842  */
843 void IOImplementation::injectOutgoingData(FilterBasePtr from_filter, const std::string& _data)
844 {
845     FilterChain::reverse_iterator it_filter =
846         std::find_if( m_filter_chain.rbegin(), m_filter_chain.rend(), FilterMatch(from_filter) );
847     if (it_filter == m_filter_chain.rend())
848     {
849         // dont accept data inject from a unknown filter
850         return;
851     }
852     // well: pass the data through the remaining filters:
853     // NOTE: processing is (nearly) the same as in IOImplementation::lowSend()
854     std::string data(_data);
855     for(++it_filter;
856         it_filter!= m_filter_chain.rend();
857         ++it_filter)
858     {
859         data= (*it_filter)->filterOutgoingData(data);
860     }
861     m_output_buffer+= data;
862
863     // if we can send immediately, do it:
864     if (! m_output_buffer.empty() && m_marked_for_writing)
865     {
866         doWrite();
867     }
868 } // eo IOImplementation::injectOutgoingData(FilterBase*,const std::string&)
869
870
871 /**
872  * set the read file descriptor.
873  * Although a derived class can also set the read fd directly; this method should be used
874  * for this task since it updates some flags on the fd for async operation.
875  * @param fd the new read file descriptor.
876  */
877 void IOImplementation::setReadFd(int fd)
878 {
879     // test if we already have a valid descriptor (and may have to close it):
880     if (m_read_fd >=0 )
881     {
882         if (m_read_fd == fd)
883         {
884             // fd was already right; consider it to be ok.
885             return;
886         }
887         close(Direction::in);
888     }
889     // reset our errno:
890     m_errno= 0;
891     // if the new descriptor looks valid, set some flags:
892     if (fd >= 0)
893     {
894         long flags= ::fcntl(fd, F_GETFL);
895         if (flags != -1)
896         {
897             // set the flags for non blocking, async operation
898             flags |= O_NONBLOCK|O_ASYNC;
899             ::fcntl(fd,F_SETFL, flags);
900         }
901         else if ( errno == EBADF )
902         {
903             // well, we seemed to be fed with an invalid descriptor...:
904             m_errno = errno;
905             fd= -1;
906         }
907     }
908     if (fd >= 0) // if still valid:
909     {
910         // set the close-on-exec flag
911         ::fcntl(fd,F_SETFD, FD_CLOEXEC);
912     }
913     m_read_fd= fd;
914     m_marked_for_reading= false;
915     m_eof= false;
916 } // eo IOImplementation::setReadFd(int)
917
918
919
920 /**
921  * set the write file descriptor.
922  * Although a derived class can also set the write fd directly; this method should be used
923  * for this task since it updates some flags on the fd for async operation.
924  * @param fd the new write file descriptor.
925  */
926 void IOImplementation::setWriteFd(int fd)
927 {
928     if (m_write_fd >=0 )
929     {
930         if (m_write_fd == fd)
931         {
932             // fd was already right; consider it to be ok.
933             return;
934         }
935         close(Direction::out);
936     }
937     // reset our errno:
938     m_errno= 0;
939     // if the new descriptor looks valid, set some flags:
940     if (fd >= 0)
941     {
942         long flags= ::fcntl(fd, F_GETFL);
943         if (flags != -1)
944         {
945             // set the flags for non blocking, async operation
946             flags |= O_NONBLOCK|O_ASYNC;
947             ::fcntl(fd,F_SETFL, flags);
948         }
949         else if (errno == EBADF)
950         {
951             // well, we seemed to be fed with an invalid descriptor...:
952             m_errno = errno;
953             fd= -1;
954         }
955     }
956     if (fd >= 0) // if still valid:
957     {
958         // set the close-on-exec flag
959         ::fcntl(fd,F_SETFD, FD_CLOEXEC);
960     }
961     m_write_fd = fd;
962     m_marked_for_writing= false;
963     m_not_writable= false;
964 } // eo IOImplementation::setWriteFd(int)
965
966
967
968 /**
969  * called by the backend when this object can write data.
970  *
971  * If some data was sended, the signal @a m_signal_write is called.
972  *
973  * @internal tries to write all buffered data to output; if this succeeds,
974  * the connection is assumed to be still able to accept more data.
975  * (i.e. the internal write mark is kept!)
976  *
977   * @note overload this method only when You know what You are doing!
978 */
979 void IOImplementation::doWrite()
980 {
981     m_errno = 0;
982     if ( m_write_fd<0 || !m_marked_for_writing || m_output_buffer.empty())
983     {
984         return;
985     }
986
987     ODOUT("doWrite, d=\"" << m_output_buffer << "\"");
988
989     //reset mark:
990     m_marked_for_writing= false;
991
992     // now write the data
993     ssize_t count= ::write( m_write_fd, m_output_buffer.data(), m_output_buffer.size());
994
995     ODOUT("::write -> " << count);
996
997     if (count < 0) // error
998     {
999         m_errno= errno;
1000         int fd= m_write_fd;
1001
1002         switch(m_errno)
1003         {
1004             case EPIPE:
1005                 m_not_writable= true;
1006                 // emit a signal
1007                 m_signal_not_writable();
1008                 // fall through
1009             case EINVAL:
1010             case EBADF:
1011             case ECONNRESET:
1012             case ENETRESET:
1013                 if (fd == m_write_fd)
1014                 {
1015                     close( (m_write_fd == m_read_fd) ? Direction::both : Direction::out );
1016                 }
1017                 break;
1018         }
1019     }
1020     else
1021     {
1022         m_output_buffer.erase(0, count);
1023         if (m_output_buffer.empty())
1024         {
1025             // special case: if we were able to send all the data, we keep the write mark:
1026             m_marked_for_writing= true;
1027         }
1028     }
1029     if (count > 0)
1030     {
1031         m_signal_write();
1032     }
1033 } // eo IOImplementation::doWrite
1034
1035 /**
1036  * Return a copy of the input buffer.
1037  */
1038 std::string IOImplementation::getInput() const
1039 {
1040     return m_input_buffer;
1041 }
1042
1043 /**
1044  * Return the input buffer and clear it.
1045  */
1046 std::string IOImplementation::getInputClear()
1047 {
1048     std::string retbuf;
1049
1050     retbuf.swap(m_input_buffer);
1051
1052     return retbuf;
1053 }
1054
1055 /**
1056  * Return true if there are bytes available in the input buffer.
1057  */
1058 bool IOImplementation::inputAvailable() const
1059 {
1060     return !m_input_buffer.empty();
1061 }
1062
1063 /**
1064  * Cut off the first len bytes from the input buffer.
1065  * @returns The number of bytes actually shortened
1066  */
1067 std::string::size_type IOImplementation::shortenInput(std::string::size_type len)
1068 {
1069     std::string::size_type real_len=len;
1070     if (real_len > m_input_buffer.size())
1071         real_len=m_input_buffer.size();
1072
1073     if (real_len > 0)
1074         m_input_buffer.erase(0,real_len);
1075
1076     return real_len;
1077 }
1078
1079 /**
1080  * Read and remove a line from the input buffer.
1081  * @param delim the line ending character, \\n by default.
1082  * @returns A full line including the line ending if available, an empty string otherwise.
1083  */
1084 std::string IOImplementation::getline(char delim)
1085 {
1086     std::string line;
1087     std::string::size_type pos=m_input_buffer.find(delim);
1088
1089     // no line ending in the buffer?
1090     if (pos==std::string::npos)
1091         return line;
1092
1093     // copy the line including the delimiter
1094     line=m_input_buffer.substr(0,pos+1);
1095
1096     m_input_buffer.erase(0,pos+1);
1097
1098     return line;
1099 }
1100
1101
1102 /*
1103  * implementation of SimpleIO
1104  */
1105
1106
1107 SimpleIO::SimpleIO(int read_fd, int write_fd)
1108 : inherited(read_fd, write_fd)
1109 {
1110     m_signal_read.connect(boost::bind(&SimpleIO::slotReceived,this));
1111 } // eo SimpleIO::SimpleIO()
1112
1113
1114 SimpleIO::~SimpleIO()
1115 {
1116 } // eo SimpleIO::~SimpleIO()
1117
1118
1119 /**
1120  * sends a string.
1121  * @param data the string.
1122  */
1123 void SimpleIO::sendString(const std::string& data)
1124 {
1125     lowSend(data);
1126 } // eo SimpleIO::sendString(const std::string&)
1127
1128
1129 /**
1130  * emits the signal signalReceived with the received data.
1131  * This slot is connected to IOImplementation::m_signal_read.
1132  */
1133 void SimpleIO::slotReceived()
1134 {
1135     std::string data;
1136     data.swap(m_input_buffer);
1137     signal_received_string(data);
1138 } // eo SimpleIO::slotReceived()
1139
1140
1141
1142 /*
1143  * implementation of SimpleIO2
1144  */
1145
1146
1147 SimpleIO2::SimpleIO2(int read_fd, int write_fd)
1148 : inherited(read_fd, write_fd)
1149 {
1150     m_signal_read.connect(boost::bind(&SimpleIO2::slotReceived,this));
1151 } // eo SimpleIO2::SimpleIO2()
1152
1153
1154 SimpleIO2::~SimpleIO2()
1155 {
1156 } // eo SimpleIO2::~SimpleIO2()
1157
1158
1159 /**
1160  * sends a string.
1161  * @param data the string.
1162  */
1163 void SimpleIO2::sendString(const std::string& data)
1164 {
1165     lowSend(data);
1166 } // eo SimpleIO2::sendString(const std::string&)
1167
1168
1169 /**
1170  * emits the signal signalReceived with the received data.
1171  * This slot is connected to IOImplementation::m_signal_read.
1172  */
1173 void SimpleIO2::slotReceived()
1174 {
1175     std::string data;
1176     data.swap(m_input_buffer);
1177     signal_received_string(data);
1178 } // eo SimpleIO2::slotReceived()
1179
1180
1181
1182 /*
1183  * implementation of class Backend (singleton)
1184  */
1185
1186 Backend* Backend::g_backend= NULL;
1187
1188 int Backend::m_count_active_steps=0;
1189
1190
1191 Backend::Backend()
1192 : m_count_active_loops(0)
1193 , m_count_stop_requests(0)
1194 {
1195         SystemTools::ignore_signal( SystemTools::Signal::PIPE );
1196 } // eo Backend::Backend
1197
1198
1199 Backend::~Backend()
1200 {
1201     SystemTools::restore_signal_handler( SystemTools::Signal::PIPE );
1202 } // eo Backend::~Backend()
1203
1204 /**
1205  * delivers pointer to the current backend, instantiating a new backend if there was no current one.
1206  *
1207  * This should be the only way to access the backend which should be a singleton.
1208  *
1209  * @return the pointer to the current backend.
1210  */
1211 Backend* Backend::getBackend()
1212 {
1213     if (!g_backend)
1214     {
1215         g_backend = new Backend();
1216     }
1217     return g_backend;
1218 } // eo Backend::getBackend
1219
1220
1221
1222
1223 /**
1224  * performs one backend cycle.
1225  *
1226  * Collects all file descriptors from the active io objects which should be selected for reading and/or writing.
1227  * Also determines the timer events which become due and adjusts the timeout.
1228  * Constructs the necessary structures and calls poll().
1229  * Finally interprets the results from poll() (i.e. performs the reading/writing/timer events)
1230  *
1231  * @param timeout maximal wait value in milliseconds; negative value waits until at least one event occured.
1232  * @return @a true if there was at least one active object; otherwise @a false
1233  *
1234  * @note this method is a little beast.
1235  *
1236  * @internal
1237  * The cycle is divided into four steps: collecting; poll; mark and execute.
1238  * The "mark" step is necessary to avoid some bad side effects when method calls in the execution stage
1239  * are calling @a Backup::doOneStep or open their own local backend loop.
1240  *
1241  * @todo handle some more error cases.
1242  * @todo provide a plugin interface for external handler.
1243  * (currently inclusion of external handler is possible by (ab)using timer classes)
1244  */
1245 bool Backend::doOneStep(int timeout)
1246 {
1247     ODOUT( "timeout=" << timeout );
1248     internal_io::PollDataCluster poll_data;
1249     bool had_active_object = false;
1250
1251     ++m_count_active_steps;
1252
1253     try {
1254
1255         FdSetType local_read_fds;
1256         FdSetType local_write_fds;
1257
1258         // step 1 ; collect
1259
1260         { // step 1.1: collect fds for read/write operations
1261             for(internal_io::IOList::iterator itIOList = internal_io::g_io_list().begin();
1262                 itIOList != internal_io::g_io_list().end();
1263                 ++itIOList)
1264             {
1265                 if (! *itIOList) continue; // skip NULL entries
1266                 int  read_fd  = (*itIOList)->m_read_fd;
1267                 int  write_fd = (*itIOList)->m_write_fd;
1268                 bool want_read  = (read_fd >= 0) and (*itIOList)->wantRead();
1269                 bool want_write = (write_fd >= 0) and (*itIOList)->wantWrite();
1270                 if (read_fd >= 0 )
1271                 {
1272                     local_read_fds.insert( read_fd );
1273                 }
1274                 if (write_fd >= 0)
1275                 {
1276                     local_write_fds.insert( write_fd );
1277                 }
1278                 if (!want_read && !want_write) continue;
1279                 if (want_read)
1280                 {
1281                     FODOUT( (*itIOList), "wants to read  (fd=" << read_fd << ")");
1282                     poll_data.add_read_fd(read_fd, *itIOList);
1283                 }
1284                 if (want_write)
1285                 {
1286                     FODOUT( (*itIOList), "wants to write  (fd=" << write_fd << ")");
1287                     poll_data.add_write_fd(write_fd, *itIOList);
1288                 }
1289                 had_active_object= true;
1290             }
1291         }
1292
1293         { // step 1.2: collect timer events
1294             MilliTime current_time;
1295             MilliTime min_event_time;
1296
1297             get_current_monotonic_time(current_time);
1298             bool min_event_time_set;
1299
1300             if (timeout >= 0)
1301             {
1302                 min_event_time = current_time + MilliTime(0,timeout);
1303                 min_event_time_set= true;
1304             }
1305             else
1306             {
1307                 min_event_time = current_time + MilliTime(86400,0);
1308                 min_event_time_set= false;
1309             }
1310             // TODO
1311
1312             for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1313                 it_timer != internal_io::g_timer_list().end()
1314                 && (!had_active_object || !min_event_time_set || current_time < min_event_time);
1315                 ++ it_timer)
1316             {
1317                 if (! *it_timer) continue; // skip NULL entries
1318                 if (! (*it_timer)->m_active) continue; // skip if not enabled
1319                 if ( !min_event_time_set || (*it_timer)->m_when < min_event_time)
1320                 {
1321                     min_event_time = (*it_timer)->m_when;
1322                     min_event_time_set= true;
1323                 }
1324                 had_active_object= true;
1325             }
1326
1327             if (min_event_time_set)
1328             { // we have at a minimal event time, so (re)compute the timeout value:
1329                 MilliTime delta= (min_event_time - current_time);
1330                 long long delta_ms = std::min( delta.get_milliseconds(), 21600000LL); // max 6h
1331                 if (delta_ms <= 0L)
1332                 {
1333                     timeout= 0L;
1334                 }
1335                 else
1336                 {
1337                     timeout= delta_ms + (delta_ms<5 ? 1 : 3);
1338                 }
1339             }
1340         }
1341
1342         // step 2 : poll
1343         ODOUT(" poll timeout is " << timeout);
1344         {
1345             MilliTime current_time;
1346             get_current_monotonic_time(current_time);
1347             ODOUT("  current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1348         }
1349         int poll_result= ::poll(poll_data.get_pollfd_ptr(), poll_data.get_num_pollfds(), timeout);
1350
1351         ODOUT("poll -> " << poll_result);
1352         {
1353             MilliTime current_time;
1354             get_current_monotonic_time(current_time);
1355             ODOUT("  current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1356         }
1357
1358         if (poll_result < 0)
1359         {
1360             //TODO poll error handling (signals ?!)
1361         }
1362
1363         // step 3 : mark
1364
1365         // step 3.1: mark io objects (if necessary)
1366         if (poll_result > 0)
1367         {
1368             for(internal_io::PollVector::iterator itPollItem = poll_data.m_poll_vector.begin();
1369                 itPollItem != poll_data.m_poll_vector.end();
1370                 ++itPollItem)
1371             {
1372                 ODOUT("  fd=" << itPollItem->fd << ", events=" << itPollItem->events << ", revents=" << itPollItem->revents);
1373                 if ( 0 == (itPollItem->revents))
1374                 { // preliminary continuation if nothing is to handle for this item(/fd)...
1375                     continue;
1376                 }
1377                 if ( 0!= (itPollItem->revents & (POLLIN|POLLHUP)))
1378                 {
1379                     IOImplementation *io= poll_data.m_read_fd_io_map[ itPollItem->fd ];
1380                     if (io && io->m_read_fd==itPollItem->fd)
1381                     {
1382                         FODOUT(io,"marked for reading");
1383                         io->m_marked_for_reading= true;
1384                     }
1385                 }
1386                 if ( 0!= (itPollItem->revents & POLLOUT))
1387                 {
1388                     IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
1389                     if (io && io->m_write_fd==itPollItem->fd)
1390                     {
1391                         io->m_marked_for_writing= true;
1392                     }
1393                 }
1394                 if ( 0!= (itPollItem->revents & POLLERR))
1395                 {
1396                     IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
1397                     if (0!= (itPollItem->events & POLLOUT))
1398                     {
1399                         if (io && io->m_write_fd==itPollItem->fd)
1400                         {
1401                             io->m_marked_for_writing= false;
1402                             //io->close( Direction::out );
1403                         }
1404                     }
1405                 }
1406                 // TODO error handling (POLLERR, POLLHUP, POLLNVAL)
1407             }
1408         }
1409
1410         //Step 3.2: mark timer objects
1411         {
1412             MilliTime current_time;
1413
1414             get_current_monotonic_time(current_time);
1415             ODOUT("  current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1416
1417             for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1418                 it_timer != internal_io::g_timer_list().end();
1419                 ++ it_timer)
1420             {
1421                 ODOUT("  check timer " << *it_timer);
1422                 if (! *it_timer) continue; // skip NULL entries
1423                 if (! (*it_timer)->m_active) continue; // skip if not enabled
1424                 if ( (*it_timer)->m_when <= current_time)
1425                 {
1426                     ODOUT("   ==> MARK");
1427                     (*it_timer)->m_marked = true;
1428                 }
1429             }
1430         }
1431
1432
1433         // step 4 : execute
1434
1435         // step 4.1: execute io
1436         ODOUT("execute stage");
1437         for(internal_io::IOList::iterator it_io = internal_io::g_io_list().begin();
1438             it_io != internal_io::g_io_list().end();
1439             ++ it_io)
1440         {
1441             ODOUT("  check obj " << *it_io);
1442             if (NULL == *it_io) continue;
1443             if ((*it_io)->m_marked_for_writing)
1444             {
1445                 FODOUT((*it_io),"exec doWrite");
1446                 (*it_io)->doWrite();
1447                 if ((*it_io) == NULL) continue; // skip remaining if we lost the object
1448                 if ((*it_io)->m_errno)
1449                 {
1450                     continue;
1451                 }
1452             }
1453             if ((*it_io)->m_marked_for_reading)
1454             {
1455                 FODOUT((*it_io),"exec doRead");
1456                 (*it_io)->doRead();
1457                 if ((*it_io) == NULL) continue; // skip remaining if we lost the object
1458                 if ((*it_io)->m_errno)
1459                 {
1460                     continue;
1461                 }
1462             }
1463         }
1464
1465         // step 4.2: execute timer events
1466         {
1467             for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1468                 it_timer != internal_io::g_timer_list().end();
1469                 ++ it_timer)
1470             {
1471                 if (! *it_timer) continue; // skip NULL entries
1472                 if (! (*it_timer)->m_active) continue; // skip if not enabled
1473                 if (! (*it_timer)->m_marked) continue; // skip if not marked
1474
1475                 // reset the mark and deactivate object now since the execute() method might activate it again
1476                 (*it_timer)->m_marked= false;
1477                 (*it_timer)->m_active= false;
1478
1479                 // now execute the event:
1480                 (*it_timer)->execute();
1481             }
1482         }
1483
1484     } // eo try
1485     catch(...)
1486     {
1487         // clean up our counter
1488         --m_count_active_steps;
1489         // and forward the exception
1490         throw;
1491     }
1492
1493     if ( 0 == --m_count_active_steps)
1494     {
1495         internal_io::g_io_list().clean_list();
1496         internal_io::g_timer_list().clean_list();
1497     }
1498
1499     return had_active_object;
1500 } // eo Backend::doOneStep
1501
1502
1503 /**
1504  * enters a backend loop.
1505  *
1506  * Calls @a Backend::doOneStep within a loop until @a Backend::stop was called or there are no more
1507  * active objects (io objects or timer objects).
1508  */
1509 void Backend::run()
1510 {
1511     ++m_count_active_loops;
1512     do
1513     {
1514         try
1515         {
1516             if (!doOneStep(c_max_poll_wait))
1517             {
1518                 // stop if there are no more active objects.
1519                 stop();
1520             }
1521         }
1522         catch(...)
1523         {
1524             // clean up our counter
1525             --m_count_active_loops;
1526             // and forward the exception
1527             throw;
1528         }
1529     }
1530     while (0 == m_count_stop_requests);
1531     --m_count_active_loops;
1532     --m_count_stop_requests;
1533 } // eo Backend::run
1534
1535
1536 /**
1537  * @brief stops the latest loop currently run by Backend::run().
1538  * @see Backend::run()
1539  */
1540 void Backend::stop()
1541 {
1542     if (m_count_active_loops)
1543     {
1544         ++m_count_stop_requests;
1545     }
1546 } // eo Backend::stop()
1547
1548
1549
1550 } // eo namespace AsyncIo