2 The software in this package is distributed under the GNU General
3 Public License version 2 (with a special exception described below).
5 A copy of GNU General Public License (GPL) is included in this distribution,
6 in the file COPYING.GPL.
8 As a special exception, if other files instantiate templates or use macros
9 or inline functions from this file, or you compile this file and link it
10 with other works to produce a work based on this file, this file
11 does not by itself cause the resulting work to be covered
12 by the GNU General Public License.
14 However the source code for this file must still be made available
15 in accordance with section (3) of the GNU General Public License.
17 This exception does not invalidate any other reasons why a work based
18 on this file might be covered by the GNU General Public License.
21 * @copyright Copyright © 2007-2008 by Intra2net AG
26 #include "async_io.hpp"
27 #include <asyncio_config.hpp>
37 #include <sys/socket.h>
42 #include <boost/bind.hpp>
44 #include <asyncio_signalfunc.hpp>
51 #define DOUT(msg) std::cout << msg << std::endl
52 #define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl
53 //#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl
54 #define ODOUT(msg) std::cout << __PRETTY_FUNCTION__ << "[" << this << "]:" << msg << std::endl
56 #define DOUT(msg) do {} while (0)
57 #define FODOUT(obj,msg) do {} while (0)
58 #define ODOUT(msg) do {} while (0)
65 using namespace AsyncIo::Utils;
72 const int c_max_poll_wait= 10*60*1000; // maximal poll wait (while in backend loop): 10 min
76 * contains internal helper structs and functions for io handling.
82 * extends struct pollfd with some convenience functions
// wrapper around the C struct pollfd: derives from it and adds convenience setters for the event mask.
84 struct PollFd : public ::pollfd
94 * initializes the struct with a given file descriptor and clears the event mask(s).
105 * set that we want to be notified about new incoming data
// ORs POLLIN into the inherited events mask, so an already requested POLLOUT is kept.
107 void setPOLLIN() { events |= POLLIN; }
110 * set that we want to be notified if we can send (more) data.
// ORs POLLOUT into the inherited events mask, so an already requested POLLIN is kept.
112 void setPOLLOUT() { events |= POLLOUT; }
114 }; // eo struct PollFd
117 typedef std::vector<PollFd> PollVector;
118 typedef std::map<int,PollVector::size_type> FdPollMap;
119 typedef std::map<int,AsyncIo::IOImplementation*> FdIOMap;
123 * struct for interfacing our local structures with poll()
125 struct PollDataCluster
// the pollfd entries collected for one poll() call (exposed through get_pollfd_ptr())
127 PollVector m_poll_vector;
// maps a file descriptor to the index of its entry in m_poll_vector
128 FdPollMap m_fd_poll_map;
// maps a file descriptor to the io object which was registered for reading on it
129 FdIOMap m_read_fd_io_map;
// maps a file descriptor to the io object which was registered for writing on it
130 FdIOMap m_write_fd_io_map;
// registration functions; both keep the vector and the maps above in sync
132 void add_read_fd( int fd, AsyncIo::IOImplementation* io);
133 void add_write_fd( int fd, AsyncIo::IOImplementation* io);
// accessors yielding the (array pointer, element count) pair which poll() expects
135 pollfd* get_pollfd_ptr();
136 unsigned int get_num_pollfds() const;
138 }; // eo struct PollDataCluster
141 typedef PtrList< AsyncIo::IOImplementation, true > IOList;
142 typedef PtrList< AsyncIo::TimerBase, true > TimerList;
144 template<> int IOList::GlobalCountType::InstanceCount= 0;
145 template<> int TimerList::GlobalCountType::InstanceCount= 0;
149 * the (internal) global list of io objects (object pointers)
153 static IOList _the_io_list;
159 * the (internal) global list of timer objects (object pointers)
161 TimerList& g_timer_list()
// function-local static: the list is constructed on the first call and the same instance is returned on every later call
163 static TimerList _the_timer_list;
164 return _the_timer_list;
168 * implementation of PollDataCluster
173 * add a new file descriptor to the read list.
175 * @param fd the file descriptor.
176 * @param io the io object which uses the fd for reading.
178 void PollDataCluster::add_read_fd( int fd, AsyncIo::IOImplementation* io)
// every fd gets at most one pollfd entry: check whether this fd already has a slot
180 FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
181 if (itPollMap != m_fd_poll_map.end())
// known fd: only add POLLIN to the events of the existing entry
183 m_poll_vector[ itPollMap->second ].setPOLLIN();
// unknown fd: the new entry is appended, so its index equals the current size of the vector
// NOTE(review): the construction of `item` is not visible in this excerpt
189 m_fd_poll_map[fd] = m_poll_vector.size();
190 m_poll_vector.push_back( item );
// remember which io object reads from this fd (an earlier entry for the same fd is overwritten)
192 m_read_fd_io_map[fd]= io;
193 } // eo PollDataCluster::add_read_fd
197 * add a new file descriptor to the write list.
199 * @param fd the file descriptor.
200 * @param io the io object which uses the fd for writing.
202 void PollDataCluster::add_write_fd( int fd, AsyncIo::IOImplementation* io)
// every fd gets at most one pollfd entry: check whether this fd already has a slot
204 FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
205 if (itPollMap != m_fd_poll_map.end())
// known fd: only add POLLOUT to the events of the existing entry
207 m_poll_vector[ itPollMap->second ].setPOLLOUT();
// unknown fd: the new entry is appended, so its index equals the current size of the vector
// NOTE(review): the construction of `item` is not visible in this excerpt
213 m_fd_poll_map[fd] = m_poll_vector.size();
214 m_poll_vector.push_back( item );
// remember which io object writes to this fd (an earlier entry for the same fd is overwritten)
216 m_write_fd_io_map[fd]= io;
217 } // eo PollDataCluster::add_write_fd
221 * returns a pointer to a pollfd array; suitable for passing to poll()
223 * @return pointer to pollfd array
225 pollfd* PollDataCluster::get_pollfd_ptr()
// front() must not be called on an empty vector, hence the explicit NULL for that case.
// NOTE(review): using the address of the first PollFd as a pollfd array relies on PollFd adding no data members to pollfd - confirm
227 return m_poll_vector.empty() ? NULL : &m_poll_vector.front();
228 } // eo get_pollfd_ptr
232 * returns the number of entries in the pollfd array; suitable for passing to poll()
234 * @return the number of entries in the pollfd array
236 unsigned int PollDataCluster::get_num_pollfds() const
// size() yields the vector's size_type which is implicitly converted to unsigned int here
238 return m_poll_vector.size();
239 } // eo get_num_pollfds
243 } // eo namespace internal_io
247 * some internal tool functions and structures
251 AsyncIo::FilterBasePtr m_filter;
253 FilterMatch(AsyncIo::FilterBasePtr filter)
257 bool operator () (const AsyncIo::FilterBasePtr& item)
259 return item && item == m_filter;
262 }; // eo struct FilterMatch
266 } // eo anonymous namespace
276 * implementation of TimerBase
280 * constructor. Adds the object to the internal timer list.
282 TimerBase::TimerBase()
// register with the global timer list; Backend::doOneStep() iterates over that list to find due timers
286 internal_io::g_timer_list().add_item(this);
287 } // eo TimerBase::TimerBase
291 * destructor. Removes the object from the internal timer list.
293 TimerBase::~TimerBase()
// the global list is only touched while TimerList reports existing instances.
// NOTE(review): presumably this guards against the list being destroyed before this object (static destruction order) - confirm
296 if (internal_io::TimerList::Instances())
298 ODOUT("remove from list");
299 internal_io::g_timer_list().remove_item(this);
301 } // eo TimerBase::~TimerBase
305 * @brief returns the point in time when the timer is executed in real time.
306 * @return the point in time when the timer is to be executed.
308 MilliTime TimerBase::getRealWhenTime() const
// m_when is based on the monotonic clock (setDeltaWhenTime() starts from get_current_monotonic_time());
// sample both clocks now ...
312 get_current_monotonic_time(mono_time);
313 get_current_real_time(real_time);
// ... and shift m_when by the current offset between the real and the monotonic clock
314 MilliTime result= m_when - mono_time + real_time;
316 } // eo TimerBase::getRealWhenTime() const
320 * sets the time when the event should be executed.
321 * @param sec the seconds part of the point in time.
322 * @param msec the milliseconds part of the point in time.
324 void TimerBase::setWhenTime(long sec, long msec)
326 m_when.set(sec,msec);
328 } // eo TimerBase::setWhenTime
332 * sets the time when the event should be executed.
333 * @param mt the point in time.
335 void TimerBase::setWhenTime(const MilliTime& mt)
339 } // eo TimerBase::setWhenTime
343 * sets the time delta measured from current time when the event should be executed.
344 * @param sec the seconds of the time delta
345 * @param msec the milli seconds of the time delta
347 void TimerBase::setDeltaWhenTime(long sec, long msec)
// convenience overload: wraps both parts into a MilliTime and delegates to the MilliTime overload
349 setDeltaWhenTime( MilliTime(sec,msec) );
350 } // eo TimerBase::setDeltaWhenTime
355 * sets the time delta measured from current time when the event should be executed.
356 * @param mt the time delta
358 void TimerBase::setDeltaWhenTime(const MilliTime& mt)
// start from the current monotonic time.
// NOTE(review): the statement which adds the delta `mt` is not visible in this excerpt
360 get_current_monotonic_time(m_when);
363 } // eo TimerBase::setDeltaWhenTime
367 * set the active state of the timer event.
368 * @param active determines if the object should be active (default: yes).
370 void TimerBase::activate(bool active)
375 // clear the mark if we are not active.
378 } // eo TimerBase::activate
381 /** @fn void TimerBase::deactivate()
382 * deactivates the event by clearing the active state.
387 * called when the timer event occurred.
389 void TimerBase::execute()
391 } // eo TimerBase::execute
395 * implementation of FilterBase class
399 FilterBase::FilterBase()
402 } // eo FilterBase::FilterBase()
406 * injects incoming data.
407 * @param data the new data
409 void FilterBase::injectIncomingData(const std::string& data)
413 FilterBasePtr ptr= get_ptr_as< FilterBase >();
416 m_io->injectIncomingData(ptr,data);
419 } // FilterBase::injectIncomingData(const std::string&)
423 * injects outgoing data.
424 * @param data the new data
426 void FilterBase::injectOutgoingData(const std::string& data)
430 FilterBasePtr ptr= get_ptr_as< FilterBase >();
433 m_io->injectOutgoingData(ptr,data);
436 } // eo FilterBase::injectOutgoingData(const std::string&)
441 * called when EOF detected on incoming channel (or incoming channel closed)
443 void FilterBase::endOfIncomingData()
445 } // eo FilterBase::endOfIncomingData()
448 * called when the filter should reset.
449 * This is used when a new channel is opened or when the filter is taken out of a filter chain.
451 void FilterBase::reset()
453 } // eo FilterBase::reset()
457 * implementation of IOImplementation class
462 * constructor for the base io class.
464 * Also adds the object to internal list of io objects (which is used by the backend).
466 * @param read_fd the file descriptor which should be used for reading (default -1 for no value)
467 * @param write_fd the file descriptor which should be used for writing (default -1 for no value)
469 IOImplementation::IOImplementation(int read_fd, int write_fd)
473 , m_not_writable(false)
476 , m_marked_for_reading(false)
477 , m_marked_for_writing(false)
// register with the global io list which Backend::doOneStep() iterates over
479 internal_io::g_io_list().add_item(this);
// the descriptors are installed through the setters (not assigned directly) since the setters
// also set up the descriptor flags
482 setReadFd( read_fd );
486 setWriteFd( write_fd );
488 } // eo IOImplementation::IOImplementation
492 * destructor of the base io class.
494 * Removes the object from the internal list of io objects.
496 IOImplementation::~IOImplementation()
// the global list is only touched while IOList reports existing instances.
// NOTE(review): presumably this guards against the list being destroyed before this object (static destruction order) - confirm
499 if (internal_io::IOList::Instances())
501 internal_io::g_io_list().remove_item(this);
503 // now clear the filters:
// the filters are taken from the front one by one until the chain is empty
504 while (! m_filter_chain.empty() )
506 FilterChain::iterator it = m_filter_chain.begin();
509 //TODO: signal the filter that it is removed ?!
510 m_filter_chain.erase(it);
512 } // eo IOImplementation::~IOImplementation
516 * adds another filter to the filter chain.
517 * @param filter pointer to the new filter.
519 void IOImplementation::addFilter
526 return; // nothing to do
530 filter->m_io->removeFilter(filter);
532 m_filter_chain.push_back( filter );
533 } // eo IOImplementation::addFilter
537 * removes a filter from the filter chain.
538 * @param filter the pointer to the filter which is removed.
539 * @note if the filter is removed the class gives away the ownership; i.e. the caller is responsible for
540 * deleting the filter if it was dynamically allocated.
542 void IOImplementation::removeFilter
547 FilterChain::iterator it =
548 std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(filter) );
549 if (it != m_filter_chain.end())
553 //TODO: signal the filter that it is removed ?!
554 m_filter_chain.erase(it);
556 } // eo IOImplementation::removeFilter
560 * closes the file descriptors (/ the connection).
562 * @param direction the direction which should be closed (default: @a Direction::both for all).
564 void IOImplementation::close(Direction direction)
// remember whether a read side existed, so that its disappearance can be reported to the filters at the end
566 bool had_read_fd= (m_read_fd >= 0);
// "unspecified" is treated as a request to close both directions
568 if (direction == Direction::unspecified) direction= Direction::both;
// one descriptor serves both directions but only one direction is requested: use shutdown() on that
// direction instead of closing the descriptor
569 if (direction != Direction::both && m_read_fd==m_write_fd && m_read_fd>=0)
570 { // special case: half closed (socket) connections...
571 // NOTE: for file descriptors m_errno will set to ENOTSOCK, but since we "forget" the desired part
572 // (read_fd or write_fd) this class works as desired.
577 int res= ::shutdown(m_read_fd, SHUT_RD);
// tell every filter of the chain that no more incoming data will arrive
585 for(FilterChain::iterator it= m_filter_chain.begin();
586 it != m_filter_chain.end();
589 (*it)->endOfIncomingData();
597 int res= ::shutdown(m_write_fd, SHUT_WR);
// data which is still pending is dropped together with the write side
603 m_output_buffer.clear();
// regular case: close the write descriptor if the out direction is requested
608 if (m_write_fd >= 0 && (direction & Direction::out) )
610 int res1 = ::close(m_write_fd);
// NOTE(review): the body of this branch is not visible in this excerpt
611 if (m_write_fd == m_read_fd)
616 m_output_buffer.clear();
617 if (res1<0) m_errno= errno;
// regular case: close the read descriptor if the in direction is requested
619 if (m_read_fd >=0 && (direction & Direction::in) )
621 int res1 = ::close(m_read_fd);
623 if (res1<0) m_errno= errno;
// the read side went away without EOF being flagged before: report the end of incoming data to every filter
625 if (had_read_fd && !m_eof && (m_read_fd<0))
627 for(FilterChain::iterator it= m_filter_chain.begin();
628 it != m_filter_chain.end();
631 (*it)->endOfIncomingData();
634 } // eo IOImplementation::close
638 * determines if the io class wants to read data.
639 * Default implementation checks only for a valid file descriptor value.
641 * @return @a true if the objects wants to read data
643 bool IOImplementation::wantRead()
645 return (m_read_fd >= 0) && ! m_eof;
646 } // eo IOImplementation::wantRead
650 * determines if the io class wants to write data.
651 * Default implementation checks for a valid file descriptor value and if the object
652 * cannot write data immediately.
654 * @return @a true if the objects wants to write data
656 bool IOImplementation::wantWrite()
// polling for writability is not requested while the write mark is still set,
// and not once the descriptor was flagged as not writable
658 return (m_write_fd >= 0) && ! m_marked_for_writing && ! m_not_writable;
659 } // eo IOImplementation::wantWrite
663 * delivers if opened.
664 * The default returns @a true if at least one file descriptor (read or write) is valid.
665 * @return @a true if opened.
667 bool IOImplementation::opened() const
669 return (m_read_fd>=0) || (m_write_fd>=0);
670 } // eo IOImplementation::opened() const
674 * returns if the read side detected an end of file (EOF).
675 * @return @a true if end of file was detected on read file descriptor (or read file descriptor isn't valid).
677 bool IOImplementation::eof() const
// a missing read descriptor is reported as EOF, too
679 return (m_read_fd < 0) || m_eof;
680 } // eo IOImplementation::eof() const
684 * @brief returns if the write side didn't detect that it cannot write.
685 * @return @a true if we can write.
687 bool IOImplementation::writable() const
// "and" / "not" are the standard alternative spellings of the && and ! operators
689 return (m_write_fd >=0 ) and not m_not_writable;
690 } // eo IOImplementation::writable() const
694 * returns if the output buffer is empty.
697 bool IOImplementation::empty() const
699 return m_output_buffer.empty();
700 } // eo IOImplementation::empty
703 * puts data into the output buffer and sends it immediately if possible,
705 * The data is passed through the filter chain before it's stored in the output buffer
706 * (i.e. the output buffer contains data as it should be sent directly to the descriptor).
707 * @param _data the data which should be sent.
709 void IOImplementation::lowSend(const std::string& _data)
// work on a copy since each filter replaces the data with its filtered form
711 std::string data(_data);
// outgoing data passes the filter chain from back to front (reverse iterators)
713 for(FilterChain::reverse_iterator it_filter= m_filter_chain.rbegin();
714 it_filter!= m_filter_chain.rend();
717 data= (*it_filter)->filterOutgoingData(data);
// the output buffer holds the data in the form in which it is written to the descriptor
719 m_output_buffer+= data;
721 // if we can send immediately, do it:
722 if (! m_output_buffer.empty() && m_marked_for_writing)
726 } // eo IOImplementation::lowSend
730 * called by the backend when there is data to read for this object.
732 * Reads the data from the connection (read file descriptor) and passes the data through the filter chain.
733 * The final data is appended to the input buffer and the signal @a m_signal_read() is called.
735 * If EOF is detected (i.e. no data was received) then the signal @a m_signal_eof() is called.
737 * @note overload this method only when You know what You are doing!
738 * (overloading is necessary when handling server sockets.)
740 void IOImplementation::doRead()
742 // static read buffer; should be ok as long as we don't use threads
743 static char buffer[8*1024]; // 8 KiB
// only act if there is a read descriptor and the object was marked for reading
746 if (m_read_fd<0 || !m_marked_for_reading)
748 ODOUT("exit0; read_fd="<<m_read_fd << " mark=" << m_marked_for_reading);
// consume the mark; Backend::doOneStep() sets it again when poll() reports new input for the fd
753 m_marked_for_reading = false;
755 // now read the data:
757 count = ::read(m_read_fd, buffer, sizeof(buffer));
759 ODOUT("::read -> " << count);
761 // interpret what we got:
762 if (count < 0) // error
// if one descriptor serves both directions the whole connection is closed, otherwise only the read side
775 close( (m_read_fd == m_write_fd) ? Direction::both : Direction::in );
780 else if (count==0) // EOF
782 // remember the read fd:
788 // if the fd is still the same: close it.
791 close( Direction::in );
794 else // we have valid data
796 std::string data(buffer,count);
797 ODOUT(" got \"" << data << "\"");
// incoming data passes the filter chain from front to back
798 for(FilterChain::iterator it_filter= m_filter_chain.begin();
799 it_filter != m_filter_chain.end();
802 data= (*it_filter)->filterIncomingData(data);
804 m_input_buffer+= data;
807 } // eo IOImplementation::doRead
811 * interface for filter classes to inject data into the filter chain (emulating new incoming data).
812 * @param from_filter the filter which injects the new data.
813 * @param _data the new data.
815 void IOImplementation::injectIncomingData(FilterBasePtr from_filter, const std::string& _data)
// locate the injecting filter within the chain
817 FilterChain::iterator it_filter =
818 std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(from_filter) );
819 if (it_filter == m_filter_chain.end())
821 // do not accept injected data from an unknown filter
824 // well: pass the data through the remaining filters:
825 // NOTE: processing is (nearly) the same as in IOImplementation::doRead()
// work on a copy since each filter replaces the data with its filtered form
826 std::string data(_data);
828 it_filter != m_filter_chain.end();
831 data= (*it_filter)->filterIncomingData(data);
833 m_input_buffer+= data;
835 } // eo IOImplementation::injectIncomingData(FilterBasePtr,const std::string&)
839 * interface for filter classes to inject data into the filter chain (emulating new outgoing data).
840 * @param from_filter the filter which injects the new data.
841 * @param _data the new data.
843 void IOImplementation::injectOutgoingData(FilterBasePtr from_filter, const std::string& _data)
// locate the injecting filter; the search runs with reverse iterators since outgoing data
// passes the chain from back to front
845 FilterChain::reverse_iterator it_filter =
846 std::find_if( m_filter_chain.rbegin(), m_filter_chain.rend(), FilterMatch(from_filter) );
847 if (it_filter == m_filter_chain.rend())
849 // do not accept injected data from an unknown filter
852 // well: pass the data through the remaining filters:
853 // NOTE: processing is (nearly) the same as in IOImplementation::lowSend()
// work on a copy since each filter replaces the data with its filtered form
854 std::string data(_data);
856 it_filter!= m_filter_chain.rend();
859 data= (*it_filter)->filterOutgoingData(data);
861 m_output_buffer+= data;
863 // if we can send immediately, do it:
864 if (! m_output_buffer.empty() && m_marked_for_writing)
868 } // eo IOImplementation::injectOutgoingData(FilterBasePtr,const std::string&)
872 * set the read file descriptor.
873 * Although a derived class can also set the read fd directly; this method should be used
874 * for this task since it updates some flags on the fd for async operation.
875 * @param fd the new read file descriptor.
877 void IOImplementation::setReadFd(int fd)
879 // test if we already have a valid descriptor (and may have to close it):
884 // fd was already right; consider it to be ok.
// a different descriptor was installed before: close the read direction first
887 close(Direction::in);
891 // if the new descriptor looks valid, set some flags:
// fetch the current file status flags first, so the new bits are added to the existing ones
894 long flags= ::fcntl(fd, F_GETFL);
897 // set the flags for non blocking, async operation
898 flags |= O_NONBLOCK|O_ASYNC;
899 ::fcntl(fd,F_SETFL, flags);
// EBADF is the errno value fcntl() reports for a descriptor which is not open
901 else if ( errno == EBADF )
903 // well, we seemed to be fed with an invalid descriptor...:
908 if (fd >= 0) // if still valid:
910 // set the close-on-exec flag
// NOTE(review): F_SETFD replaces the complete set of descriptor flags and the result of fcntl() is not checked
911 ::fcntl(fd,F_SETFD, FD_CLOEXEC);
// a freshly set descriptor starts without a read mark
914 m_marked_for_reading= false;
916 } // eo IOImplementation::setReadFd(int)
921 * set the write file descriptor.
922 * Although a derived class can also set the write fd directly; this method should be used
923 * for this task since it updates some flags on the fd for async operation.
924 * @param fd the new write file descriptor.
926 void IOImplementation::setWriteFd(int fd)
930 if (m_write_fd == fd)
932 // fd was already right; consider it to be ok.
// a different descriptor was installed before: close the write direction first
935 close(Direction::out);
939 // if the new descriptor looks valid, set some flags:
// fetch the current file status flags first, so the new bits are added to the existing ones
942 long flags= ::fcntl(fd, F_GETFL);
945 // set the flags for non blocking, async operation
946 flags |= O_NONBLOCK|O_ASYNC;
947 ::fcntl(fd,F_SETFL, flags);
// EBADF is the errno value fcntl() reports for a descriptor which is not open
949 else if (errno == EBADF)
951 // well, we seemed to be fed with an invalid descriptor...:
956 if (fd >= 0) // if still valid:
958 // set the close-on-exec flag
// NOTE(review): F_SETFD replaces the complete set of descriptor flags and the result of fcntl() is not checked
959 ::fcntl(fd,F_SETFD, FD_CLOEXEC);
// a freshly set descriptor starts without a write mark and is considered writable again
962 m_marked_for_writing= false;
963 m_not_writable= false;
964 } // eo IOImplementation::setWriteFd(int)
969 * called by the backend when this object can write data.
971 * If some data was sent, the signal @a m_signal_write is called.
973 * @internal tries to write all buffered data to output; if this succeeds,
974 * the connection is assumed to be still able to accept more data.
975 * (i.e. the internal write mark is kept!)
977 * @note overload this method only when You know what You are doing!
979 void IOImplementation::doWrite()
// nothing to do without a write descriptor, without a write mark or without pending output
982 if ( m_write_fd<0 || !m_marked_for_writing || m_output_buffer.empty())
987 ODOUT("doWrite, d=\"" << m_output_buffer << "\"");
// consume the mark; it is set again below if the buffer could be written completely
990 m_marked_for_writing= false;
992 // now write the data
993 ssize_t count= ::write( m_write_fd, m_output_buffer.data(), m_output_buffer.size());
995 ODOUT("::write -> " << count);
997 if (count < 0) // error
// flag the descriptor as not writable before the signal is emitted
1005 m_not_writable= true;
1007 m_signal_not_writable();
// NOTE(review): `fd` is declared in a line which is not visible here; presumably it holds the write fd
// from before the signal was emitted, so the close is skipped if a slot installed another descriptor - confirm
1013 if (fd == m_write_fd)
1015 close( (m_write_fd == m_read_fd) ? Direction::both : Direction::out );
// drop the bytes which were actually written; after a partial write the remainder stays queued
1022 m_output_buffer.erase(0, count);
1023 if (m_output_buffer.empty())
1025 // special case: if we were able to send all the data, we keep the write mark:
1026 m_marked_for_writing= true;
1033 } // eo IOImplementation::doWrite
1037 * implementation of SimpleIO
1041 SimpleIO::SimpleIO(int read_fd, int write_fd)
1042 : inherited(read_fd, write_fd)
// connect the own slot to the read signal; slotReceived() then forwards the received data as a string
1044 m_signal_read.connect(boost::bind(&SimpleIO::slotReceived,this));
1045 } // eo SimpleIO::SimpleIO()
1048 SimpleIO::~SimpleIO()
1050 } // eo SimpleIO::~SimpleIO()
1055 * @param data the string.
1057 void SimpleIO::sendString(const std::string& data)
1060 } // eo SimpleIO::sendString(const std::string&)
1064 * emits the signal signalReceived with the received data.
1065 * This slot is connected to IOImplementation::m_signal_read.
1067 void SimpleIO::slotReceived()
// swap exchanges the contents of the local string and the input buffer without copying the data.
// NOTE(review): the declaration of `data` is not visible here; assuming it starts empty, m_input_buffer is empty afterwards
1070 data.swap(m_input_buffer);
1071 signal_received_string(data);
1072 } // eo SimpleIO::slotReceived()
1077 * implementation of SimpleIO2
1081 SimpleIO2::SimpleIO2(int read_fd, int write_fd)
1082 : inherited(read_fd, write_fd)
// connect the own slot to the read signal; slotReceived() then forwards the received data as a string
1084 m_signal_read.connect(boost::bind(&SimpleIO2::slotReceived,this));
1085 } // eo SimpleIO2::SimpleIO2()
1088 SimpleIO2::~SimpleIO2()
1090 } // eo SimpleIO2::~SimpleIO2()
1095 * @param data the string.
1097 void SimpleIO2::sendString(const std::string& data)
1100 } // eo SimpleIO2::sendString(const std::string&)
1104 * emits the signal signalReceived with the received data.
1105 * This slot is connected to IOImplementation::m_signal_read.
1107 void SimpleIO2::slotReceived()
// swap exchanges the contents of the local string and the input buffer without copying the data.
// NOTE(review): the declaration of `data` is not visible here; assuming it starts empty, m_input_buffer is empty afterwards
1110 data.swap(m_input_buffer);
1111 signal_received_string(data);
1112 } // eo SimpleIO2::slotReceived()
1117 * implementation of class Backend (singleton)
1120 Backend* Backend::g_backend= NULL;
1122 int Backend::m_count_active_steps=0;
1126 : m_count_active_loops(0)
1127 , m_count_stop_requests(0)
1129 SystemTools::ignore_signal( SystemTools::Signal::PIPE );
1130 } // eo Backend::Backend
1135 SystemTools::restore_signal_handler( SystemTools::Signal::PIPE );
1136 } // eo Backend::~Backend()
1139 * delivers pointer to the current backend, instantiating a new backend if there was no current one.
1141 * This should be the only way to access the backend which should be a singleton.
1143 * @return the pointer to the current backend.
1145 Backend* Backend::getBackend()
// the singleton is created with new and stored in the static member g_backend.
// NOTE(review): the guarding condition and the return statement are not visible in this excerpt
1149 g_backend = new Backend();
1152 } // eo Backend::getBackend
1158 * performs one backend cycle.
1160 * Collects all file descriptors from the active io objects which should be selected for reading and/or writing.
1161 * Also determines the timer events which become due and adjusts the timeout.
1162 * Constructs the necessary structures and calls poll().
1163 * Finally interprets the results from poll() (i.e. performs the reading/writing/timer events)
1165 * @param timeout maximal wait value in milliseconds; negative value waits until at least one event occurred.
1166 * @return @a true if there was at least one active object; otherwise @a false
1168 * @note this method is a little beast.
1171 * The cycle is divided into four steps: collecting; poll; mark and execute.
1172 * The "mark" step is necessary to avoid some bad side effects when method calls in the execution stage
1173 * are calling @a Backend::doOneStep or open their own local backend loop.
1175 * @todo handle some more error cases.
1176 * @todo provide a plugin interface for external handler.
1177 * (currently inclusion of external handler is possible by (ab)using timer classes)
1179 bool Backend::doOneStep(int timeout)
1181 ODOUT( "timeout=" << timeout );
// the poll structures are built from scratch for every step
1182 internal_io::PollDataCluster poll_data;
1183 bool had_active_object = false;
// count the active (possibly nested) steps; the global lists are only cleaned up
// when this counter drops back to zero at the end of the step
1185 ++m_count_active_steps;
1189 FdSetType local_read_fds;
1190 FdSetType local_write_fds;
1194 { // step 1.1: collect fds for read/write operations
1195 for(internal_io::IOList::iterator itIOList = internal_io::g_io_list().begin();
1196 itIOList != internal_io::g_io_list().end();
1199 if (! *itIOList) continue; // skip NULL entries
1200 int read_fd = (*itIOList)->m_read_fd;
1201 int write_fd = (*itIOList)->m_write_fd;
// an object is only asked for its wishes on directions with a valid descriptor
1202 bool want_read = (read_fd >= 0) and (*itIOList)->wantRead();
1203 bool want_write = (write_fd >= 0) and (*itIOList)->wantWrite();
// NOTE(review): the conditions guarding the following inserts and registrations are not visible in this excerpt
1206 local_read_fds.insert( read_fd );
1210 local_write_fds.insert( write_fd );
1212 if (!want_read && !want_write) continue;
1215 FODOUT( (*itIOList), "wants to read (fd=" << read_fd << ")");
1216 poll_data.add_read_fd(read_fd, *itIOList);
1220 FODOUT( (*itIOList), "wants to write (fd=" << write_fd << ")");
1221 poll_data.add_write_fd(write_fd, *itIOList);
1223 had_active_object= true;
1227 { // step 1.2: collect timer events
1228 MilliTime current_time;
1229 MilliTime min_event_time;
1231 get_current_monotonic_time(current_time);
1232 bool min_event_time_set;
// NOTE(review): the condition selecting between the next two initialisations is not visible here;
// the first derives the deadline from the caller's timeout, the second uses a placeholder
// (now + 86400 s) which is flagged as "not set"
1236 min_event_time = current_time + MilliTime(0,timeout);
1237 min_event_time_set= true;
1241 min_event_time = current_time + MilliTime(86400,0);
1242 min_event_time_set= false;
// search for the earliest active timer; the scan ends early once an active object is known
// and the earliest deadline found so far is not in the future any more
1246 for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1247 it_timer != internal_io::g_timer_list().end()
1248 && (!had_active_object || !min_event_time_set || current_time < min_event_time);
1251 if (! *it_timer) continue; // skip NULL entries
1252 if (! (*it_timer)->m_active) continue; // skip if not enabled
1253 if ( !min_event_time_set || (*it_timer)->m_when < min_event_time)
1255 min_event_time = (*it_timer)->m_when;
1256 min_event_time_set= true;
1258 had_active_object= true;
1261 if (min_event_time_set)
1262 { // we have a minimal event time, so (re)compute the timeout value:
1263 MilliTime delta= (min_event_time - current_time);
1264 long long delta_ms = std::min( delta.get_milliseconds(), 21600000LL); // max 6h
// a small margin is added to the delta (1 ms below 5 ms, otherwise 3 ms).
// NOTE(review): presumably so that the timer is really due when poll() returns - confirm
1271 timeout= delta_ms + (delta_ms<5 ? 1 : 3);
1277 ODOUT(" poll timeout is " << timeout);
1279 MilliTime current_time;
1280 get_current_monotonic_time(current_time);
1281 ODOUT(" current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
// step 2: wait for events on the collected descriptors (or for the timeout)
1283 int poll_result= ::poll(poll_data.get_pollfd_ptr(), poll_data.get_num_pollfds(), timeout);
1285 ODOUT("poll -> " << poll_result);
1287 MilliTime current_time;
1288 get_current_monotonic_time(current_time);
1289 ODOUT(" current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1292 if (poll_result < 0)
1294 //TODO poll error handling (signals ?!)
1299 // step 3.1: mark io objects (if necessary)
1300 if (poll_result > 0)
1302 for(internal_io::PollVector::iterator itPollItem = poll_data.m_poll_vector.begin();
1303 itPollItem != poll_data.m_poll_vector.end();
1306 ODOUT(" fd=" << itPollItem->fd << ", events=" << itPollItem->events << ", revents=" << itPollItem->revents);
1307 if ( 0 == (itPollItem->revents))
1308 { // preliminary continuation if nothing is to handle for this item(/fd)...
// POLLHUP is handled like POLLIN: the object gets marked for reading.
// NOTE(review): presumably so that doRead() detects the EOF itself - confirm
1311 if ( 0!= (itPollItem->revents & (POLLIN|POLLHUP)))
// operator[] yields NULL for an fd without a registered reader, hence the check of io below
1313 IOImplementation *io= poll_data.m_read_fd_io_map[ itPollItem->fd ];
// only mark the object if it still uses this descriptor for reading
1314 if (io && io->m_read_fd==itPollItem->fd)
1316 FODOUT(io,"marked for reading");
1317 io->m_marked_for_reading= true;
1320 if ( 0!= (itPollItem->revents & POLLOUT))
1322 IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
// only mark the object if it still uses this descriptor for writing
1323 if (io && io->m_write_fd==itPollItem->fd)
1325 io->m_marked_for_writing= true;
1328 if ( 0!= (itPollItem->revents & POLLERR))
1330 IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
// an error on a descriptor which was polled for writing withdraws the write mark
1331 if (0!= (itPollItem->events & POLLOUT))
1333 if (io && io->m_write_fd==itPollItem->fd)
1335 io->m_marked_for_writing= false;
1336 //io->close( Direction::out );
1340 // TODO error handling (POLLERR, POLLHUP, POLLNVAL)
1344 //Step 3.2: mark timer objects
1346 MilliTime current_time;
// take a fresh time stamp since poll() may have been waiting
1348 get_current_monotonic_time(current_time);
1349 ODOUT(" current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1351 for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1352 it_timer != internal_io::g_timer_list().end();
1355 ODOUT(" check timer " << *it_timer);
1356 if (! *it_timer) continue; // skip NULL entries
1357 if (! (*it_timer)->m_active) continue; // skip if not enabled
// every active timer whose point in time has been reached gets marked
1358 if ( (*it_timer)->m_when <= current_time)
1361 (*it_timer)->m_marked = true;
1369 // step 4.1: execute io
1370 ODOUT("execute stage");
1371 for(internal_io::IOList::iterator it_io = internal_io::g_io_list().begin();
1372 it_io != internal_io::g_io_list().end();
1375 ODOUT(" check obj " << *it_io);
1376 if (NULL == *it_io) continue;
1377 if ((*it_io)->m_marked_for_writing)
1379 FODOUT((*it_io),"exec doWrite");
1380 (*it_io)->doWrite();
// the list entry is checked again after the call since it may have become NULL during the call
1381 if ((*it_io) == NULL) continue; // skip remaining if we lost the object
1382 if ((*it_io)->m_errno)
1387 if ((*it_io)->m_marked_for_reading)
1389 FODOUT((*it_io),"exec doRead");
1391 if ((*it_io) == NULL) continue; // skip remaining if we lost the object
1392 if ((*it_io)->m_errno)
1399 // step 4.2: execute timer events
1401 for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1402 it_timer != internal_io::g_timer_list().end();
1405 if (! *it_timer) continue; // skip NULL entries
1406 if (! (*it_timer)->m_active) continue; // skip if not enabled
1407 if (! (*it_timer)->m_marked) continue; // skip if not marked
1409 // reset the mark and deactivate object now since the execute() method might activate it again
1410 (*it_timer)->m_marked= false;
1411 (*it_timer)->m_active= false;
1413 // now execute the event:
1414 (*it_timer)->execute();
1421 // clean up our counter
1422 --m_count_active_steps;
1423 // and forward the exception
// the lists are only cleaned when the last active step finishes.
// NOTE(review): clean_list() is defined elsewhere; presumably it drops the NULL entries skipped above - confirm
1427 if ( 0 == --m_count_active_steps)
1429 internal_io::g_io_list().clean_list();
1430 internal_io::g_timer_list().clean_list();
1433 return had_active_object;
1434 } // eo Backend::doOneStep
1438 * enters a backend loop.
1440 * Calls @a Backend::doOneStep within a loop until @a Backend::stop was called or there are no more
1441 * active objects (io objects or timer objects).
1445 ++m_count_active_loops;
1450 if (!doOneStep(c_max_poll_wait))
1452 // stop if there are no more active objects.
1458 // clean up our counter
1459 --m_count_active_loops;
1460 // and forward the exception
1464 while (0 == m_count_stop_requests);
1465 --m_count_active_loops;
1466 --m_count_stop_requests;
1467 } // eo Backend::run
1471 * @brief stops the latest loop currently run by Backend::run().
1472 * @see Backend::run()
1474 void Backend::stop()
// a stop request is only recorded while at least one loop is active
1476 if (m_count_active_loops)
1478 ++m_count_stop_requests;
1480 } // eo Backend::stop()
1484 } // eo namespace AsyncIo