4 * @copyright Copyright © 2007-2008 by Intra2net AG
6 * @contact info@intra2net.com
11 #include "async_io.hpp"
21 #include <sys/socket.h>
26 #include <boost/bind.hpp>
28 #include <signalfunc.hpp>
29 #include <timefunc.hxx>
36 #define DOUT(msg) std::cout << msg << std::endl
37 #define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl
38 //#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl
39 #define ODOUT(msg) std::cout << __PRETTY_FUNCTION__ << "[" << this << "]:" << msg << std::endl
41 #define DOUT(msg) do {} while (0)
42 #define FODOUT(obj,msg) do {} while (0)
43 #define ODOUT(msg) do {} while (0)
50 using namespace AsyncIo::Utils;
57 const int c_max_poll_wait= 10*60*1000; // maximal poll wait (while in backend loop): 10 min
61 * contains internal helper structs and functions for io handling.
67 * extends struct pollfd with some convenience functions
69 struct PollFd : public ::pollfd
79 * initializes the struct with a given file descriptor and clears the event mask(s).
90 * set that we want to be notified about new incoming data
92 void setPOLLIN() { events |= POLLIN; }
95 * set that we want to be notified if we can send (more) data.
97 void setPOLLOUT() { events |= POLLOUT; }
99 }; // eo struct PollFd
102 typedef std::vector<PollFd> PollVector;
103 typedef std::map<int,PollVector::size_type> FdPollMap;
104 typedef std::map<int,AsyncIo::IOImplementation*> FdIOMap;
108 * struct for interfacing our local structures with poll()
110 struct PollDataCluster
112 PollVector m_poll_vector;
113 FdPollMap m_fd_poll_map;
114 FdIOMap m_read_fd_io_map;
115 FdIOMap m_write_fd_io_map;
117 void add_read_fd( int fd, AsyncIo::IOImplementation* io);
118 void add_write_fd( int fd, AsyncIo::IOImplementation* io);
120 pollfd* get_pollfd_ptr();
121 unsigned int get_num_pollfds() const;
123 }; // eo struct PollDataCluster
126 typedef PtrList< AsyncIo::IOImplementation, true > IOList;
127 typedef PtrList< AsyncIo::TimerBase, true > TimerList;
129 template<> int IOList::GlobalCountType::InstanceCount= 0;
130 template<> int TimerList::GlobalCountType::InstanceCount= 0;
134 * the (internal) global list of io objects (object pointers)
138 static IOList _the_io_list;
144 * the (internal) global list of timer objects (object pointers)
146 TimerList& g_timer_list()
148 static TimerList _the_timer_list;
149 return _the_timer_list;
153 * implementation of PollDataCluster
158 * add a new file descriptor to the read list.
160 * @param fd the file descriptor.
161 * @param io the io object which uses the fd for reading.
163 void PollDataCluster::add_read_fd( int fd, AsyncIo::IOImplementation* io)
165 FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
166 if (itPollMap != m_fd_poll_map.end())
168 m_poll_vector[ itPollMap->second ].setPOLLIN();
174 m_fd_poll_map[fd] = m_poll_vector.size();
175 m_poll_vector.push_back( item );
177 m_read_fd_io_map[fd]= io;
178 } // eo PollDataCluster::add_read_fd
182 * add a new file descriptor to the write list.
184 * @param fd the file descriptor.
185 * @param io the io object which uses the fd for writing.
187 void PollDataCluster::add_write_fd( int fd, AsyncIo::IOImplementation* io)
189 FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
190 if (itPollMap != m_fd_poll_map.end())
192 m_poll_vector[ itPollMap->second ].setPOLLOUT();
198 m_fd_poll_map[fd] = m_poll_vector.size();
199 m_poll_vector.push_back( item );
201 m_write_fd_io_map[fd]= io;
202 } // eo PollDataCluster::add_write_fd
206 * returns a pointer to a pollfd array; suitable for passing to poll()
208 * @return pointer to pollfd array
210 pollfd* PollDataCluster::get_pollfd_ptr()
212 return m_poll_vector.empty() ? NULL : &m_poll_vector.front();
213 } // eo get_pollfd_ptr
217 * returns the number of entries in the pollfd array; suitable for passing to poll()
219 * @return the number of entries in the pollfd array
221 unsigned int PollDataCluster::get_num_pollfds() const
223 return m_poll_vector.size();
224 } // eo get_num_pollfds
228 } // eo namespace internal_io
232 * some internal tool functions and structures
236 AsyncIo::FilterBasePtr m_filter;
238 FilterMatch(AsyncIo::FilterBasePtr filter)
242 bool operator () (const AsyncIo::FilterBasePtr& item)
244 return item && item == m_filter;
247 }; // eo struct FilterMatch
251 } // eo anonymous namespace
261 * implementation of TimerBase
265 * constructor. Adds the object to the internal timer list.
267 TimerBase::TimerBase()
271 internal_io::g_timer_list().add_item(this);
272 } // eo TimerBase::TimerBase
276 * destructor. Removes the object from the internal timer list.
278 TimerBase::~TimerBase()
281 if (internal_io::TimerList::Instances())
283 ODOUT("remove from list");
284 internal_io::g_timer_list().remove_item(this);
286 } // eo TimerBase::~TimerBase
290 * @brief returns the point in time when the time is executed in real time.
291 * @return the point in time when the timer is to be executed.
293 MilliTime TimerBase::getRealWhenTime() const
297 get_current_monotonic_time(mono_time);
298 get_current_real_time(real_time);
299 MilliTime result= m_when - mono_time + real_time;
301 } // eo TimerBase::getRealWhenTime() const
305 * sets the time when the event should be executed.
306 * @param sec the seconds part of the point in time.
307 * @param msec the milliseconds part of the point in time.
309 void TimerBase::setWhenTime(long sec, long msec)
311 m_when.set(sec,msec);
313 } // eo TimerBase::setWhenTime
317 * sets the time when the event should be executed.
318 * @param mt the point in time.
320 void TimerBase::setWhenTime(const MilliTime& mt)
324 } // eo TimerBase::setWhenTime
328 * sets the time delta measured from current time when the event should be executed.
329 * @param sec the seconds of the time delta
330 * @param msec the milli seconds of the time delta
332 void TimerBase::setDeltaWhenTime(long sec, long msec)
334 setDeltaWhenTime( MilliTime(sec,msec) );
335 } // eo TimerBase::setWhenTime
340 * sets the time delta measured from current time when the event should be executed.
341 * @param mt the time delta
343 void TimerBase::setDeltaWhenTime(const MilliTime& mt)
345 get_current_monotonic_time(m_when);
348 } // eo TimerBase::setWhenTime
352 * set the active state of the timer event.
353 * @param active determines if the object should be active (default: yes).
355 void TimerBase::activate(bool active)
360 // clear the mark if we are not active.
363 } // eo TimerBase::activate
366 /** @fn void TimerBase::deactivate()
367 * deactivates the event by clearing the active state.
372 * called when the timer event occured.
374 void TimerBase::execute()
376 } // eo TimerBase::execute
380 * implementation of FilterBase class
384 FilterBase::FilterBase()
387 } // eo FilterBase::FilterBase()
391 * injects incoming data.
392 * @param data the new data
394 void FilterBase::injectIncomingData(const std::string& data)
398 FilterBasePtr ptr= get_ptr_as< FilterBase >();
401 m_io->injectIncomingData(ptr,data);
404 } // FilterBase::injectIncomingData(const std::string&)
408 * injects outgoing data.
409 * @param data the new data
411 void FilterBase::injectOutgoingData(const std::string& data)
415 FilterBasePtr ptr= get_ptr_as< FilterBase >();
418 m_io->injectOutgoingData(ptr,data);
421 } // eo FilterBase::injectOutgoingData(const std::string&)
426 * called when EOF detected on incoming channel (or incoming channel closed)
428 void FilterBase::endOfIncomingData()
430 } // eo FilterBase::endOfIncomingData()
433 * called when the filter should reset.
434 * This is used when a new channel is opened or when the filter is taken out of a filter chain.
436 void FilterBase::reset()
438 } // eo FilterBase::reset()
442 * implementation of IOImplementation class
447 * constructor for the base io class.
449 * Also adds the object to internal list of io objects (which is used by the backend).
451 * @param read_fd the file descriptor which should be used for reading (default -1 for no value)
452 * @param write_fd the file descriptor which should be used for writing (default -1 for no value)
454 IOImplementation::IOImplementation(int read_fd, int write_fd)
458 , m_not_writable(false)
461 , m_marked_for_reading(false)
462 , m_marked_for_writing(false)
464 internal_io::g_io_list().add_item(this);
467 setReadFd( read_fd );
471 setWriteFd( write_fd );
473 } // eo IOImplementation::IOImplementation
477 * destructor of the base io class.
479 * Removes the object from the interal list of io objects.
481 IOImplementation::~IOImplementation()
484 if (internal_io::IOList::Instances())
486 internal_io::g_io_list().remove_item(this);
488 // now clear the filters:
489 while (! m_filter_chain.empty() )
491 FilterChain::iterator it = m_filter_chain.begin();
494 //TODO: signal the filter that it is removed ?!
495 m_filter_chain.erase(it);
497 } // eo IOImplementation::~IOImplementation
501 * adds another filter to the filter chain.
502 * @param filter pointer to the new filter.
504 void IOImplementation::addFilter
511 return; // nothing to do
515 filter->m_io->removeFilter(filter);
517 m_filter_chain.push_back( filter );
518 } // eo IOImplementation::addFilter
522 * removes a filter from the filter chain.
523 * @param filter the pointer to the filter which is removed.
524 * @note if the filter is removed the class gives away the ownership; i.,e. the caller is responsible for
525 * deleting the filter if it was dynamically allocated.
527 void IOImplementation::removeFilter
532 FilterChain::iterator it =
533 std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(filter) );
534 if (it != m_filter_chain.end())
538 //TODO: signal the filter that it is removed ?!
539 m_filter_chain.erase(it);
541 } // eo IOImplementation::removeFilter
545 * closes the file descriptors (/ the connection).
547 * @param direction the direction which should be closed (default: @a Direction::both for all).
549 void IOImplementation::close(Direction direction)
551 bool had_read_fd= (m_read_fd >= 0);
553 if (direction == Direction::unspecified) direction= Direction::both;
554 if (direction != Direction::both && m_read_fd==m_write_fd && m_read_fd>=0)
555 { // special case: half closed (socket) connections...
556 // NOTE: for file descriptors m_errno will set to ENOTSOCK, but since we "forget" the desired part
557 // (read_fd or write_fd) this class works as desired.
562 int res= ::shutdown(m_read_fd, SHUT_RD);
570 for(FilterChain::iterator it= m_filter_chain.begin();
571 it != m_filter_chain.end();
574 (*it)->endOfIncomingData();
582 int res= ::shutdown(m_write_fd, SHUT_WR);
588 m_output_buffer.clear();
593 if (m_write_fd >= 0 && (direction & Direction::out) )
595 int res1 = ::close(m_write_fd);
596 if (m_write_fd == m_read_fd)
601 m_output_buffer.clear();
602 if (res1<0) m_errno= errno;
604 if (m_read_fd >=0 && (direction & Direction::in) )
606 int res1 = ::close(m_read_fd);
608 if (res1<0) m_errno= errno;
610 if (had_read_fd && !m_eof && (m_read_fd<0))
612 for(FilterChain::iterator it= m_filter_chain.begin();
613 it != m_filter_chain.end();
616 (*it)->endOfIncomingData();
619 } // eo IOImplementation::close
623 * determines if the io class wants to read data.
624 * Default implementation checks only for a valid file descriptor value.
626 * @return @a true if the objects wants to read data
628 bool IOImplementation::wantRead()
630 return (m_read_fd >= 0) && ! m_eof;
631 } // eo IOImplementation::wantRead
635 * determines if the io class wants to write data.
636 * Default implementation checks for a valid file descriptor value and if the object
637 * cannot write data immediately.
639 * @return @a true if the objects wants to write data
641 bool IOImplementation::wantWrite()
643 return (m_write_fd >= 0) && ! m_marked_for_writing && ! m_not_writable;
644 } // eo IOImplementation::wantWrite
648 * delivers if opened.
649 * The default returns @a true if at least one file descriptor (read or write) is valid.
650 * @return @a true if opened.
652 bool IOImplementation::opened() const
654 return (m_read_fd>=0) || (m_write_fd>=0);
655 } // eo IOImplementation::opened() const
659 * returns if the read side detected an end of file (EOF).
660 * @return @a true if end of file was detected on read file descriptor (or read file descriptor isn't valid).
662 bool IOImplementation::eof() const
664 return (m_read_fd < 0) || m_eof;
665 } // eo IOImplementatio::eof() const
669 * @brief returns of the write side didn't detect that it cannot write.
670 * @return @a true if we can write.
672 bool IOImplementation::writable() const
674 return (m_write_fd >=0 ) and not m_not_writable;
675 } // eo IOImplementation::writable() const
679 * returns if the output buffer is empty.
682 bool IOImplementation::empty() const
684 return m_output_buffer.empty();
685 } // eo IOImplementation::empty
688 * puts data into the output buffer and sends it immediately if possible,
690 * The data is passed through the filter chain before it's stored in the output buffer
691 * (i.e. the output buffer contains data as it should be send directly to the descriptor).
692 * @param _data the data which should be send.
694 void IOImplementation::lowSend(const std::string& _data)
696 std::string data(_data);
698 for(FilterChain::reverse_iterator it_filter= m_filter_chain.rbegin();
699 it_filter!= m_filter_chain.rend();
702 data= (*it_filter)->filterOutgoingData(data);
704 m_output_buffer+= data;
706 // if we can send immediately, do it:
707 if (! m_output_buffer.empty() && m_marked_for_writing)
711 } // eo IOImplementation::lowSend
715 * called by the backend when there is data to read for this object.
717 * Reads the data from the connection (read file descriptor) and passes the data through the filter chain.
718 * The final data is appended to the input buffer and the signal @a m_signal_read() is called.
720 * If EOF is detected (i,e, no data was received) then the signal @a m_signal_eof() is called.
722 * @note overload this method only when You know what You are doing!
723 * (overloading is necessary when handling server sockets.)
725 void IOImplementation::doRead()
727 // static read buffer; should be ok as long as we don't use threads
728 static char buffer[8*1024]; // 8 KiB
731 if (m_read_fd<0 || !m_marked_for_reading)
733 ODOUT("exit0; read_fd="<<m_read_fd << " mark=" << m_marked_for_reading);
738 m_marked_for_reading = false;
740 // now read the data:
742 count = ::read(m_read_fd, buffer, sizeof(buffer));
744 ODOUT("::read -> " << count);
746 // interpret what we got:
747 if (count < 0) // error
760 close( (m_read_fd == m_write_fd) ? Direction::both : Direction::in );
765 else if (count==0) // EOF
767 // remember the read fd:
773 // if the fd is still the same: close it.
776 close( Direction::in );
779 else // we have valid data
781 std::string data(buffer,count);
782 ODOUT(" got \"" << data << "\"");
783 for(FilterChain::iterator it_filter= m_filter_chain.begin();
784 it_filter != m_filter_chain.end();
787 data= (*it_filter)->filterIncomingData(data);
789 m_input_buffer+= data;
792 } // eo IOImplementation::doRead
796 * interface for filter classes to inject data into the filter chain (emulating new incoming data).
797 * @param from_filter the filter which injects the new data.
798 * @param _data the new data.
800 void IOImplementation::injectIncomingData(FilterBasePtr from_filter, const std::string& _data)
802 FilterChain::iterator it_filter =
803 std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(from_filter) );
804 if (it_filter == m_filter_chain.end())
806 // dont accept data inject from a unknown filter
809 // well: pass the data through the remaining filters:
810 // NOTE: processing is (nearly) the same as in IOImplementation::doRead()
811 std::string data(_data);
813 it_filter != m_filter_chain.end();
816 data= (*it_filter)->filterIncomingData(data);
818 m_input_buffer+= data;
820 } // eo IOImplementation::injectIncomingData(FilterBase*,const std::string&)
824 * interface for filter classes to inject data into the filter chain (emulating new outgoing data).
825 * @param from_filter the filter which injects the new data.
826 * @param _data the new data.
828 void IOImplementation::injectOutgoingData(FilterBasePtr from_filter, const std::string& _data)
830 FilterChain::reverse_iterator it_filter =
831 std::find_if( m_filter_chain.rbegin(), m_filter_chain.rend(), FilterMatch(from_filter) );
832 if (it_filter == m_filter_chain.rend())
834 // dont accept data inject from a unknown filter
837 // well: pass the data through the remaining filters:
838 // NOTE: processing is (nearly) the same as in IOImplementation::lowSend()
839 std::string data(_data);
841 it_filter!= m_filter_chain.rend();
844 data= (*it_filter)->filterOutgoingData(data);
846 m_output_buffer+= data;
848 // if we can send immediately, do it:
849 if (! m_output_buffer.empty() && m_marked_for_writing)
853 } // eo IOImplementation::injectOutgoingData(FilterBase*,const std::string&)
857 * set the read file descriptor.
858 * Although a derived class can also set the read fd directly; this method should be used
859 * for this task since it updates some flags on the fd for async operation.
860 * @param fd the new read file descriptor.
862 void IOImplementation::setReadFd(int fd)
864 // test if we already have a valid descriptor (and may have to close it):
869 // fd was already right; consider it to be ok.
872 close(Direction::in);
876 // if the new descriptor looks valid, set some flags:
879 long flags= ::fcntl(fd, F_GETFL);
882 // set the flags for non blocking, async operation
883 flags |= O_NONBLOCK|O_ASYNC;
884 ::fcntl(fd,F_SETFL, flags);
886 else if ( errno == EBADF )
888 // well, we seemed to be fed with an invalid descriptor...:
893 if (fd >= 0) // if still valid:
895 // set the close-on-exec flag
896 ::fcntl(fd,F_SETFD, FD_CLOEXEC);
899 m_marked_for_reading= false;
901 } // eo IOImplementation::setReadFd(int)
906 * set the write file descriptor.
907 * Although a derived class can also set the write fd directly; this method should be used
908 * for this task since it updates some flags on the fd for async operation.
909 * @param fd the new write file descriptor.
911 void IOImplementation::setWriteFd(int fd)
915 if (m_write_fd == fd)
917 // fd was already right; consider it to be ok.
920 close(Direction::out);
924 // if the new descriptor looks valid, set some flags:
927 long flags= ::fcntl(fd, F_GETFL);
930 // set the flags for non blocking, async operation
931 flags |= O_NONBLOCK|O_ASYNC;
932 ::fcntl(fd,F_SETFL, flags);
934 else if (errno == EBADF)
936 // well, we seemed to be fed with an invalid descriptor...:
941 if (fd >= 0) // if still valid:
943 // set the close-on-exec flag
944 ::fcntl(fd,F_SETFD, FD_CLOEXEC);
947 m_marked_for_writing= false;
948 m_not_writable= false;
949 } // eo IOImplementation::setWriteFd(int)
954 * called by the backend when this object can write data.
956 * If some data was sended, the signal @a m_signal_write is called.
958 * @internal tries to write all buffered data to output; if this succeeds,
959 * the connection is assumed to be still able to accept more data.
960 * (i.e. the internal write mark is kept!)
962 * @note overload this method only when You know what You are doing!
964 void IOImplementation::doWrite()
967 if ( m_write_fd<0 || !m_marked_for_writing || m_output_buffer.empty())
972 ODOUT("doWrite, d=\"" << m_output_buffer << "\"");
975 m_marked_for_writing= false;
977 // now write the data
978 ssize_t count= ::write( m_write_fd, m_output_buffer.data(), m_output_buffer.size());
980 ODOUT("::write -> " << count);
982 if (count < 0) // error
990 m_not_writable= true;
992 m_signal_not_writable();
998 if (fd == m_write_fd)
1000 close( (m_write_fd == m_read_fd) ? Direction::both : Direction::out );
1007 m_output_buffer.erase(0, count);
1008 if (m_output_buffer.empty())
1010 // special case: if we were able to send all the data, we keep the write mark:
1011 m_marked_for_writing= true;
1018 } // eo IOImplementation::doWrite
1022 * implementation of SimpleIO
1026 SimpleIO::SimpleIO(int read_fd, int write_fd)
1027 : inherited(read_fd, write_fd)
1029 m_signal_read.connect(boost::bind(&SimpleIO::slotReceived,this));
1030 } // eo SimpleIO::SimpleIO()
1033 SimpleIO::~SimpleIO()
1035 } // eo SimpleIO::~SimpleIO()
1040 * @param data the string.
1042 void SimpleIO::sendString(const std::string& data)
1045 } // eo SimpleIO::sendString(const std::string&)
1049 * emits the signal signalReceived with the received data.
1050 * This slot is connected to IOImplementation::m_signal_read.
1052 void SimpleIO::slotReceived()
1055 data.swap(m_input_buffer);
1056 signal_received_string(data);
1057 } // eo SimpleIO::slotReceived()
1062 * implementation of SimpleIO2
1066 SimpleIO2::SimpleIO2(int read_fd, int write_fd)
1067 : inherited(read_fd, write_fd)
1069 m_signal_read.connect(boost::bind(&SimpleIO2::slotReceived,this));
1070 } // eo SimpleIO2::SimpleIO2()
1073 SimpleIO2::~SimpleIO2()
1075 } // eo SimpleIO2::~SimpleIO2()
1080 * @param data the string.
1082 void SimpleIO2::sendString(const std::string& data)
1085 } // eo SimpleIO2::sendString(const std::string&)
1089 * emits the signal signalReceived with the received data.
1090 * This slot is connected to IOImplementation::m_signal_read.
1092 void SimpleIO2::slotReceived()
1095 data.swap(m_input_buffer);
1096 signal_received_string(data);
1097 } // eo SimpleIO2::slotReceived()
1102 * implementation of class Backend (singleton)
1105 Backend* Backend::g_backend= NULL;
1107 int Backend::m_count_active_steps=0;
1111 : m_count_active_loops(0)
1112 , m_count_stop_requests(0)
1114 SystemTools::ignore_signal( SystemTools::Signal::PIPE );
1115 } // eo Backend::Backend
1120 SystemTools::restore_signal_handler( SystemTools::Signal::PIPE );
1121 } // eo Backend::~Backend()
1124 * delivers pointer to the current backend, instantiating a new backend if there was no current one.
1126 * This should be the only way to access the backend which should be a singleton.
1128 * @return the pointer to the current backend.
1130 Backend* Backend::getBackend()
1134 g_backend = new Backend();
1137 } // eo Backend::getBackend
1143 * performs one backend cycle.
1145 * Collects all file descriptors from the active io objects which should be selected for reading and/or writing.
1146 * Also determines the timer events which become due and adjusts the timeout.
1147 * Constructs the necessary structures and calls poll().
1148 * Finally interprets the results from poll() (i.e. performs the reading/writing/timer events)
1150 * @param timeout maximal wait value in milliseconds; negative value waits until at least one event occured.
1151 * @return @a true if there was at least one active object; otherwise @a false
1153 * @note this method is a little beast.
1156 * The cycle is divided into four steps: collecting; poll; mark and execute.
1157 * The "mark" step is necessary to avoid some bad side effects when method calls in the execution stage
1158 * are calling @a Backup::doOneStep or open their own local backend loop.
1160 * @todo handle some more error cases.
1161 * @todo provide a plugin interface for external handler.
1162 * (currently inclusion of external handler is possible by (ab)using timer classes)
1164 bool Backend::doOneStep(int timeout)
1166 ODOUT( "timeout=" << timeout );
1167 internal_io::PollDataCluster poll_data;
1168 bool had_active_object = false;
1170 ++m_count_active_steps;
1174 FdSetType local_read_fds;
1175 FdSetType local_write_fds;
1179 { // step 1.1: collect fds for read/write operations
1180 for(internal_io::IOList::iterator itIOList = internal_io::g_io_list().begin();
1181 itIOList != internal_io::g_io_list().end();
1184 if (! *itIOList) continue; // skip NULL entries
1185 int read_fd = (*itIOList)->m_read_fd;
1186 int write_fd = (*itIOList)->m_write_fd;
1187 bool want_read = (read_fd >= 0) and (*itIOList)->wantRead();
1188 bool want_write = (write_fd >= 0) and (*itIOList)->wantWrite();
1191 local_read_fds.insert( read_fd );
1195 local_write_fds.insert( write_fd );
1197 if (!want_read && !want_write) continue;
1200 FODOUT( (*itIOList), "wants to read (fd=" << read_fd << ")");
1201 poll_data.add_read_fd(read_fd, *itIOList);
1205 FODOUT( (*itIOList), "wants to write (fd=" << write_fd << ")");
1206 poll_data.add_write_fd(write_fd, *itIOList);
1208 had_active_object= true;
1212 { // step 1.2: collect timer events
1213 MilliTime current_time;
1214 MilliTime min_event_time;
1216 get_current_monotonic_time(current_time);
1217 bool min_event_time_set;
1221 min_event_time = current_time + MilliTime(0,timeout);
1222 min_event_time_set= true;
1226 min_event_time = current_time + MilliTime(86400,0);
1227 min_event_time_set= false;
1231 for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1232 it_timer != internal_io::g_timer_list().end()
1233 && (!had_active_object || !min_event_time_set || current_time < min_event_time);
1236 if (! *it_timer) continue; // skip NULL entries
1237 if (! (*it_timer)->m_active) continue; // skip if not enabled
1238 if ( !min_event_time_set || (*it_timer)->m_when < min_event_time)
1240 min_event_time = (*it_timer)->m_when;
1241 min_event_time_set= true;
1243 had_active_object= true;
1246 if (min_event_time_set)
1247 { // we have at a minimal event time, so (re)compute the timeout value:
1248 MilliTime delta= (min_event_time - current_time);
1249 long long delta_ms = std::min( delta.get_milliseconds(), 21600000LL); // max 6h
1256 timeout= delta_ms + (delta_ms<5 ? 1 : 3);
1262 ODOUT(" poll timeout is " << timeout);
1264 MilliTime current_time;
1265 get_current_monotonic_time(current_time);
1266 ODOUT(" current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1268 int poll_result= ::poll(poll_data.get_pollfd_ptr(), poll_data.get_num_pollfds(), timeout);
1270 ODOUT("poll -> " << poll_result);
1272 MilliTime current_time;
1273 get_current_monotonic_time(current_time);
1274 ODOUT(" current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1277 if (poll_result < 0)
1279 //TODO poll error handling (signals ?!)
1284 // step 3.1: mark io objects (if necessary)
1285 if (poll_result > 0)
1287 for(internal_io::PollVector::iterator itPollItem = poll_data.m_poll_vector.begin();
1288 itPollItem != poll_data.m_poll_vector.end();
1291 ODOUT(" fd=" << itPollItem->fd << ", events=" << itPollItem->events << ", revents=" << itPollItem->revents);
1292 if ( 0 == (itPollItem->revents))
1293 { // preliminary continuation if nothing is to handle for this item(/fd)...
1296 if ( 0!= (itPollItem->revents & (POLLIN|POLLHUP)))
1298 IOImplementation *io= poll_data.m_read_fd_io_map[ itPollItem->fd ];
1299 if (io && io->m_read_fd==itPollItem->fd)
1301 FODOUT(io,"marked for reading");
1302 io->m_marked_for_reading= true;
1305 if ( 0!= (itPollItem->revents & POLLOUT))
1307 IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
1308 if (io && io->m_write_fd==itPollItem->fd)
1310 io->m_marked_for_writing= true;
1313 if ( 0!= (itPollItem->revents & POLLERR))
1315 IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
1316 if (0!= (itPollItem->events & POLLOUT))
1318 if (io && io->m_write_fd==itPollItem->fd)
1320 io->m_marked_for_writing= false;
1321 //io->close( Direction::out );
1325 // TODO error handling (POLLERR, POLLHUP, POLLNVAL)
1329 //Step 3.2: mark timer objects
1331 MilliTime current_time;
1333 get_current_monotonic_time(current_time);
1334 ODOUT(" current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1336 for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1337 it_timer != internal_io::g_timer_list().end();
1340 ODOUT(" check timer " << *it_timer);
1341 if (! *it_timer) continue; // skip NULL entries
1342 if (! (*it_timer)->m_active) continue; // skip if not enabled
1343 if ( (*it_timer)->m_when <= current_time)
1346 (*it_timer)->m_marked = true;
1354 // step 4.1: execute io
1355 ODOUT("execute stage");
1356 for(internal_io::IOList::iterator it_io = internal_io::g_io_list().begin();
1357 it_io != internal_io::g_io_list().end();
1360 ODOUT(" check obj " << *it_io);
1361 if (NULL == *it_io) continue;
1362 if ((*it_io)->m_marked_for_writing)
1364 FODOUT((*it_io),"exec doWrite");
1365 (*it_io)->doWrite();
1366 if ((*it_io) == NULL) continue; // skip remaining if we lost the object
1367 if ((*it_io)->m_errno)
1372 if ((*it_io)->m_marked_for_reading)
1374 FODOUT((*it_io),"exec doRead");
1376 if ((*it_io) == NULL) continue; // skip remaining if we lost the object
1377 if ((*it_io)->m_errno)
1384 // step 4.2: execute timer events
1386 for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1387 it_timer != internal_io::g_timer_list().end();
1390 if (! *it_timer) continue; // skip NULL entries
1391 if (! (*it_timer)->m_active) continue; // skip if not enabled
1392 if (! (*it_timer)->m_marked) continue; // skip if not marked
1394 // reset the mark and deactivate object now since the execute() method might activate it again
1395 (*it_timer)->m_marked= false;
1396 (*it_timer)->m_active= false;
1398 // now execute the event:
1399 (*it_timer)->execute();
1406 // clean up our counter
1407 --m_count_active_steps;
1408 // and forward the exception
1412 if ( 0 == --m_count_active_steps)
1414 internal_io::g_io_list().clean_list();
1415 internal_io::g_timer_list().clean_list();
1418 return had_active_object;
1419 } // eo Backend::doOneStep
1423 * enters a backend loop.
1425 * Calls @a Backend::doOneStep within a loop until @a Backend::stop was called or there are no more
1426 * active objects (io objects or timer objects).
1430 ++m_count_active_loops;
1435 if (!doOneStep(c_max_poll_wait))
1437 // stop if there are no more active objects.
1443 // clean up our counter
1444 --m_count_active_loops;
1445 // and forward the exception
1449 while (0 == m_count_stop_requests);
1450 --m_count_active_loops;
1451 --m_count_stop_requests;
1452 } // eo Backend::run
1456 * @brief stops the latest loop currently run by Backend::run().
1457 * @see Backend::run()
1459 void Backend::stop()
1461 if (m_count_active_loops)
1463 ++m_count_stop_requests;
1465 } // eo Backend::stop()
1469 } // eo namespace AsyncIo