4 * @copyright Copyright © 2007-2008 by Intra2net AG
6 * @contact info@intra2net.com
11 #include "async_io.hpp"
12 #include <asyncio_config.hpp>
22 #include <sys/socket.h>
27 #include <boost/bind.hpp>
29 #ifdef HAVE_LIBI2NCOMMON
30 # include <signalfunc.hpp>
31 //# include <timefunc.hxx>
41 #define DOUT(msg) std::cout << msg << std::endl
42 #define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl
43 //#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl
44 #define ODOUT(msg) std::cout << __PRETTY_FUNCTION__ << "[" << this << "]:" << msg << std::endl
46 #define DOUT(msg) do {} while (0)
47 #define FODOUT(obj,msg) do {} while (0)
48 #define ODOUT(msg) do {} while (0)
55 using namespace AsyncIo::Utils;
62 const int c_max_poll_wait= 10*60*1000; // maximal poll wait (while in backend loop): 10 min
66 * contains internal helper structs and functions for io handling.
72 * extends struct pollfd with some convenience functions
74 struct PollFd : public ::pollfd
84 * initializes the struct with a given file descriptor and clears the event mask(s).
95 * set that we want to be notified about new incoming data
97 void setPOLLIN() { events |= POLLIN; }
100 * set that we want to be notified if we can send (more) data.
102 void setPOLLOUT() { events |= POLLOUT; }
104 }; // eo struct PollFd
107 typedef std::vector<PollFd> PollVector;
108 typedef std::map<int,PollVector::size_type> FdPollMap;
109 typedef std::map<int,AsyncIo::IOImplementation*> FdIOMap;
113 * struct for interfacing our local structures with poll()
115 struct PollDataCluster
117 PollVector m_poll_vector;
118 FdPollMap m_fd_poll_map;
119 FdIOMap m_read_fd_io_map;
120 FdIOMap m_write_fd_io_map;
122 void add_read_fd( int fd, AsyncIo::IOImplementation* io);
123 void add_write_fd( int fd, AsyncIo::IOImplementation* io);
125 pollfd* get_pollfd_ptr();
126 unsigned int get_num_pollfds() const;
128 }; // eo struct PollDataCluster
131 typedef PtrList< AsyncIo::IOImplementation, true > IOList;
132 typedef PtrList< AsyncIo::TimerBase, true > TimerList;
134 template<> int IOList::GlobalCountType::InstanceCount= 0;
135 template<> int TimerList::GlobalCountType::InstanceCount= 0;
139 * the (internal) global list of io objects (object pointers)
143 static IOList _the_io_list;
149 * the (internal) global list of timer objects (object pointers)
151 TimerList& g_timer_list()
153 static TimerList _the_timer_list;
154 return _the_timer_list;
158 * implementation of PollDataCluster
163 * add a new file descriptor to the read list.
165 * @param fd the file descriptor.
166 * @param io the io object which uses the fd for reading.
168 void PollDataCluster::add_read_fd( int fd, AsyncIo::IOImplementation* io)
170 FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
171 if (itPollMap != m_fd_poll_map.end())
173 m_poll_vector[ itPollMap->second ].setPOLLIN();
179 m_fd_poll_map[fd] = m_poll_vector.size();
180 m_poll_vector.push_back( item );
182 m_read_fd_io_map[fd]= io;
183 } // eo PollDataCluster::add_read_fd
187 * add a new file descriptor to the write list.
189 * @param fd the file descriptor.
190 * @param io the io object which uses the fd for writing.
192 void PollDataCluster::add_write_fd( int fd, AsyncIo::IOImplementation* io)
194 FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
195 if (itPollMap != m_fd_poll_map.end())
197 m_poll_vector[ itPollMap->second ].setPOLLOUT();
203 m_fd_poll_map[fd] = m_poll_vector.size();
204 m_poll_vector.push_back( item );
206 m_write_fd_io_map[fd]= io;
207 } // eo PollDataCluster::add_write_fd
211 * returns a pointer to a pollfd array; suitable for passing to poll()
213 * @return pointer to pollfd array
215 pollfd* PollDataCluster::get_pollfd_ptr()
217 return m_poll_vector.empty() ? NULL : &m_poll_vector.front();
218 } // eo get_pollfd_ptr
222 * returns the number of entries in the pollfd array; suitable for passing to poll()
224 * @return the number of entries in the pollfd array
226 unsigned int PollDataCluster::get_num_pollfds() const
228 return m_poll_vector.size();
229 } // eo get_num_pollfds
233 } // eo namespace internal_io
237 * some internal tool functions and structures
241 AsyncIo::FilterBasePtr m_filter;
243 FilterMatch(AsyncIo::FilterBasePtr filter)
247 bool operator () (const AsyncIo::FilterBasePtr& item)
249 return item && item == m_filter;
252 }; // eo struct FilterMatch
256 } // eo anonymous namespace
266 * implementation of TimerBase
270 * constructor. Adds the object to the internal timer list.
272 TimerBase::TimerBase()
276 internal_io::g_timer_list().add_item(this);
277 } // eo TimerBase::TimerBase
281 * destructor. Removes the object from the internal timer list.
283 TimerBase::~TimerBase()
286 if (internal_io::TimerList::Instances())
288 ODOUT("remove from list");
289 internal_io::g_timer_list().remove_item(this);
291 } // eo TimerBase::~TimerBase
295 * @brief returns the point in time when the time is executed in real time.
296 * @return the point in time when the timer is to be executed.
298 MilliTime TimerBase::getRealWhenTime() const
302 get_current_monotonic_time(mono_time);
303 get_current_real_time(real_time);
304 MilliTime result= m_when - mono_time + real_time;
306 } // eo TimerBase::getRealWhenTime() const
310 * sets the time when the event should be executed.
311 * @param sec the seconds part of the point in time.
312 * @param msec the milliseconds part of the point in time.
314 void TimerBase::setWhenTime(long sec, long msec)
316 m_when.set(sec,msec);
318 } // eo TimerBase::setWhenTime
322 * sets the time when the event should be executed.
323 * @param mt the point in time.
325 void TimerBase::setWhenTime(const MilliTime& mt)
329 } // eo TimerBase::setWhenTime
333 * sets the time delta measured from current time when the event should be executed.
334 * @param sec the seconds of the time delta
335 * @param msec the milli seconds of the time delta
337 void TimerBase::setDeltaWhenTime(long sec, long msec)
339 setDeltaWhenTime( MilliTime(sec,msec) );
340 } // eo TimerBase::setWhenTime
345 * sets the time delta measured from current time when the event should be executed.
346 * @param mt the time delta
348 void TimerBase::setDeltaWhenTime(const MilliTime& mt)
350 get_current_monotonic_time(m_when);
353 } // eo TimerBase::setWhenTime
357 * set the active state of the timer event.
358 * @param active determines if the object should be active (default: yes).
360 void TimerBase::activate(bool active)
365 // clear the mark if we are not active.
368 } // eo TimerBase::activate
371 /** @fn void TimerBase::deactivate()
372 * deactivates the event by clearing the active state.
377 * called when the timer event occured.
379 void TimerBase::execute()
381 } // eo TimerBase::execute
385 * implementation of FilterBase class
389 FilterBase::FilterBase()
392 } // eo FilterBase::FilterBase()
396 * injects incoming data.
397 * @param data the new data
399 void FilterBase::injectIncomingData(const std::string& data)
403 FilterBasePtr ptr= get_ptr_as< FilterBase >();
406 m_io->injectIncomingData(ptr,data);
409 } // FilterBase::injectIncomingData(const std::string&)
413 * injects outgoing data.
414 * @param data the new data
416 void FilterBase::injectOutgoingData(const std::string& data)
420 FilterBasePtr ptr= get_ptr_as< FilterBase >();
423 m_io->injectOutgoingData(ptr,data);
426 } // eo FilterBase::injectOutgoingData(const std::string&)
431 * called when EOF detected on incoming channel (or incoming channel closed)
433 void FilterBase::endOfIncomingData()
435 } // eo FilterBase::endOfIncomingData()
438 * called when the filter should reset.
439 * This is used when a new channel is opened or when the filter is taken out of a filter chain.
441 void FilterBase::reset()
443 } // eo FilterBase::reset()
447 * implementation of IOImplementation class
452 * constructor for the base io class.
454 * Also adds the object to internal list of io objects (which is used by the backend).
456 * @param read_fd the file descriptor which should be used for reading (default -1 for no value)
457 * @param write_fd the file descriptor which should be used for writing (default -1 for no value)
459 IOImplementation::IOImplementation(int read_fd, int write_fd)
463 , m_not_writable(false)
466 , m_marked_for_reading(false)
467 , m_marked_for_writing(false)
469 internal_io::g_io_list().add_item(this);
472 setReadFd( read_fd );
476 setWriteFd( write_fd );
478 } // eo IOImplementation::IOImplementation
482 * destructor of the base io class.
484 * Removes the object from the interal list of io objects.
486 IOImplementation::~IOImplementation()
489 if (internal_io::IOList::Instances())
491 internal_io::g_io_list().remove_item(this);
493 // now clear the filters:
494 while (! m_filter_chain.empty() )
496 FilterChain::iterator it = m_filter_chain.begin();
499 //TODO: signal the filter that it is removed ?!
500 m_filter_chain.erase(it);
502 } // eo IOImplementation::~IOImplementation
506 * adds another filter to the filter chain.
507 * @param filter pointer to the new filter.
509 void IOImplementation::addFilter
516 return; // nothing to do
520 filter->m_io->removeFilter(filter);
522 m_filter_chain.push_back( filter );
523 } // eo IOImplementation::addFilter
527 * removes a filter from the filter chain.
528 * @param filter the pointer to the filter which is removed.
529 * @note if the filter is removed the class gives away the ownership; i.,e. the caller is responsible for
530 * deleting the filter if it was dynamically allocated.
532 void IOImplementation::removeFilter
537 FilterChain::iterator it =
538 std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(filter) );
539 if (it != m_filter_chain.end())
543 //TODO: signal the filter that it is removed ?!
544 m_filter_chain.erase(it);
546 } // eo IOImplementation::removeFilter
550 * closes the file descriptors (/ the connection).
552 * @param direction the direction which should be closed (default: @a Direction::both for all).
554 void IOImplementation::close(Direction direction)
556 bool had_read_fd= (m_read_fd >= 0);
558 if (direction == Direction::unspecified) direction= Direction::both;
559 if (direction != Direction::both && m_read_fd==m_write_fd && m_read_fd>=0)
560 { // special case: half closed (socket) connections...
561 // NOTE: for file descriptors m_errno will set to ENOTSOCK, but since we "forget" the desired part
562 // (read_fd or write_fd) this class works as desired.
567 int res= ::shutdown(m_read_fd, SHUT_RD);
575 for(FilterChain::iterator it= m_filter_chain.begin();
576 it != m_filter_chain.end();
579 (*it)->endOfIncomingData();
587 int res= ::shutdown(m_write_fd, SHUT_WR);
593 m_output_buffer.clear();
598 if (m_write_fd >= 0 && (direction & Direction::out) )
600 int res1 = ::close(m_write_fd);
601 if (m_write_fd == m_read_fd)
606 m_output_buffer.clear();
607 if (res1<0) m_errno= errno;
609 if (m_read_fd >=0 && (direction & Direction::in) )
611 int res1 = ::close(m_read_fd);
613 if (res1<0) m_errno= errno;
615 if (had_read_fd && !m_eof && (m_read_fd<0))
617 for(FilterChain::iterator it= m_filter_chain.begin();
618 it != m_filter_chain.end();
621 (*it)->endOfIncomingData();
624 } // eo IOImplementation::close
628 * determines if the io class wants to read data.
629 * Default implementation checks only for a valid file descriptor value.
631 * @return @a true if the objects wants to read data
633 bool IOImplementation::wantRead()
635 return (m_read_fd >= 0) && ! m_eof;
636 } // eo IOImplementation::wantRead
640 * determines if the io class wants to write data.
641 * Default implementation checks for a valid file descriptor value and if the object
642 * cannot write data immediately.
644 * @return @a true if the objects wants to write data
646 bool IOImplementation::wantWrite()
648 return (m_write_fd >= 0) && ! m_marked_for_writing && ! m_not_writable;
649 } // eo IOImplementation::wantWrite
653 * delivers if opened.
654 * The default returns @a true if at least one file descriptor (read or write) is valid.
655 * @return @a true if opened.
657 bool IOImplementation::opened() const
659 return (m_read_fd>=0) || (m_write_fd>=0);
660 } // eo IOImplementation::opened() const
664 * returns if the read side detected an end of file (EOF).
665 * @return @a true if end of file was detected on read file descriptor (or read file descriptor isn't valid).
667 bool IOImplementation::eof() const
669 return (m_read_fd < 0) || m_eof;
670 } // eo IOImplementatio::eof() const
674 * @brief returns of the write side didn't detect that it cannot write.
675 * @return @a true if we can write.
677 bool IOImplementation::writable() const
679 return (m_write_fd >=0 ) and not m_not_writable;
680 } // eo IOImplementation::writable() const
684 * returns if the output buffer is empty.
687 bool IOImplementation::empty() const
689 return m_output_buffer.empty();
690 } // eo IOImplementation::empty
693 * puts data into the output buffer and sends it immediately if possible,
695 * The data is passed through the filter chain before it's stored in the output buffer
696 * (i.e. the output buffer contains data as it should be send directly to the descriptor).
697 * @param _data the data which should be send.
699 void IOImplementation::lowSend(const std::string& _data)
701 std::string data(_data);
703 for(FilterChain::reverse_iterator it_filter= m_filter_chain.rbegin();
704 it_filter!= m_filter_chain.rend();
707 data= (*it_filter)->filterOutgoingData(data);
709 m_output_buffer+= data;
711 // if we can send immediately, do it:
712 if (! m_output_buffer.empty() && m_marked_for_writing)
716 } // eo IOImplementation::lowSend
720 * called by the backend when there is data to read for this object.
722 * Reads the data from the connection (read file descriptor) and passes the data through the filter chain.
723 * The final data is appended to the input buffer and the signal @a m_signal_read() is called.
725 * If EOF is detected (i,e, no data was received) then the signal @a m_signal_eof() is called.
727 * @note overload this method only when You know what You are doing!
728 * (overloading is necessary when handling server sockets.)
730 void IOImplementation::doRead()
732 // static read buffer; should be ok as long as we don't use threads
733 static char buffer[8*1024]; // 8 KiB
736 if (m_read_fd<0 || !m_marked_for_reading)
738 ODOUT("exit0; read_fd="<<m_read_fd << " mark=" << m_marked_for_reading);
743 m_marked_for_reading = false;
745 // now read the data:
747 count = ::read(m_read_fd, buffer, sizeof(buffer));
749 ODOUT("::read -> " << count);
751 // interpret what we got:
752 if (count < 0) // error
765 close( (m_read_fd == m_write_fd) ? Direction::both : Direction::in );
770 else if (count==0) // EOF
772 // remember the read fd:
778 // if the fd is still the same: close it.
781 close( Direction::in );
784 else // we have valid data
786 std::string data(buffer,count);
787 ODOUT(" got \"" << data << "\"");
788 for(FilterChain::iterator it_filter= m_filter_chain.begin();
789 it_filter != m_filter_chain.end();
792 data= (*it_filter)->filterIncomingData(data);
794 m_input_buffer+= data;
797 } // eo IOImplementation::doRead
801 * interface for filter classes to inject data into the filter chain (emulating new incoming data).
802 * @param from_filter the filter which injects the new data.
803 * @param _data the new data.
805 void IOImplementation::injectIncomingData(FilterBasePtr from_filter, const std::string& _data)
807 FilterChain::iterator it_filter =
808 std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(from_filter) );
809 if (it_filter == m_filter_chain.end())
811 // dont accept data inject from a unknown filter
814 // well: pass the data through the remaining filters:
815 // NOTE: processing is (nearly) the same as in IOImplementation::doRead()
816 std::string data(_data);
818 it_filter != m_filter_chain.end();
821 data= (*it_filter)->filterIncomingData(data);
823 m_input_buffer+= data;
825 } // eo IOImplementation::injectIncomingData(FilterBase*,const std::string&)
829 * interface for filter classes to inject data into the filter chain (emulating new outgoing data).
830 * @param from_filter the filter which injects the new data.
831 * @param _data the new data.
833 void IOImplementation::injectOutgoingData(FilterBasePtr from_filter, const std::string& _data)
835 FilterChain::reverse_iterator it_filter =
836 std::find_if( m_filter_chain.rbegin(), m_filter_chain.rend(), FilterMatch(from_filter) );
837 if (it_filter == m_filter_chain.rend())
839 // dont accept data inject from a unknown filter
842 // well: pass the data through the remaining filters:
843 // NOTE: processing is (nearly) the same as in IOImplementation::lowSend()
844 std::string data(_data);
846 it_filter!= m_filter_chain.rend();
849 data= (*it_filter)->filterOutgoingData(data);
851 m_output_buffer+= data;
853 // if we can send immediately, do it:
854 if (! m_output_buffer.empty() && m_marked_for_writing)
858 } // eo IOImplementation::injectOutgoingData(FilterBase*,const std::string&)
862 * set the read file descriptor.
863 * Although a derived class can also set the read fd directly; this method should be used
864 * for this task since it updates some flags on the fd for async operation.
865 * @param fd the new read file descriptor.
867 void IOImplementation::setReadFd(int fd)
869 // test if we already have a valid descriptor (and may have to close it):
874 // fd was already right; consider it to be ok.
877 close(Direction::in);
881 // if the new descriptor looks valid, set some flags:
884 long flags= ::fcntl(fd, F_GETFL);
887 // set the flags for non blocking, async operation
888 flags |= O_NONBLOCK|O_ASYNC;
889 ::fcntl(fd,F_SETFL, flags);
891 else if ( errno == EBADF )
893 // well, we seemed to be fed with an invalid descriptor...:
898 if (fd >= 0) // if still valid:
900 // set the close-on-exec flag
901 ::fcntl(fd,F_SETFD, FD_CLOEXEC);
904 m_marked_for_reading= false;
906 } // eo IOImplementation::setReadFd(int)
911 * set the write file descriptor.
912 * Although a derived class can also set the write fd directly; this method should be used
913 * for this task since it updates some flags on the fd for async operation.
914 * @param fd the new write file descriptor.
916 void IOImplementation::setWriteFd(int fd)
920 if (m_write_fd == fd)
922 // fd was already right; consider it to be ok.
925 close(Direction::out);
929 // if the new descriptor looks valid, set some flags:
932 long flags= ::fcntl(fd, F_GETFL);
935 // set the flags for non blocking, async operation
936 flags |= O_NONBLOCK|O_ASYNC;
937 ::fcntl(fd,F_SETFL, flags);
939 else if (errno == EBADF)
941 // well, we seemed to be fed with an invalid descriptor...:
946 if (fd >= 0) // if still valid:
948 // set the close-on-exec flag
949 ::fcntl(fd,F_SETFD, FD_CLOEXEC);
952 m_marked_for_writing= false;
953 m_not_writable= false;
954 } // eo IOImplementation::setWriteFd(int)
959 * called by the backend when this object can write data.
961 * If some data was sended, the signal @a m_signal_write is called.
963 * @internal tries to write all buffered data to output; if this succeeds,
964 * the connection is assumed to be still able to accept more data.
965 * (i.e. the internal write mark is kept!)
967 * @note overload this method only when You know what You are doing!
969 void IOImplementation::doWrite()
972 if ( m_write_fd<0 || !m_marked_for_writing || m_output_buffer.empty())
977 ODOUT("doWrite, d=\"" << m_output_buffer << "\"");
980 m_marked_for_writing= false;
982 // now write the data
983 ssize_t count= ::write( m_write_fd, m_output_buffer.data(), m_output_buffer.size());
985 ODOUT("::write -> " << count);
987 if (count < 0) // error
995 m_not_writable= true;
997 m_signal_not_writable();
1003 if (fd == m_write_fd)
1005 close( (m_write_fd == m_read_fd) ? Direction::both : Direction::out );
1012 m_output_buffer.erase(0, count);
1013 if (m_output_buffer.empty())
1015 // special case: if we were able to send all the data, we keep the write mark:
1016 m_marked_for_writing= true;
1023 } // eo IOImplementation::doWrite
1027 * implementation of SimpleIO
1031 SimpleIO::SimpleIO(int read_fd, int write_fd)
1032 : inherited(read_fd, write_fd)
1034 m_signal_read.connect(boost::bind(&SimpleIO::slotReceived,this));
1035 } // eo SimpleIO::SimpleIO()
1038 SimpleIO::~SimpleIO()
1040 } // eo SimpleIO::~SimpleIO()
1045 * @param data the string.
1047 void SimpleIO::sendString(const std::string& data)
1050 } // eo SimpleIO::sendString(const std::string&)
1054 * emits the signal signalReceived with the received data.
1055 * This slot is connected to IOImplementation::m_signal_read.
1057 void SimpleIO::slotReceived()
1060 data.swap(m_input_buffer);
1061 signal_received_string(data);
1062 } // eo SimpleIO::slotReceived()
1067 * implementation of SimpleIO2
1071 SimpleIO2::SimpleIO2(int read_fd, int write_fd)
1072 : inherited(read_fd, write_fd)
1074 m_signal_read.connect(boost::bind(&SimpleIO2::slotReceived,this));
1075 } // eo SimpleIO2::SimpleIO2()
1078 SimpleIO2::~SimpleIO2()
1080 } // eo SimpleIO2::~SimpleIO2()
1085 * @param data the string.
1087 void SimpleIO2::sendString(const std::string& data)
1090 } // eo SimpleIO2::sendString(const std::string&)
1094 * emits the signal signalReceived with the received data.
1095 * This slot is connected to IOImplementation::m_signal_read.
1097 void SimpleIO2::slotReceived()
1100 data.swap(m_input_buffer);
1101 signal_received_string(data);
1102 } // eo SimpleIO2::slotReceived()
1107 * implementation of class Backend (singleton)
1110 Backend* Backend::g_backend= NULL;
1112 int Backend::m_count_active_steps=0;
1116 : m_count_active_loops(0)
1117 , m_count_stop_requests(0)
1119 #ifdef HAVE_LIBI2NCOMMON
1120 SystemTools::ignore_signal( SystemTools::Signal::PIPE );
1122 signal( SIGPIPE, SIG_IGN );
1124 } // eo Backend::Backend
1129 #ifdef HAVE_LIBI2NCOMMON
1130 SystemTools::restore_signal_handler( SystemTools::Signal::PIPE );
1132 signal( SIGPIPE, SIG_DFL );
1134 } // eo Backend::~Backend()
1137 * delivers pointer to the current backend, instantiating a new backend if there was no current one.
1139 * This should be the only way to access the backend which should be a singleton.
1141 * @return the pointer to the current backend.
1143 Backend* Backend::getBackend()
1147 g_backend = new Backend();
1150 } // eo Backend::getBackend
1156 * performs one backend cycle.
1158 * Collects all file descriptors from the active io objects which should be selected for reading and/or writing.
1159 * Also determines the timer events which become due and adjusts the timeout.
1160 * Constructs the necessary structures and calls poll().
1161 * Finally interprets the results from poll() (i.e. performs the reading/writing/timer events)
1163 * @param timeout maximal wait value in milliseconds; negative value waits until at least one event occured.
1164 * @return @a true if there was at least one active object; otherwise @a false
1166 * @note this method is a little beast.
1169 * The cycle is divided into four steps: collecting; poll; mark and execute.
1170 * The "mark" step is necessary to avoid some bad side effects when method calls in the execution stage
1171 * are calling @a Backup::doOneStep or open their own local backend loop.
1173 * @todo handle some more error cases.
1174 * @todo provide a plugin interface for external handler.
1175 * (currently inclusion of external handler is possible by (ab)using timer classes)
1177 bool Backend::doOneStep(int timeout)
1179 ODOUT( "timeout=" << timeout );
1180 internal_io::PollDataCluster poll_data;
1181 bool had_active_object = false;
1183 ++m_count_active_steps;
1187 FdSetType local_read_fds;
1188 FdSetType local_write_fds;
1192 { // step 1.1: collect fds for read/write operations
1193 for(internal_io::IOList::iterator itIOList = internal_io::g_io_list().begin();
1194 itIOList != internal_io::g_io_list().end();
1197 if (! *itIOList) continue; // skip NULL entries
1198 int read_fd = (*itIOList)->m_read_fd;
1199 int write_fd = (*itIOList)->m_write_fd;
1200 bool want_read = (read_fd >= 0) and (*itIOList)->wantRead();
1201 bool want_write = (write_fd >= 0) and (*itIOList)->wantWrite();
1204 local_read_fds.insert( read_fd );
1208 local_write_fds.insert( write_fd );
1210 if (!want_read && !want_write) continue;
1213 FODOUT( (*itIOList), "wants to read (fd=" << read_fd << ")");
1214 poll_data.add_read_fd(read_fd, *itIOList);
1218 FODOUT( (*itIOList), "wants to write (fd=" << write_fd << ")");
1219 poll_data.add_write_fd(write_fd, *itIOList);
1221 had_active_object= true;
1225 { // step 1.2: collect timer events
1226 MilliTime current_time;
1227 MilliTime min_event_time;
1229 get_current_monotonic_time(current_time);
1230 bool min_event_time_set;
1234 min_event_time = current_time + MilliTime(0,timeout);
1235 min_event_time_set= true;
1239 min_event_time = current_time + MilliTime(86400,0);
1240 min_event_time_set= false;
1244 for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1245 it_timer != internal_io::g_timer_list().end()
1246 && (!had_active_object || !min_event_time_set || current_time < min_event_time);
1249 if (! *it_timer) continue; // skip NULL entries
1250 if (! (*it_timer)->m_active) continue; // skip if not enabled
1251 if ( !min_event_time_set || (*it_timer)->m_when < min_event_time)
1253 min_event_time = (*it_timer)->m_when;
1254 min_event_time_set= true;
1256 had_active_object= true;
1259 if (min_event_time_set)
1260 { // we have at a minimal event time, so (re)compute the timeout value:
1261 MilliTime delta= (min_event_time - current_time);
1262 long long delta_ms = std::min( delta.get_milliseconds(), 21600000LL); // max 6h
1269 timeout= delta_ms + (delta_ms<5 ? 1 : 3);
1275 ODOUT(" poll timeout is " << timeout);
1277 MilliTime current_time;
1278 get_current_monotonic_time(current_time);
1279 ODOUT(" current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1281 int poll_result= ::poll(poll_data.get_pollfd_ptr(), poll_data.get_num_pollfds(), timeout);
1283 ODOUT("poll -> " << poll_result);
1285 MilliTime current_time;
1286 get_current_monotonic_time(current_time);
1287 ODOUT(" current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1290 if (poll_result < 0)
1292 //TODO poll error handling (signals ?!)
1297 // step 3.1: mark io objects (if necessary)
1298 if (poll_result > 0)
1300 for(internal_io::PollVector::iterator itPollItem = poll_data.m_poll_vector.begin();
1301 itPollItem != poll_data.m_poll_vector.end();
1304 ODOUT(" fd=" << itPollItem->fd << ", events=" << itPollItem->events << ", revents=" << itPollItem->revents);
1305 if ( 0 == (itPollItem->revents))
1306 { // preliminary continuation if nothing is to handle for this item(/fd)...
1309 if ( 0!= (itPollItem->revents & (POLLIN|POLLHUP)))
1311 IOImplementation *io= poll_data.m_read_fd_io_map[ itPollItem->fd ];
1312 if (io && io->m_read_fd==itPollItem->fd)
1314 FODOUT(io,"marked for reading");
1315 io->m_marked_for_reading= true;
1318 if ( 0!= (itPollItem->revents & POLLOUT))
1320 IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
1321 if (io && io->m_write_fd==itPollItem->fd)
1323 io->m_marked_for_writing= true;
1326 if ( 0!= (itPollItem->revents & POLLERR))
1328 IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
1329 if (0!= (itPollItem->events & POLLOUT))
1331 if (io && io->m_write_fd==itPollItem->fd)
1333 io->m_marked_for_writing= false;
1334 //io->close( Direction::out );
1338 // TODO error handling (POLLERR, POLLHUP, POLLNVAL)
1342 //Step 3.2: mark timer objects
1344 MilliTime current_time;
1346 get_current_monotonic_time(current_time);
1347 ODOUT(" current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1349 for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1350 it_timer != internal_io::g_timer_list().end();
1353 ODOUT(" check timer " << *it_timer);
1354 if (! *it_timer) continue; // skip NULL entries
1355 if (! (*it_timer)->m_active) continue; // skip if not enabled
1356 if ( (*it_timer)->m_when <= current_time)
1359 (*it_timer)->m_marked = true;
1367 // step 4.1: execute io
1368 ODOUT("execute stage");
1369 for(internal_io::IOList::iterator it_io = internal_io::g_io_list().begin();
1370 it_io != internal_io::g_io_list().end();
1373 ODOUT(" check obj " << *it_io);
1374 if (NULL == *it_io) continue;
1375 if ((*it_io)->m_marked_for_writing)
1377 FODOUT((*it_io),"exec doWrite");
1378 (*it_io)->doWrite();
1379 if ((*it_io) == NULL) continue; // skip remaining if we lost the object
1380 if ((*it_io)->m_errno)
1385 if ((*it_io)->m_marked_for_reading)
1387 FODOUT((*it_io),"exec doRead");
1389 if ((*it_io) == NULL) continue; // skip remaining if we lost the object
1390 if ((*it_io)->m_errno)
1397 // step 4.2: execute timer events
1399 for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1400 it_timer != internal_io::g_timer_list().end();
1403 if (! *it_timer) continue; // skip NULL entries
1404 if (! (*it_timer)->m_active) continue; // skip if not enabled
1405 if (! (*it_timer)->m_marked) continue; // skip if not marked
1407 // reset the mark and deactivate object now since the execute() method might activate it again
1408 (*it_timer)->m_marked= false;
1409 (*it_timer)->m_active= false;
1411 // now execute the event:
1412 (*it_timer)->execute();
1419 // clean up our counter
1420 --m_count_active_steps;
1421 // and forward the exception
1425 if ( 0 == --m_count_active_steps)
1427 internal_io::g_io_list().clean_list();
1428 internal_io::g_timer_list().clean_list();
1431 return had_active_object;
1432 } // eo Backend::doOneStep
1436 * enters a backend loop.
1438 * Calls @a Backend::doOneStep within a loop until @a Backend::stop was called or there are no more
1439 * active objects (io objects or timer objects).
1443 ++m_count_active_loops;
1448 if (!doOneStep(c_max_poll_wait))
1450 // stop if there are no more active objects.
1456 // clean up our counter
1457 --m_count_active_loops;
1458 // and forward the exception
1462 while (0 == m_count_stop_requests);
1463 --m_count_active_loops;
1464 --m_count_stop_requests;
1465 } // eo Backend::run
1469 * @brief stops the latest loop currently run by Backend::run().
1470 * @see Backend::run()
1472 void Backend::stop()
1474 if (m_count_active_loops)
1476 ++m_count_stop_requests;
1478 } // eo Backend::stop()
1482 } // eo namespace AsyncIo