unconditionally use namespace I2n; include signalfunc from our utils
[libasyncio] / asyncio / async_io.cpp
CommitLineData
8c15b8c7
TJ
1/*
2The software in this package is distributed under the GNU General
3Public License version 2 (with a special exception described below).
4
5A copy of GNU General Public License (GPL) is included in this distribution,
6in the file COPYING.GPL.
7
8As a special exception, if other files instantiate templates or use macros
9or inline functions from this file, or you compile this file and link it
10with other works to produce a work based on this file, this file
11does not by itself cause the resulting work to be covered
12by the GNU General Public License.
13
14However the source code for this file must still be made available
15in accordance with section (3) of the GNU General Public License.
16
17This exception does not invalidate any other reasons why a work based
18on this file might be covered by the GNU General Public License.
19*/
5c8a3d40 20/** @file
5c8a3d40 21 * @copyright Copyright © 2007-2008 by Intra2net AG
5c8a3d40
RP
22 */
23
24//#define NOISEDEBUG
25
42b7c46d 26#include "async_io.hpp"
c78245a3 27#include <asyncio_config.hpp>
5c8a3d40
RP
28
29#include <list>
30#include <vector>
31#include <map>
32#include <algorithm>
33#include <utility>
34
35#include <sys/poll.h>
36#include <sys/time.h>
37#include <sys/socket.h>
38#include <unistd.h>
39#include <errno.h>
40#include <fcntl.h>
41
42#include <boost/bind.hpp>
43
74786da6 44#include <asyncio_signalfunc.hpp>
5c8a3d40
RP
45
46#include <iostream>
47
48#ifdef NOISEDEBUG
49#include <iostream>
50#include <iomanip>
51#define DOUT(msg) std::cout << msg << std::endl
52#define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl
53//#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl
54#define ODOUT(msg) std::cout << __PRETTY_FUNCTION__ << "[" << this << "]:" << msg << std::endl
55#else
56#define DOUT(msg) do {} while (0)
57#define FODOUT(obj,msg) do {} while (0)
58#define ODOUT(msg) do {} while (0)
59#endif
60
aba4c34d 61
5c8a3d40
RP
62namespace
63{
64
aba4c34d 65using namespace AsyncIo::Utils;
5c8a3d40
RP
66
67/*
68 * configuration:
69 */
70
71
72const int c_max_poll_wait= 10*60*1000; // maximal poll wait (while in backend loop): 10 min
73
74
75/**
76 * contains internal helper structs and functions for io handling.
77 */
78namespace internal_io
79{
80
81/**
82 * extends struct pollfd with some convenience functions
83 */
84struct PollFd : public ::pollfd
85{
86 PollFd()
87 {
88 fd= 0;
89 events= revents= 0;
90 } // eo PollFd
91
92
93 /**
94 * initializes the struct with a given file descriptor and clears the event mask(s).
74786da6 95 * @param _fd
5c8a3d40
RP
96 */
97 PollFd(int _fd)
98 {
99 fd= _fd;
100 events= revents= 0;
101 } // eo PollFd
102
103
104 /**
105 * set that we want to be notified about new incoming data
106 */
107 void setPOLLIN() { events |= POLLIN; }
108
109 /**
110 * set that we want to be notified if we can send (more) data.
111 */
112 void setPOLLOUT() { events |= POLLOUT; }
113
114}; // eo struct PollFd
115
116
117typedef std::vector<PollFd> PollVector;
118typedef std::map<int,PollVector::size_type> FdPollMap;
42b7c46d 119typedef std::map<int,AsyncIo::IOImplementation*> FdIOMap;
5c8a3d40
RP
120
121
122/**
123 * struct for interfacing our local structures with poll()
124 */
125struct PollDataCluster
126{
127 PollVector m_poll_vector;
128 FdPollMap m_fd_poll_map;
129 FdIOMap m_read_fd_io_map;
130 FdIOMap m_write_fd_io_map;
131
42b7c46d
RP
132 void add_read_fd( int fd, AsyncIo::IOImplementation* io);
133 void add_write_fd( int fd, AsyncIo::IOImplementation* io);
5c8a3d40
RP
134
135 pollfd* get_pollfd_ptr();
136 unsigned int get_num_pollfds() const;
137
138}; // eo struct PollDataCluster
139
140
aba4c34d
RP
141typedef PtrList< AsyncIo::IOImplementation, true > IOList;
142typedef PtrList< AsyncIo::TimerBase, true > TimerList;
5c8a3d40 143
aba4c34d
RP
144template<> int IOList::GlobalCountType::InstanceCount= 0;
145template<> int TimerList::GlobalCountType::InstanceCount= 0;
5c8a3d40
RP
146
147
148/**
149 * the (internal) global list of io objects (object pointers)
150 */
151IOList& g_io_list()
152{
153 static IOList _the_io_list;
154 return _the_io_list;
155};
156
157
158/**
159 * the (internal) global list of timer objects (object pointers)
160 */
161TimerList& g_timer_list()
162{
163 static TimerList _the_timer_list;
164 return _the_timer_list;
165}
166
167/*
168 * implementation of PollDataCluster
169 */
170
171
172/**
173 * add a new file descriptor to the read list.
174 *
175 * @param fd the file descriptor.
176 * @param io the io object which uses the fd for reading.
177 */
42b7c46d 178void PollDataCluster::add_read_fd( int fd, AsyncIo::IOImplementation* io)
5c8a3d40
RP
179{
180 FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
181 if (itPollMap != m_fd_poll_map.end())
182 {
183 m_poll_vector[ itPollMap->second ].setPOLLIN();
184 }
185 else
186 {
187 PollFd item(fd);
188 item.setPOLLIN();
189 m_fd_poll_map[fd] = m_poll_vector.size();
190 m_poll_vector.push_back( item );
191 }
192 m_read_fd_io_map[fd]= io;
193} // eo PollDataCluster::add_read_fd
194
195
196/**
197 * add a new file descriptor to the write list.
198 *
199 * @param fd the file descriptor.
200 * @param io the io object which uses the fd for writing.
201 */
42b7c46d 202void PollDataCluster::add_write_fd( int fd, AsyncIo::IOImplementation* io)
5c8a3d40
RP
203{
204 FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
205 if (itPollMap != m_fd_poll_map.end())
206 {
207 m_poll_vector[ itPollMap->second ].setPOLLOUT();
208 }
209 else
210 {
211 PollFd item(fd);
212 item.setPOLLOUT();
213 m_fd_poll_map[fd] = m_poll_vector.size();
214 m_poll_vector.push_back( item );
215 }
216 m_write_fd_io_map[fd]= io;
217} // eo PollDataCluster::add_write_fd
218
219
220/**
221 * returns a pointer to a pollfd array; suitable for passing to poll()
222 *
223 * @return pointer to pollfd array
224 */
225pollfd* PollDataCluster::get_pollfd_ptr()
226{
227 return m_poll_vector.empty() ? NULL : &m_poll_vector.front();
228} // eo get_pollfd_ptr
229
230
231/**
232 * returns the number of entries in the pollfd array; suitable for passing to poll()
233 *
234 * @return the number of entries in the pollfd array
235 */
236unsigned int PollDataCluster::get_num_pollfds() const
237{
238 return m_poll_vector.size();
239} // eo get_num_pollfds
240
241
242
243} // eo namespace internal_io
244
245
246/*
247 * some internal tool functions and structures
248 */
249
250struct FilterMatch {
42b7c46d 251 AsyncIo::FilterBasePtr m_filter;
5c8a3d40 252
42b7c46d 253 FilterMatch(AsyncIo::FilterBasePtr filter)
5c8a3d40
RP
254 : m_filter(filter)
255 {}
256
42b7c46d 257 bool operator () (const AsyncIo::FilterBasePtr& item)
5c8a3d40
RP
258 {
259 return item && item == m_filter;
260 }
261
262}; // eo struct FilterMatch
263
264
5c8a3d40
RP
265
266} // eo anonymous namespace
267
268
269
270
42b7c46d 271namespace AsyncIo
5c8a3d40
RP
272{
273
274
5c8a3d40
RP
275/*
276 * implementation of TimerBase
277 */
278
279/**
280 * constructor. Adds the object to the internal timer list.
281 */
282TimerBase::TimerBase()
283: m_active(false)
284, m_marked(false)
285{
286 internal_io::g_timer_list().add_item(this);
287} // eo TimerBase::TimerBase
288
289
290/**
291 * destructor. Removes the object from the internal timer list.
292 */
293TimerBase::~TimerBase()
294{
295 ODOUT("enter");
aba4c34d 296 if (internal_io::TimerList::Instances())
5c8a3d40
RP
297 {
298 ODOUT("remove from list");
299 internal_io::g_timer_list().remove_item(this);
300 }
301} // eo TimerBase::~TimerBase
302
303
304/**
305 * @brief returns the point in time when the time is executed in real time.
306 * @return the point in time when the timer is to be executed.
307 */
308MilliTime TimerBase::getRealWhenTime() const
309{
310 MilliTime mono_time;
311 MilliTime real_time;
312 get_current_monotonic_time(mono_time);
313 get_current_real_time(real_time);
314 MilliTime result= m_when - mono_time + real_time;
315 return result;
316} // eo TimerBase::getRealWhenTime() const
317
318
319/**
320 * sets the time when the event should be executed.
321 * @param sec the seconds part of the point in time.
322 * @param msec the milliseconds part of the point in time.
323 */
324void TimerBase::setWhenTime(long sec, long msec)
325{
326 m_when.set(sec,msec);
327 m_marked= false;
328} // eo TimerBase::setWhenTime
329
330
331/**
332 * sets the time when the event should be executed.
333 * @param mt the point in time.
334 */
335void TimerBase::setWhenTime(const MilliTime& mt)
336{
337 m_when= mt;
338 m_marked= false;
339} // eo TimerBase::setWhenTime
340
341
342/**
343 * sets the time delta measured from current time when the event should be executed.
344 * @param sec the seconds of the time delta
345 * @param msec the milli seconds of the time delta
346 */
347void TimerBase::setDeltaWhenTime(long sec, long msec)
348{
349 setDeltaWhenTime( MilliTime(sec,msec) );
350} // eo TimerBase::setWhenTime
351
352
353
354/**
355 * sets the time delta measured from current time when the event should be executed.
356 * @param mt the time delta
357 */
358void TimerBase::setDeltaWhenTime(const MilliTime& mt)
359{
360 get_current_monotonic_time(m_when);
361 m_when+= mt;
362 m_marked= false;
363} // eo TimerBase::setWhenTime
364
365
366/**
367 * set the active state of the timer event.
368 * @param active determines if the object should be active (default: yes).
369 */
370void TimerBase::activate(bool active)
371{
372 m_active = active;
373 if (!active)
374 {
375 // clear the mark if we are not active.
376 m_marked= false;
377 }
378} // eo TimerBase::activate
379
380
381/** @fn void TimerBase::deactivate()
382 * deactivates the event by clearing the active state.
383 */
384
385
386/**
387 * called when the timer event occured.
388 */
389void TimerBase::execute()
390{
391} // eo TimerBase::execute
392
393
394/*
395 * implementation of FilterBase class
396 */
397
398
399FilterBase::FilterBase()
400: m_io(NULL)
401{
402} // eo FilterBase::FilterBase()
403
404
405/**
406 * injects incoming data.
407 * @param data the new data
408 */
409void FilterBase::injectIncomingData(const std::string& data)
410{
411 if (m_io)
412 {
413 FilterBasePtr ptr= get_ptr_as< FilterBase >();
414 if (ptr)
415 {
416 m_io->injectIncomingData(ptr,data);
417 }
418 }
419} // FilterBase::injectIncomingData(const std::string&)
420
421
422/**
423 * injects outgoing data.
424 * @param data the new data
425 */
426void FilterBase::injectOutgoingData(const std::string& data)
427{
428 if (m_io)
429 {
430 FilterBasePtr ptr= get_ptr_as< FilterBase >();
431 if (ptr)
432 {
433 m_io->injectOutgoingData(ptr,data);
434 }
435 }
436} // eo FilterBase::injectOutgoingData(const std::string&)
437
438
439
440/**
441 * called when EOF detected on incoming channel (or incoming channel closed)
442 */
443void FilterBase::endOfIncomingData()
444{
445} // eo FilterBase::endOfIncomingData()
446
447/**
448 * called when the filter should reset.
449 * This is used when a new channel is opened or when the filter is taken out of a filter chain.
450 */
451void FilterBase::reset()
452{
453} // eo FilterBase::reset()
454
455
456/*
457 * implementation of IOImplementation class
458 */
459
460
461/**
462 * constructor for the base io class.
463 *
464 * Also adds the object to internal list of io objects (which is used by the backend).
465 *
466 * @param read_fd the file descriptor which should be used for reading (default -1 for no value)
467 * @param write_fd the file descriptor which should be used for writing (default -1 for no value)
468 */
469IOImplementation::IOImplementation(int read_fd, int write_fd)
470: m_read_fd(-1)
471, m_write_fd(-1)
472, m_eof(false)
473, m_not_writable(false)
474, m_input_buffer()
475, m_output_buffer()
476, m_marked_for_reading(false)
477, m_marked_for_writing(false)
478{
479 internal_io::g_io_list().add_item(this);
480 if (read_fd >= 0)
481 {
482 setReadFd( read_fd );
483 }
484 if (write_fd >= 0)
485 {
486 setWriteFd( write_fd );
487 }
488} // eo IOImplementation::IOImplementation
489
490
491/**
492 * destructor of the base io class.
493 *
494 * Removes the object from the interal list of io objects.
495 */
496IOImplementation::~IOImplementation()
497{
498 close();
aba4c34d 499 if (internal_io::IOList::Instances())
5c8a3d40
RP
500 {
501 internal_io::g_io_list().remove_item(this);
502 }
503 // now clear the filters:
504 while (! m_filter_chain.empty() )
505 {
506 FilterChain::iterator it = m_filter_chain.begin();
507 (*it)->reset();
508 (*it)->m_io= NULL;
509 //TODO: signal the filter that it is removed ?!
510 m_filter_chain.erase(it);
511 }
512} // eo IOImplementation::~IOImplementation
513
514
515/**
516 * adds another filter to the filter chain.
517 * @param filter pointer to the new filter.
518 */
519void IOImplementation::addFilter
520(
521 FilterBasePtr filter
522)
523{
524 if (!filter)
525 {
526 return; // nothing to do
527 }
528 if (filter->m_io)
529 {
530 filter->m_io->removeFilter(filter);
531 }
532 m_filter_chain.push_back( filter );
533} // eo IOImplementation::addFilter
534
535
536/**
537 * removes a filter from the filter chain.
538 * @param filter the pointer to the filter which is removed.
539 * @note if the filter is removed the class gives away the ownership; i.,e. the caller is responsible for
540 * deleting the filter if it was dynamically allocated.
541 */
542void IOImplementation::removeFilter
543(
544 FilterBasePtr filter
545)
546{
547 FilterChain::iterator it =
548 std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(filter) );
549 if (it != m_filter_chain.end())
550 {
551 filter->reset();
552 filter->m_io= NULL;
553 //TODO: signal the filter that it is removed ?!
554 m_filter_chain.erase(it);
555 }
556} // eo IOImplementation::removeFilter
557
558
559/**
560 * closes the file descriptors (/ the connection).
561 *
562 * @param direction the direction which should be closed (default: @a Direction::both for all).
563 */
564void IOImplementation::close(Direction direction)
565{
566 bool had_read_fd= (m_read_fd >= 0);
567 m_errno= 0;
568 if (direction == Direction::unspecified) direction= Direction::both;
569 if (direction != Direction::both && m_read_fd==m_write_fd && m_read_fd>=0)
570 { // special case: half closed (socket) connections...
571 // NOTE: for file descriptors m_errno will set to ENOTSOCK, but since we "forget" the desired part
572 // (read_fd or write_fd) this class works as desired.
573 switch(direction)
574 {
575 case Direction::in:
576 {
577 int res= ::shutdown(m_read_fd, SHUT_RD);
74786da6 578 if (res<0)
5c8a3d40
RP
579 {
580 m_errno= errno;
581 }
582 m_read_fd= -1;
583 if (!m_eof)
584 {
585 for(FilterChain::iterator it= m_filter_chain.begin();
586 it != m_filter_chain.end();
587 ++it)
588 {
589 (*it)->endOfIncomingData();
590 }
591 }
592 }
593 return;
594
595 case Direction::out:
596 {
597 int res= ::shutdown(m_write_fd, SHUT_WR);
598 if (res<0)
599 {
600 m_errno= errno;
601 }
602 m_write_fd= -1;
603 m_output_buffer.clear();
604 }
605 return;
606 }
607 }
608 if (m_write_fd >= 0 && (direction & Direction::out) )
609 {
610 int res1 = ::close(m_write_fd);
611 if (m_write_fd == m_read_fd)
612 {
613 m_read_fd= -1;
614 }
615 m_write_fd= -1;
616 m_output_buffer.clear();
617 if (res1<0) m_errno= errno;
618 }
619 if (m_read_fd >=0 && (direction & Direction::in) )
620 {
621 int res1 = ::close(m_read_fd);
622 m_read_fd= -1;
623 if (res1<0) m_errno= errno;
624 }
625 if (had_read_fd && !m_eof && (m_read_fd<0))
626 {
627 for(FilterChain::iterator it= m_filter_chain.begin();
628 it != m_filter_chain.end();
629 ++it)
630 {
631 (*it)->endOfIncomingData();
632 }
633 }
634} // eo IOImplementation::close
635
636
637/**
638 * determines if the io class wants to read data.
639 * Default implementation checks only for a valid file descriptor value.
640 *
641 * @return @a true if the objects wants to read data
642 */
643bool IOImplementation::wantRead()
644{
645 return (m_read_fd >= 0) && ! m_eof;
646} // eo IOImplementation::wantRead
647
648
649/**
650 * determines if the io class wants to write data.
651 * Default implementation checks for a valid file descriptor value and if the object
652 * cannot write data immediately.
653 *
654 * @return @a true if the objects wants to write data
655 */
656bool IOImplementation::wantWrite()
657{
658 return (m_write_fd >= 0) && ! m_marked_for_writing && ! m_not_writable;
659} // eo IOImplementation::wantWrite
660
661
662/**
663 * delivers if opened.
664 * The default returns @a true if at least one file descriptor (read or write) is valid.
665 * @return @a true if opened.
666 */
667bool IOImplementation::opened() const
668{
669 return (m_read_fd>=0) || (m_write_fd>=0);
670} // eo IOImplementation::opened() const
671
672
673/**
674 * returns if the read side detected an end of file (EOF).
675 * @return @a true if end of file was detected on read file descriptor (or read file descriptor isn't valid).
676 */
677bool IOImplementation::eof() const
678{
679 return (m_read_fd < 0) || m_eof;
680} // eo IOImplementatio::eof() const
681
682
683/**
684 * @brief returns of the write side didn't detect that it cannot write.
685 * @return @a true if we can write.
686 */
687bool IOImplementation::writable() const
688{
689 return (m_write_fd >=0 ) and not m_not_writable;
690} // eo IOImplementation::writable() const
691
692
693/**
694 * returns if the output buffer is empty.
74786da6 695 * @return
5c8a3d40
RP
696 */
697bool IOImplementation::empty() const
698{
699 return m_output_buffer.empty();
700} // eo IOImplementation::empty
701
702/**
703 * puts data into the output buffer and sends it immediately if possible,
704 *
705 * The data is passed through the filter chain before it's stored in the output buffer
706 * (i.e. the output buffer contains data as it should be send directly to the descriptor).
707 * @param _data the data which should be send.
708 */
709void IOImplementation::lowSend(const std::string& _data)
710{
711 std::string data(_data);
712
713 for(FilterChain::reverse_iterator it_filter= m_filter_chain.rbegin();
714 it_filter!= m_filter_chain.rend();
715 ++it_filter)
716 {
717 data= (*it_filter)->filterOutgoingData(data);
718 }
719 m_output_buffer+= data;
720
721 // if we can send immediately, do it:
722 if (! m_output_buffer.empty() && m_marked_for_writing)
723 {
724 doWrite();
725 }
726} // eo IOImplementation::lowSend
727
728
729/**
730 * called by the backend when there is data to read for this object.
731 *
732 * Reads the data from the connection (read file descriptor) and passes the data through the filter chain.
733 * The final data is appended to the input buffer and the signal @a m_signal_read() is called.
734 *
735 * If EOF is detected (i,e, no data was received) then the signal @a m_signal_eof() is called.
736 *
737 * @note overload this method only when You know what You are doing!
738 * (overloading is necessary when handling server sockets.)
739 */
740void IOImplementation::doRead()
741{
742 // static read buffer; should be ok as long as we don't use threads
743 static char buffer[8*1024]; // 8 KiB
744
745 m_errno = 0;
746 if (m_read_fd<0 || !m_marked_for_reading)
747 {
748 ODOUT("exit0; read_fd="<<m_read_fd << " mark=" << m_marked_for_reading);
749 return;
750 }
751
752 // reset the mark:
753 m_marked_for_reading = false;
754
755 // now read the data:
756 ssize_t count;
757 count = ::read(m_read_fd, buffer, sizeof(buffer));
758
759 ODOUT("::read -> " << count);
760
761 // interpret what we got:
762 if (count < 0) // error
763 {
764 m_errno = errno;
765 int fd= m_read_fd;
766
767 switch(m_errno)
768 {
769 case EINVAL:
770 case EBADF:
771 case ECONNRESET:
772 case ENETRESET:
773 if (fd == m_read_fd)
774 {
775 close( (m_read_fd == m_write_fd) ? Direction::both : Direction::in );
776 }
777 break;
778 }
779 }
780 else if (count==0) // EOF
781 {
782 // remember the read fd:
783 int fd = m_read_fd;
784 // remember the EOF:
785 m_eof= true;
786 // signal EOF
787 m_signal_eof();
788 // if the fd is still the same: close it.
789 if (fd == m_read_fd)
790 {
791 close( Direction::in );
792 }
793 }
794 else // we have valid data
795 {
796 std::string data(buffer,count);
797 ODOUT(" got \"" << data << "\"");
798 for(FilterChain::iterator it_filter= m_filter_chain.begin();
799 it_filter != m_filter_chain.end();
800 ++it_filter)
801 {
802 data= (*it_filter)->filterIncomingData(data);
803 }
804 m_input_buffer+= data;
805 m_signal_read();
806 }
807} // eo IOImplementation::doRead
808
809
810/**
811 * interface for filter classes to inject data into the filter chain (emulating new incoming data).
812 * @param from_filter the filter which injects the new data.
813 * @param _data the new data.
814 */
815void IOImplementation::injectIncomingData(FilterBasePtr from_filter, const std::string& _data)
816{
817 FilterChain::iterator it_filter =
818 std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(from_filter) );
819 if (it_filter == m_filter_chain.end())
820 {
821 // dont accept data inject from a unknown filter
822 return;
823 }
824 // well: pass the data through the remaining filters:
825 // NOTE: processing is (nearly) the same as in IOImplementation::doRead()
826 std::string data(_data);
827 for(++it_filter;
828 it_filter != m_filter_chain.end();
829 ++it_filter)
830 {
831 data= (*it_filter)->filterIncomingData(data);
832 }
833 m_input_buffer+= data;
834 m_signal_read();
835} // eo IOImplementation::injectIncomingData(FilterBase*,const std::string&)
836
837
838/**
839 * interface for filter classes to inject data into the filter chain (emulating new outgoing data).
840 * @param from_filter the filter which injects the new data.
841 * @param _data the new data.
842 */
843void IOImplementation::injectOutgoingData(FilterBasePtr from_filter, const std::string& _data)
844{
845 FilterChain::reverse_iterator it_filter =
846 std::find_if( m_filter_chain.rbegin(), m_filter_chain.rend(), FilterMatch(from_filter) );
847 if (it_filter == m_filter_chain.rend())
848 {
849 // dont accept data inject from a unknown filter
850 return;
851 }
852 // well: pass the data through the remaining filters:
853 // NOTE: processing is (nearly) the same as in IOImplementation::lowSend()
854 std::string data(_data);
855 for(++it_filter;
856 it_filter!= m_filter_chain.rend();
857 ++it_filter)
858 {
859 data= (*it_filter)->filterOutgoingData(data);
860 }
861 m_output_buffer+= data;
862
863 // if we can send immediately, do it:
864 if (! m_output_buffer.empty() && m_marked_for_writing)
865 {
866 doWrite();
867 }
868} // eo IOImplementation::injectOutgoingData(FilterBase*,const std::string&)
869
870
871/**
872 * set the read file descriptor.
873 * Although a derived class can also set the read fd directly; this method should be used
874 * for this task since it updates some flags on the fd for async operation.
875 * @param fd the new read file descriptor.
876 */
877void IOImplementation::setReadFd(int fd)
878{
879 // test if we already have a valid descriptor (and may have to close it):
880 if (m_read_fd >=0 )
881 {
882 if (m_read_fd == fd)
883 {
884 // fd was already right; consider it to be ok.
885 return;
886 }
887 close(Direction::in);
888 }
889 // reset our errno:
890 m_errno= 0;
891 // if the new descriptor looks valid, set some flags:
892 if (fd >= 0)
893 {
894 long flags= ::fcntl(fd, F_GETFL);
895 if (flags != -1)
896 {
897 // set the flags for non blocking, async operation
898 flags |= O_NONBLOCK|O_ASYNC;
899 ::fcntl(fd,F_SETFL, flags);
900 }
901 else if ( errno == EBADF )
902 {
903 // well, we seemed to be fed with an invalid descriptor...:
904 m_errno = errno;
905 fd= -1;
906 }
907 }
908 if (fd >= 0) // if still valid:
909 {
910 // set the close-on-exec flag
911 ::fcntl(fd,F_SETFD, FD_CLOEXEC);
912 }
913 m_read_fd= fd;
914 m_marked_for_reading= false;
915 m_eof= false;
916} // eo IOImplementation::setReadFd(int)
917
918
919
920/**
921 * set the write file descriptor.
922 * Although a derived class can also set the write fd directly; this method should be used
923 * for this task since it updates some flags on the fd for async operation.
924 * @param fd the new write file descriptor.
925 */
926void IOImplementation::setWriteFd(int fd)
927{
928 if (m_write_fd >=0 )
929 {
930 if (m_write_fd == fd)
931 {
932 // fd was already right; consider it to be ok.
933 return;
934 }
935 close(Direction::out);
936 }
937 // reset our errno:
938 m_errno= 0;
939 // if the new descriptor looks valid, set some flags:
940 if (fd >= 0)
941 {
942 long flags= ::fcntl(fd, F_GETFL);
943 if (flags != -1)
944 {
945 // set the flags for non blocking, async operation
946 flags |= O_NONBLOCK|O_ASYNC;
947 ::fcntl(fd,F_SETFL, flags);
948 }
949 else if (errno == EBADF)
950 {
951 // well, we seemed to be fed with an invalid descriptor...:
952 m_errno = errno;
953 fd= -1;
954 }
955 }
956 if (fd >= 0) // if still valid:
957 {
958 // set the close-on-exec flag
959 ::fcntl(fd,F_SETFD, FD_CLOEXEC);
960 }
961 m_write_fd = fd;
962 m_marked_for_writing= false;
963 m_not_writable= false;
964} // eo IOImplementation::setWriteFd(int)
965
966
967
968/**
969 * called by the backend when this object can write data.
970 *
971 * If some data was sended, the signal @a m_signal_write is called.
972 *
973 * @internal tries to write all buffered data to output; if this succeeds,
974 * the connection is assumed to be still able to accept more data.
975 * (i.e. the internal write mark is kept!)
976 *
977 * @note overload this method only when You know what You are doing!
978*/
979void IOImplementation::doWrite()
980{
981 m_errno = 0;
982 if ( m_write_fd<0 || !m_marked_for_writing || m_output_buffer.empty())
983 {
984 return;
985 }
986
987 ODOUT("doWrite, d=\"" << m_output_buffer << "\"");
988
989 //reset mark:
990 m_marked_for_writing= false;
991
992 // now write the data
993 ssize_t count= ::write( m_write_fd, m_output_buffer.data(), m_output_buffer.size());
994
995 ODOUT("::write -> " << count);
996
997 if (count < 0) // error
998 {
999 m_errno= errno;
1000 int fd= m_write_fd;
1001
1002 switch(m_errno)
1003 {
1004 case EPIPE:
1005 m_not_writable= true;
1006 // emit a signal
1007 m_signal_not_writable();
1008 // fall through
1009 case EINVAL:
1010 case EBADF:
1011 case ECONNRESET:
1012 case ENETRESET:
1013 if (fd == m_write_fd)
1014 {
1015 close( (m_write_fd == m_read_fd) ? Direction::both : Direction::out );
1016 }
1017 break;
1018 }
1019 }
1020 else
1021 {
1022 m_output_buffer.erase(0, count);
1023 if (m_output_buffer.empty())
1024 {
1025 // special case: if we were able to send all the data, we keep the write mark:
1026 m_marked_for_writing= true;
1027 }
1028 }
1029 if (count > 0)
1030 {
1031 m_signal_write();
1032 }
1033} // eo IOImplementation::doWrite
1034
1035
1036/*
1037 * implementation of SimpleIO
1038 */
1039
1040
1041SimpleIO::SimpleIO(int read_fd, int write_fd)
1042: inherited(read_fd, write_fd)
1043{
1044 m_signal_read.connect(boost::bind(&SimpleIO::slotReceived,this));
1045} // eo SimpleIO::SimpleIO()
1046
1047
1048SimpleIO::~SimpleIO()
1049{
1050} // eo SimpleIO::~SimpleIO()
1051
1052
1053/**
1054 * sends a string.
1055 * @param data the string.
1056 */
1057void SimpleIO::sendString(const std::string& data)
1058{
1059 lowSend(data);
1060} // eo SimpleIO::sendString(const std::string&)
1061
1062
1063/**
1064 * emits the signal signalReceived with the received data.
1065 * This slot is connected to IOImplementation::m_signal_read.
1066 */
1067void SimpleIO::slotReceived()
1068{
1069 std::string data;
1070 data.swap(m_input_buffer);
1071 signal_received_string(data);
1072} // eo SimpleIO::slotReceived()
1073
1074
1075
1076/*
1077 * implementation of SimpleIO2
1078 */
1079
1080
1081SimpleIO2::SimpleIO2(int read_fd, int write_fd)
1082: inherited(read_fd, write_fd)
1083{
1084 m_signal_read.connect(boost::bind(&SimpleIO2::slotReceived,this));
1085} // eo SimpleIO2::SimpleIO2()
1086
1087
1088SimpleIO2::~SimpleIO2()
1089{
1090} // eo SimpleIO2::~SimpleIO2()
1091
1092
1093/**
1094 * sends a string.
1095 * @param data the string.
1096 */
1097void SimpleIO2::sendString(const std::string& data)
1098{
1099 lowSend(data);
1100} // eo SimpleIO2::sendString(const std::string&)
1101
1102
1103/**
1104 * emits the signal signalReceived with the received data.
1105 * This slot is connected to IOImplementation::m_signal_read.
1106 */
1107void SimpleIO2::slotReceived()
1108{
1109 std::string data;
1110 data.swap(m_input_buffer);
1111 signal_received_string(data);
1112} // eo SimpleIO2::slotReceived()
1113
1114
1115
1116/*
1117 * implementation of class Backend (singleton)
1118 */
1119
1120Backend* Backend::g_backend= NULL;
1121
1122int Backend::m_count_active_steps=0;
1123
1124
1125Backend::Backend()
1126: m_count_active_loops(0)
1127, m_count_stop_requests(0)
1128{
74786da6 1129 SystemTools::ignore_signal( SystemTools::Signal::PIPE );
5c8a3d40
RP
1130} // eo Backend::Backend
1131
1132
1133Backend::~Backend()
1134{
1135 SystemTools::restore_signal_handler( SystemTools::Signal::PIPE );
1136} // eo Backend::~Backend()
1137
1138/**
1139 * delivers pointer to the current backend, instantiating a new backend if there was no current one.
1140 *
1141 * This should be the only way to access the backend which should be a singleton.
1142 *
1143 * @return the pointer to the current backend.
1144 */
1145Backend* Backend::getBackend()
1146{
1147 if (!g_backend)
1148 {
1149 g_backend = new Backend();
1150 }
1151 return g_backend;
1152} // eo Backend::getBackend
1153
1154
1155
1156
1157/**
1158 * performs one backend cycle.
1159 *
1160 * Collects all file descriptors from the active io objects which should be selected for reading and/or writing.
1161 * Also determines the timer events which become due and adjusts the timeout.
1162 * Constructs the necessary structures and calls poll().
1163 * Finally interprets the results from poll() (i.e. performs the reading/writing/timer events)
1164 *
1165 * @param timeout maximal wait value in milliseconds; negative value waits until at least one event occured.
1166 * @return @a true if there was at least one active object; otherwise @a false
1167 *
1168 * @note this method is a little beast.
1169 *
74786da6 1170 * @internal
5c8a3d40
RP
1171 * The cycle is divided into four steps: collecting; poll; mark and execute.
1172 * The "mark" step is necessary to avoid some bad side effects when method calls in the execution stage
1173 * are calling @a Backup::doOneStep or open their own local backend loop.
1174 *
1175 * @todo handle some more error cases.
1176 * @todo provide a plugin interface for external handler.
1177 * (currently inclusion of external handler is possible by (ab)using timer classes)
1178 */
1179bool Backend::doOneStep(int timeout)
1180{
1181 ODOUT( "timeout=" << timeout );
1182 internal_io::PollDataCluster poll_data;
1183 bool had_active_object = false;
1184
1185 ++m_count_active_steps;
1186
1187 try {
1188
1189 FdSetType local_read_fds;
1190 FdSetType local_write_fds;
1191
1192 // step 1 ; collect
1193
1194 { // step 1.1: collect fds for read/write operations
1195 for(internal_io::IOList::iterator itIOList = internal_io::g_io_list().begin();
1196 itIOList != internal_io::g_io_list().end();
1197 ++itIOList)
1198 {
1199 if (! *itIOList) continue; // skip NULL entries
1200 int read_fd = (*itIOList)->m_read_fd;
1201 int write_fd = (*itIOList)->m_write_fd;
1202 bool want_read = (read_fd >= 0) and (*itIOList)->wantRead();
1203 bool want_write = (write_fd >= 0) and (*itIOList)->wantWrite();
1204 if (read_fd >= 0 )
1205 {
1206 local_read_fds.insert( read_fd );
1207 }
1208 if (write_fd >= 0)
1209 {
1210 local_write_fds.insert( write_fd );
1211 }
1212 if (!want_read && !want_write) continue;
1213 if (want_read)
1214 {
1215 FODOUT( (*itIOList), "wants to read (fd=" << read_fd << ")");
1216 poll_data.add_read_fd(read_fd, *itIOList);
1217 }
1218 if (want_write)
1219 {
1220 FODOUT( (*itIOList), "wants to write (fd=" << write_fd << ")");
1221 poll_data.add_write_fd(write_fd, *itIOList);
1222 }
1223 had_active_object= true;
1224 }
1225 }
1226
1227 { // step 1.2: collect timer events
1228 MilliTime current_time;
1229 MilliTime min_event_time;
1230
1231 get_current_monotonic_time(current_time);
1232 bool min_event_time_set;
1233
1234 if (timeout >= 0)
1235 {
1236 min_event_time = current_time + MilliTime(0,timeout);
1237 min_event_time_set= true;
1238 }
1239 else
1240 {
1241 min_event_time = current_time + MilliTime(86400,0);
1242 min_event_time_set= false;
1243 }
1244 // TODO
1245
1246 for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
74786da6 1247 it_timer != internal_io::g_timer_list().end()
5c8a3d40
RP
1248 && (!had_active_object || !min_event_time_set || current_time < min_event_time);
1249 ++ it_timer)
1250 {
1251 if (! *it_timer) continue; // skip NULL entries
1252 if (! (*it_timer)->m_active) continue; // skip if not enabled
1253 if ( !min_event_time_set || (*it_timer)->m_when < min_event_time)
1254 {
1255 min_event_time = (*it_timer)->m_when;
1256 min_event_time_set= true;
1257 }
1258 had_active_object= true;
1259 }
1260
1261 if (min_event_time_set)
1262 { // we have at a minimal event time, so (re)compute the timeout value:
1263 MilliTime delta= (min_event_time - current_time);
1264 long long delta_ms = std::min( delta.get_milliseconds(), 21600000LL); // max 6h
1265 if (delta_ms <= 0L)
1266 {
1267 timeout= 0L;
1268 }
1269 else
1270 {
1271 timeout= delta_ms + (delta_ms<5 ? 1 : 3);
1272 }
1273 }
1274 }
1275
1276 // step 2 : poll
1277 ODOUT(" poll timeout is " << timeout);
1278 {
1279 MilliTime current_time;
1280 get_current_monotonic_time(current_time);
1281 ODOUT(" current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1282 }
1283 int poll_result= ::poll(poll_data.get_pollfd_ptr(), poll_data.get_num_pollfds(), timeout);
1284
1285 ODOUT("poll -> " << poll_result);
1286 {
1287 MilliTime current_time;
1288 get_current_monotonic_time(current_time);
1289 ODOUT(" current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1290 }
1291
1292 if (poll_result < 0)
1293 {
1294 //TODO poll error handling (signals ?!)
1295 }
1296
1297 // step 3 : mark
1298
1299 // step 3.1: mark io objects (if necessary)
1300 if (poll_result > 0)
1301 {
1302 for(internal_io::PollVector::iterator itPollItem = poll_data.m_poll_vector.begin();
1303 itPollItem != poll_data.m_poll_vector.end();
1304 ++itPollItem)
1305 {
1306 ODOUT(" fd=" << itPollItem->fd << ", events=" << itPollItem->events << ", revents=" << itPollItem->revents);
1307 if ( 0 == (itPollItem->revents))
1308 { // preliminary continuation if nothing is to handle for this item(/fd)...
1309 continue;
1310 }
1311 if ( 0!= (itPollItem->revents & (POLLIN|POLLHUP)))
1312 {
1313 IOImplementation *io= poll_data.m_read_fd_io_map[ itPollItem->fd ];
1314 if (io && io->m_read_fd==itPollItem->fd)
1315 {
1316 FODOUT(io,"marked for reading");
1317 io->m_marked_for_reading= true;
1318 }
1319 }
1320 if ( 0!= (itPollItem->revents & POLLOUT))
1321 {
1322 IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
1323 if (io && io->m_write_fd==itPollItem->fd)
1324 {
1325 io->m_marked_for_writing= true;
1326 }
1327 }
1328 if ( 0!= (itPollItem->revents & POLLERR))
1329 {
1330 IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
1331 if (0!= (itPollItem->events & POLLOUT))
1332 {
1333 if (io && io->m_write_fd==itPollItem->fd)
1334 {
1335 io->m_marked_for_writing= false;
1336 //io->close( Direction::out );
1337 }
1338 }
1339 }
1340 // TODO error handling (POLLERR, POLLHUP, POLLNVAL)
1341 }
1342 }
1343
1344 //Step 3.2: mark timer objects
1345 {
1346 MilliTime current_time;
1347
1348 get_current_monotonic_time(current_time);
1349 ODOUT(" current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1350
1351 for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1352 it_timer != internal_io::g_timer_list().end();
1353 ++ it_timer)
1354 {
1355 ODOUT(" check timer " << *it_timer);
1356 if (! *it_timer) continue; // skip NULL entries
1357 if (! (*it_timer)->m_active) continue; // skip if not enabled
1358 if ( (*it_timer)->m_when <= current_time)
1359 {
1360 ODOUT(" ==> MARK");
1361 (*it_timer)->m_marked = true;
1362 }
1363 }
1364 }
1365
1366
1367 // step 4 : execute
1368
1369 // step 4.1: execute io
1370 ODOUT("execute stage");
1371 for(internal_io::IOList::iterator it_io = internal_io::g_io_list().begin();
1372 it_io != internal_io::g_io_list().end();
1373 ++ it_io)
1374 {
1375 ODOUT(" check obj " << *it_io);
1376 if (NULL == *it_io) continue;
1377 if ((*it_io)->m_marked_for_writing)
1378 {
1379 FODOUT((*it_io),"exec doWrite");
1380 (*it_io)->doWrite();
1381 if ((*it_io) == NULL) continue; // skip remaining if we lost the object
1382 if ((*it_io)->m_errno)
1383 {
1384 continue;
1385 }
1386 }
1387 if ((*it_io)->m_marked_for_reading)
1388 {
1389 FODOUT((*it_io),"exec doRead");
1390 (*it_io)->doRead();
1391 if ((*it_io) == NULL) continue; // skip remaining if we lost the object
1392 if ((*it_io)->m_errno)
1393 {
1394 continue;
1395 }
1396 }
1397 }
1398
1399 // step 4.2: execute timer events
1400 {
1401 for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1402 it_timer != internal_io::g_timer_list().end();
1403 ++ it_timer)
1404 {
1405 if (! *it_timer) continue; // skip NULL entries
1406 if (! (*it_timer)->m_active) continue; // skip if not enabled
1407 if (! (*it_timer)->m_marked) continue; // skip if not marked
1408
1409 // reset the mark and deactivate object now since the execute() method might activate it again
1410 (*it_timer)->m_marked= false;
1411 (*it_timer)->m_active= false;
1412
1413 // now execute the event:
1414 (*it_timer)->execute();
1415 }
1416 }
1417
1418 } // eo try
1419 catch(...)
1420 {
1421 // clean up our counter
1422 --m_count_active_steps;
1423 // and forward the exception
1424 throw;
1425 }
1426
1427 if ( 0 == --m_count_active_steps)
1428 {
1429 internal_io::g_io_list().clean_list();
1430 internal_io::g_timer_list().clean_list();
1431 }
1432
1433 return had_active_object;
1434} // eo Backend::doOneStep
1435
1436
1437/**
1438 * enters a backend loop.
1439 *
1440 * Calls @a Backend::doOneStep within a loop until @a Backend::stop was called or there are no more
1441 * active objects (io objects or timer objects).
1442 */
1443void Backend::run()
1444{
1445 ++m_count_active_loops;
1446 do
1447 {
1448 try
1449 {
1450 if (!doOneStep(c_max_poll_wait))
1451 {
1452 // stop if there are no more active objects.
1453 stop();
1454 }
1455 }
1456 catch(...)
1457 {
1458 // clean up our counter
1459 --m_count_active_loops;
1460 // and forward the exception
1461 throw;
1462 }
74786da6 1463 }
5c8a3d40
RP
1464 while (0 == m_count_stop_requests);
1465 --m_count_active_loops;
1466 --m_count_stop_requests;
1467} // eo Backend::run
1468
1469
1470/**
1471 * @brief stops the latest loop currently run by Backend::run().
1472 * @see Backend::run()
1473 */
1474void Backend::stop()
1475{
1476 if (m_count_active_loops)
1477 {
1478 ++m_count_stop_requests;
1479 }
1480} // eo Backend::stop()
1481
1482
1483
42b7c46d 1484} // eo namespace AsyncIo