2 The software in this package is distributed under the GNU General
3 Public License version 2 (with a special exception described below).
5 A copy of GNU General Public License (GPL) is included in this distribution,
6 in the file COPYING.GPL.
8 As a special exception, if other files instantiate templates or use macros
9 or inline functions from this file, or you compile this file and link it
10 with other works to produce a work based on this file, this file
11 does not by itself cause the resulting work to be covered
12 by the GNU General Public License.
14 However the source code for this file must still be made available
15 in accordance with section (3) of the GNU General Public License.
17 This exception does not invalidate any other reasons why a work based
18 on this file might be covered by the GNU General Public License.
21 * @copyright Copyright © 2007-2008 by Intra2net AG
26 #include "async_io.hpp"
27 #include <asyncio_config.hpp>
37 #include <sys/socket.h>
42 #include <boost/bind.hpp>
44 #ifdef HAVE_LIBI2NCOMMON
45 # include <signalfunc.hpp>
46 //# include <timefunc.hxx>
56 #define DOUT(msg) std::cout << msg << std::endl
57 #define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl
58 //#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl
59 #define ODOUT(msg) std::cout << __PRETTY_FUNCTION__ << "[" << this << "]:" << msg << std::endl
61 #define DOUT(msg) do {} while (0)
62 #define FODOUT(obj,msg) do {} while (0)
63 #define ODOUT(msg) do {} while (0)
70 using namespace AsyncIo::Utils;
77 const int c_max_poll_wait= 10*60*1000; // maximal poll wait (while in backend loop): 10 min
81 * contains internal helper structs and functions for io handling.
87 * extends struct pollfd with some convenience functions
89 struct PollFd : public ::pollfd
99 * initializes the struct with a given file descriptor and clears the event mask(s).
110 * set that we want to be notified about new incoming data
112 void setPOLLIN() { events |= POLLIN; }
115 * set that we want to be notified if we can send (more) data.
117 void setPOLLOUT() { events |= POLLOUT; }
119 }; // eo struct PollFd
122 typedef std::vector<PollFd> PollVector;
123 typedef std::map<int,PollVector::size_type> FdPollMap;
124 typedef std::map<int,AsyncIo::IOImplementation*> FdIOMap;
128 * struct for interfacing our local structures with poll()
130 struct PollDataCluster
132 PollVector m_poll_vector;
133 FdPollMap m_fd_poll_map;
134 FdIOMap m_read_fd_io_map;
135 FdIOMap m_write_fd_io_map;
137 void add_read_fd( int fd, AsyncIo::IOImplementation* io);
138 void add_write_fd( int fd, AsyncIo::IOImplementation* io);
140 pollfd* get_pollfd_ptr();
141 unsigned int get_num_pollfds() const;
143 }; // eo struct PollDataCluster
146 typedef PtrList< AsyncIo::IOImplementation, true > IOList;
147 typedef PtrList< AsyncIo::TimerBase, true > TimerList;
149 template<> int IOList::GlobalCountType::InstanceCount= 0;
150 template<> int TimerList::GlobalCountType::InstanceCount= 0;
154 * the (internal) global list of io objects (object pointers)
158 static IOList _the_io_list;
164 * the (internal) global list of timer objects (object pointers)
166 TimerList& g_timer_list()
168 static TimerList _the_timer_list;
169 return _the_timer_list;
173 * implementation of PollDataCluster
178 * add a new file descriptor to the read list.
180 * @param fd the file descriptor.
181 * @param io the io object which uses the fd for reading.
183 void PollDataCluster::add_read_fd( int fd, AsyncIo::IOImplementation* io)
185 FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
186 if (itPollMap != m_fd_poll_map.end())
188 m_poll_vector[ itPollMap->second ].setPOLLIN();
194 m_fd_poll_map[fd] = m_poll_vector.size();
195 m_poll_vector.push_back( item );
197 m_read_fd_io_map[fd]= io;
198 } // eo PollDataCluster::add_read_fd
202 * add a new file descriptor to the write list.
204 * @param fd the file descriptor.
205 * @param io the io object which uses the fd for writing.
207 void PollDataCluster::add_write_fd( int fd, AsyncIo::IOImplementation* io)
209 FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
210 if (itPollMap != m_fd_poll_map.end())
212 m_poll_vector[ itPollMap->second ].setPOLLOUT();
218 m_fd_poll_map[fd] = m_poll_vector.size();
219 m_poll_vector.push_back( item );
221 m_write_fd_io_map[fd]= io;
222 } // eo PollDataCluster::add_write_fd
226 * returns a pointer to a pollfd array; suitable for passing to poll()
228 * @return pointer to pollfd array
230 pollfd* PollDataCluster::get_pollfd_ptr()
232 return m_poll_vector.empty() ? NULL : &m_poll_vector.front();
233 } // eo get_pollfd_ptr
237 * returns the number of entries in the pollfd array; suitable for passing to poll()
239 * @return the number of entries in the pollfd array
241 unsigned int PollDataCluster::get_num_pollfds() const
243 return m_poll_vector.size();
244 } // eo get_num_pollfds
248 } // eo namespace internal_io
252 * some internal tool functions and structures
256 AsyncIo::FilterBasePtr m_filter;
258 FilterMatch(AsyncIo::FilterBasePtr filter)
262 bool operator () (const AsyncIo::FilterBasePtr& item)
264 return item && item == m_filter;
267 }; // eo struct FilterMatch
271 } // eo anonymous namespace
281 * implementation of TimerBase
285 * constructor. Adds the object to the internal timer list.
287 TimerBase::TimerBase()
291 internal_io::g_timer_list().add_item(this);
292 } // eo TimerBase::TimerBase
296 * destructor. Removes the object from the internal timer list.
298 TimerBase::~TimerBase()
301 if (internal_io::TimerList::Instances())
303 ODOUT("remove from list");
304 internal_io::g_timer_list().remove_item(this);
306 } // eo TimerBase::~TimerBase
310 * @brief returns the point in time when the time is executed in real time.
311 * @return the point in time when the timer is to be executed.
313 MilliTime TimerBase::getRealWhenTime() const
317 get_current_monotonic_time(mono_time);
318 get_current_real_time(real_time);
319 MilliTime result= m_when - mono_time + real_time;
321 } // eo TimerBase::getRealWhenTime() const
325 * sets the time when the event should be executed.
326 * @param sec the seconds part of the point in time.
327 * @param msec the milliseconds part of the point in time.
329 void TimerBase::setWhenTime(long sec, long msec)
331 m_when.set(sec,msec);
333 } // eo TimerBase::setWhenTime
337 * sets the time when the event should be executed.
338 * @param mt the point in time.
340 void TimerBase::setWhenTime(const MilliTime& mt)
344 } // eo TimerBase::setWhenTime
348 * sets the time delta measured from current time when the event should be executed.
349 * @param sec the seconds of the time delta
350 * @param msec the milli seconds of the time delta
352 void TimerBase::setDeltaWhenTime(long sec, long msec)
354 setDeltaWhenTime( MilliTime(sec,msec) );
355 } // eo TimerBase::setWhenTime
360 * sets the time delta measured from current time when the event should be executed.
361 * @param mt the time delta
363 void TimerBase::setDeltaWhenTime(const MilliTime& mt)
365 get_current_monotonic_time(m_when);
368 } // eo TimerBase::setWhenTime
372 * set the active state of the timer event.
373 * @param active determines if the object should be active (default: yes).
375 void TimerBase::activate(bool active)
380 // clear the mark if we are not active.
383 } // eo TimerBase::activate
386 /** @fn void TimerBase::deactivate()
387 * deactivates the event by clearing the active state.
392 * called when the timer event occured.
394 void TimerBase::execute()
396 } // eo TimerBase::execute
400 * implementation of FilterBase class
404 FilterBase::FilterBase()
407 } // eo FilterBase::FilterBase()
411 * injects incoming data.
412 * @param data the new data
414 void FilterBase::injectIncomingData(const std::string& data)
418 FilterBasePtr ptr= get_ptr_as< FilterBase >();
421 m_io->injectIncomingData(ptr,data);
424 } // FilterBase::injectIncomingData(const std::string&)
428 * injects outgoing data.
429 * @param data the new data
431 void FilterBase::injectOutgoingData(const std::string& data)
435 FilterBasePtr ptr= get_ptr_as< FilterBase >();
438 m_io->injectOutgoingData(ptr,data);
441 } // eo FilterBase::injectOutgoingData(const std::string&)
446 * called when EOF detected on incoming channel (or incoming channel closed)
448 void FilterBase::endOfIncomingData()
450 } // eo FilterBase::endOfIncomingData()
453 * called when the filter should reset.
454 * This is used when a new channel is opened or when the filter is taken out of a filter chain.
456 void FilterBase::reset()
458 } // eo FilterBase::reset()
462 * implementation of IOImplementation class
467 * constructor for the base io class.
469 * Also adds the object to internal list of io objects (which is used by the backend).
471 * @param read_fd the file descriptor which should be used for reading (default -1 for no value)
472 * @param write_fd the file descriptor which should be used for writing (default -1 for no value)
474 IOImplementation::IOImplementation(int read_fd, int write_fd)
478 , m_not_writable(false)
481 , m_marked_for_reading(false)
482 , m_marked_for_writing(false)
484 internal_io::g_io_list().add_item(this);
487 setReadFd( read_fd );
491 setWriteFd( write_fd );
493 } // eo IOImplementation::IOImplementation
497 * destructor of the base io class.
499 * Removes the object from the interal list of io objects.
501 IOImplementation::~IOImplementation()
504 if (internal_io::IOList::Instances())
506 internal_io::g_io_list().remove_item(this);
508 // now clear the filters:
509 while (! m_filter_chain.empty() )
511 FilterChain::iterator it = m_filter_chain.begin();
514 //TODO: signal the filter that it is removed ?!
515 m_filter_chain.erase(it);
517 } // eo IOImplementation::~IOImplementation
521 * adds another filter to the filter chain.
522 * @param filter pointer to the new filter.
524 void IOImplementation::addFilter
531 return; // nothing to do
535 filter->m_io->removeFilter(filter);
537 m_filter_chain.push_back( filter );
538 } // eo IOImplementation::addFilter
542 * removes a filter from the filter chain.
543 * @param filter the pointer to the filter which is removed.
544 * @note if the filter is removed the class gives away the ownership; i.,e. the caller is responsible for
545 * deleting the filter if it was dynamically allocated.
547 void IOImplementation::removeFilter
552 FilterChain::iterator it =
553 std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(filter) );
554 if (it != m_filter_chain.end())
558 //TODO: signal the filter that it is removed ?!
559 m_filter_chain.erase(it);
561 } // eo IOImplementation::removeFilter
565 * closes the file descriptors (/ the connection).
567 * @param direction the direction which should be closed (default: @a Direction::both for all).
569 void IOImplementation::close(Direction direction)
571 bool had_read_fd= (m_read_fd >= 0);
573 if (direction == Direction::unspecified) direction= Direction::both;
574 if (direction != Direction::both && m_read_fd==m_write_fd && m_read_fd>=0)
575 { // special case: half closed (socket) connections...
576 // NOTE: for file descriptors m_errno will set to ENOTSOCK, but since we "forget" the desired part
577 // (read_fd or write_fd) this class works as desired.
582 int res= ::shutdown(m_read_fd, SHUT_RD);
590 for(FilterChain::iterator it= m_filter_chain.begin();
591 it != m_filter_chain.end();
594 (*it)->endOfIncomingData();
602 int res= ::shutdown(m_write_fd, SHUT_WR);
608 m_output_buffer.clear();
613 if (m_write_fd >= 0 && (direction & Direction::out) )
615 int res1 = ::close(m_write_fd);
616 if (m_write_fd == m_read_fd)
621 m_output_buffer.clear();
622 if (res1<0) m_errno= errno;
624 if (m_read_fd >=0 && (direction & Direction::in) )
626 int res1 = ::close(m_read_fd);
628 if (res1<0) m_errno= errno;
630 if (had_read_fd && !m_eof && (m_read_fd<0))
632 for(FilterChain::iterator it= m_filter_chain.begin();
633 it != m_filter_chain.end();
636 (*it)->endOfIncomingData();
639 } // eo IOImplementation::close
643 * determines if the io class wants to read data.
644 * Default implementation checks only for a valid file descriptor value.
646 * @return @a true if the objects wants to read data
648 bool IOImplementation::wantRead()
650 return (m_read_fd >= 0) && ! m_eof;
651 } // eo IOImplementation::wantRead
655 * determines if the io class wants to write data.
656 * Default implementation checks for a valid file descriptor value and if the object
657 * cannot write data immediately.
659 * @return @a true if the objects wants to write data
661 bool IOImplementation::wantWrite()
663 return (m_write_fd >= 0) && ! m_marked_for_writing && ! m_not_writable;
664 } // eo IOImplementation::wantWrite
668 * delivers if opened.
669 * The default returns @a true if at least one file descriptor (read or write) is valid.
670 * @return @a true if opened.
672 bool IOImplementation::opened() const
674 return (m_read_fd>=0) || (m_write_fd>=0);
675 } // eo IOImplementation::opened() const
679 * returns if the read side detected an end of file (EOF).
680 * @return @a true if end of file was detected on read file descriptor (or read file descriptor isn't valid).
682 bool IOImplementation::eof() const
684 return (m_read_fd < 0) || m_eof;
685 } // eo IOImplementatio::eof() const
689 * @brief returns of the write side didn't detect that it cannot write.
690 * @return @a true if we can write.
692 bool IOImplementation::writable() const
694 return (m_write_fd >=0 ) and not m_not_writable;
695 } // eo IOImplementation::writable() const
699 * returns if the output buffer is empty.
702 bool IOImplementation::empty() const
704 return m_output_buffer.empty();
705 } // eo IOImplementation::empty
708 * puts data into the output buffer and sends it immediately if possible,
710 * The data is passed through the filter chain before it's stored in the output buffer
711 * (i.e. the output buffer contains data as it should be send directly to the descriptor).
712 * @param _data the data which should be send.
714 void IOImplementation::lowSend(const std::string& _data)
716 std::string data(_data);
718 for(FilterChain::reverse_iterator it_filter= m_filter_chain.rbegin();
719 it_filter!= m_filter_chain.rend();
722 data= (*it_filter)->filterOutgoingData(data);
724 m_output_buffer+= data;
726 // if we can send immediately, do it:
727 if (! m_output_buffer.empty() && m_marked_for_writing)
731 } // eo IOImplementation::lowSend
735 * called by the backend when there is data to read for this object.
737 * Reads the data from the connection (read file descriptor) and passes the data through the filter chain.
738 * The final data is appended to the input buffer and the signal @a m_signal_read() is called.
740 * If EOF is detected (i,e, no data was received) then the signal @a m_signal_eof() is called.
742 * @note overload this method only when You know what You are doing!
743 * (overloading is necessary when handling server sockets.)
745 void IOImplementation::doRead()
747 // static read buffer; should be ok as long as we don't use threads
748 static char buffer[8*1024]; // 8 KiB
751 if (m_read_fd<0 || !m_marked_for_reading)
753 ODOUT("exit0; read_fd="<<m_read_fd << " mark=" << m_marked_for_reading);
758 m_marked_for_reading = false;
760 // now read the data:
762 count = ::read(m_read_fd, buffer, sizeof(buffer));
764 ODOUT("::read -> " << count);
766 // interpret what we got:
767 if (count < 0) // error
780 close( (m_read_fd == m_write_fd) ? Direction::both : Direction::in );
785 else if (count==0) // EOF
787 // remember the read fd:
793 // if the fd is still the same: close it.
796 close( Direction::in );
799 else // we have valid data
801 std::string data(buffer,count);
802 ODOUT(" got \"" << data << "\"");
803 for(FilterChain::iterator it_filter= m_filter_chain.begin();
804 it_filter != m_filter_chain.end();
807 data= (*it_filter)->filterIncomingData(data);
809 m_input_buffer+= data;
812 } // eo IOImplementation::doRead
816 * interface for filter classes to inject data into the filter chain (emulating new incoming data).
817 * @param from_filter the filter which injects the new data.
818 * @param _data the new data.
820 void IOImplementation::injectIncomingData(FilterBasePtr from_filter, const std::string& _data)
822 FilterChain::iterator it_filter =
823 std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(from_filter) );
824 if (it_filter == m_filter_chain.end())
826 // dont accept data inject from a unknown filter
829 // well: pass the data through the remaining filters:
830 // NOTE: processing is (nearly) the same as in IOImplementation::doRead()
831 std::string data(_data);
833 it_filter != m_filter_chain.end();
836 data= (*it_filter)->filterIncomingData(data);
838 m_input_buffer+= data;
840 } // eo IOImplementation::injectIncomingData(FilterBase*,const std::string&)
844 * interface for filter classes to inject data into the filter chain (emulating new outgoing data).
845 * @param from_filter the filter which injects the new data.
846 * @param _data the new data.
848 void IOImplementation::injectOutgoingData(FilterBasePtr from_filter, const std::string& _data)
850 FilterChain::reverse_iterator it_filter =
851 std::find_if( m_filter_chain.rbegin(), m_filter_chain.rend(), FilterMatch(from_filter) );
852 if (it_filter == m_filter_chain.rend())
854 // dont accept data inject from a unknown filter
857 // well: pass the data through the remaining filters:
858 // NOTE: processing is (nearly) the same as in IOImplementation::lowSend()
859 std::string data(_data);
861 it_filter!= m_filter_chain.rend();
864 data= (*it_filter)->filterOutgoingData(data);
866 m_output_buffer+= data;
868 // if we can send immediately, do it:
869 if (! m_output_buffer.empty() && m_marked_for_writing)
873 } // eo IOImplementation::injectOutgoingData(FilterBase*,const std::string&)
877 * set the read file descriptor.
878 * Although a derived class can also set the read fd directly; this method should be used
879 * for this task since it updates some flags on the fd for async operation.
880 * @param fd the new read file descriptor.
882 void IOImplementation::setReadFd(int fd)
884 // test if we already have a valid descriptor (and may have to close it):
889 // fd was already right; consider it to be ok.
892 close(Direction::in);
896 // if the new descriptor looks valid, set some flags:
899 long flags= ::fcntl(fd, F_GETFL);
902 // set the flags for non blocking, async operation
903 flags |= O_NONBLOCK|O_ASYNC;
904 ::fcntl(fd,F_SETFL, flags);
906 else if ( errno == EBADF )
908 // well, we seemed to be fed with an invalid descriptor...:
913 if (fd >= 0) // if still valid:
915 // set the close-on-exec flag
916 ::fcntl(fd,F_SETFD, FD_CLOEXEC);
919 m_marked_for_reading= false;
921 } // eo IOImplementation::setReadFd(int)
926 * set the write file descriptor.
927 * Although a derived class can also set the write fd directly; this method should be used
928 * for this task since it updates some flags on the fd for async operation.
929 * @param fd the new write file descriptor.
931 void IOImplementation::setWriteFd(int fd)
935 if (m_write_fd == fd)
937 // fd was already right; consider it to be ok.
940 close(Direction::out);
944 // if the new descriptor looks valid, set some flags:
947 long flags= ::fcntl(fd, F_GETFL);
950 // set the flags for non blocking, async operation
951 flags |= O_NONBLOCK|O_ASYNC;
952 ::fcntl(fd,F_SETFL, flags);
954 else if (errno == EBADF)
956 // well, we seemed to be fed with an invalid descriptor...:
961 if (fd >= 0) // if still valid:
963 // set the close-on-exec flag
964 ::fcntl(fd,F_SETFD, FD_CLOEXEC);
967 m_marked_for_writing= false;
968 m_not_writable= false;
969 } // eo IOImplementation::setWriteFd(int)
974 * called by the backend when this object can write data.
976 * If some data was sended, the signal @a m_signal_write is called.
978 * @internal tries to write all buffered data to output; if this succeeds,
979 * the connection is assumed to be still able to accept more data.
980 * (i.e. the internal write mark is kept!)
982 * @note overload this method only when You know what You are doing!
984 void IOImplementation::doWrite()
987 if ( m_write_fd<0 || !m_marked_for_writing || m_output_buffer.empty())
992 ODOUT("doWrite, d=\"" << m_output_buffer << "\"");
995 m_marked_for_writing= false;
997 // now write the data
998 ssize_t count= ::write( m_write_fd, m_output_buffer.data(), m_output_buffer.size());
1000 ODOUT("::write -> " << count);
1002 if (count < 0) // error
1010 m_not_writable= true;
1012 m_signal_not_writable();
1018 if (fd == m_write_fd)
1020 close( (m_write_fd == m_read_fd) ? Direction::both : Direction::out );
1027 m_output_buffer.erase(0, count);
1028 if (m_output_buffer.empty())
1030 // special case: if we were able to send all the data, we keep the write mark:
1031 m_marked_for_writing= true;
1038 } // eo IOImplementation::doWrite
1042 * implementation of SimpleIO
1046 SimpleIO::SimpleIO(int read_fd, int write_fd)
1047 : inherited(read_fd, write_fd)
1049 m_signal_read.connect(boost::bind(&SimpleIO::slotReceived,this));
1050 } // eo SimpleIO::SimpleIO()
1053 SimpleIO::~SimpleIO()
1055 } // eo SimpleIO::~SimpleIO()
1060 * @param data the string.
1062 void SimpleIO::sendString(const std::string& data)
1065 } // eo SimpleIO::sendString(const std::string&)
1069 * emits the signal signalReceived with the received data.
1070 * This slot is connected to IOImplementation::m_signal_read.
1072 void SimpleIO::slotReceived()
1075 data.swap(m_input_buffer);
1076 signal_received_string(data);
1077 } // eo SimpleIO::slotReceived()
1082 * implementation of SimpleIO2
1086 SimpleIO2::SimpleIO2(int read_fd, int write_fd)
1087 : inherited(read_fd, write_fd)
1089 m_signal_read.connect(boost::bind(&SimpleIO2::slotReceived,this));
1090 } // eo SimpleIO2::SimpleIO2()
1093 SimpleIO2::~SimpleIO2()
1095 } // eo SimpleIO2::~SimpleIO2()
1100 * @param data the string.
1102 void SimpleIO2::sendString(const std::string& data)
1105 } // eo SimpleIO2::sendString(const std::string&)
1109 * emits the signal signalReceived with the received data.
1110 * This slot is connected to IOImplementation::m_signal_read.
1112 void SimpleIO2::slotReceived()
1115 data.swap(m_input_buffer);
1116 signal_received_string(data);
1117 } // eo SimpleIO2::slotReceived()
1122 * implementation of class Backend (singleton)
1125 Backend* Backend::g_backend= NULL;
1127 int Backend::m_count_active_steps=0;
1131 : m_count_active_loops(0)
1132 , m_count_stop_requests(0)
1134 #ifdef HAVE_LIBI2NCOMMON
1135 SystemTools::ignore_signal( SystemTools::Signal::PIPE );
1137 signal( SIGPIPE, SIG_IGN );
1139 } // eo Backend::Backend
1144 #ifdef HAVE_LIBI2NCOMMON
1145 SystemTools::restore_signal_handler( SystemTools::Signal::PIPE );
1147 signal( SIGPIPE, SIG_DFL );
1149 } // eo Backend::~Backend()
1152 * delivers pointer to the current backend, instantiating a new backend if there was no current one.
1154 * This should be the only way to access the backend which should be a singleton.
1156 * @return the pointer to the current backend.
1158 Backend* Backend::getBackend()
1162 g_backend = new Backend();
1165 } // eo Backend::getBackend
1171 * performs one backend cycle.
1173 * Collects all file descriptors from the active io objects which should be selected for reading and/or writing.
1174 * Also determines the timer events which become due and adjusts the timeout.
1175 * Constructs the necessary structures and calls poll().
1176 * Finally interprets the results from poll() (i.e. performs the reading/writing/timer events)
1178 * @param timeout maximal wait value in milliseconds; negative value waits until at least one event occured.
1179 * @return @a true if there was at least one active object; otherwise @a false
1181 * @note this method is a little beast.
1184 * The cycle is divided into four steps: collecting; poll; mark and execute.
1185 * The "mark" step is necessary to avoid some bad side effects when method calls in the execution stage
1186 * are calling @a Backup::doOneStep or open their own local backend loop.
1188 * @todo handle some more error cases.
1189 * @todo provide a plugin interface for external handler.
1190 * (currently inclusion of external handler is possible by (ab)using timer classes)
1192 bool Backend::doOneStep(int timeout)
1194 ODOUT( "timeout=" << timeout );
1195 internal_io::PollDataCluster poll_data;
1196 bool had_active_object = false;
1198 ++m_count_active_steps;
1202 FdSetType local_read_fds;
1203 FdSetType local_write_fds;
1207 { // step 1.1: collect fds for read/write operations
1208 for(internal_io::IOList::iterator itIOList = internal_io::g_io_list().begin();
1209 itIOList != internal_io::g_io_list().end();
1212 if (! *itIOList) continue; // skip NULL entries
1213 int read_fd = (*itIOList)->m_read_fd;
1214 int write_fd = (*itIOList)->m_write_fd;
1215 bool want_read = (read_fd >= 0) and (*itIOList)->wantRead();
1216 bool want_write = (write_fd >= 0) and (*itIOList)->wantWrite();
1219 local_read_fds.insert( read_fd );
1223 local_write_fds.insert( write_fd );
1225 if (!want_read && !want_write) continue;
1228 FODOUT( (*itIOList), "wants to read (fd=" << read_fd << ")");
1229 poll_data.add_read_fd(read_fd, *itIOList);
1233 FODOUT( (*itIOList), "wants to write (fd=" << write_fd << ")");
1234 poll_data.add_write_fd(write_fd, *itIOList);
1236 had_active_object= true;
1240 { // step 1.2: collect timer events
1241 MilliTime current_time;
1242 MilliTime min_event_time;
1244 get_current_monotonic_time(current_time);
1245 bool min_event_time_set;
1249 min_event_time = current_time + MilliTime(0,timeout);
1250 min_event_time_set= true;
1254 min_event_time = current_time + MilliTime(86400,0);
1255 min_event_time_set= false;
1259 for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1260 it_timer != internal_io::g_timer_list().end()
1261 && (!had_active_object || !min_event_time_set || current_time < min_event_time);
1264 if (! *it_timer) continue; // skip NULL entries
1265 if (! (*it_timer)->m_active) continue; // skip if not enabled
1266 if ( !min_event_time_set || (*it_timer)->m_when < min_event_time)
1268 min_event_time = (*it_timer)->m_when;
1269 min_event_time_set= true;
1271 had_active_object= true;
1274 if (min_event_time_set)
1275 { // we have at a minimal event time, so (re)compute the timeout value:
1276 MilliTime delta= (min_event_time - current_time);
1277 long long delta_ms = std::min( delta.get_milliseconds(), 21600000LL); // max 6h
1284 timeout= delta_ms + (delta_ms<5 ? 1 : 3);
1290 ODOUT(" poll timeout is " << timeout);
1292 MilliTime current_time;
1293 get_current_monotonic_time(current_time);
1294 ODOUT(" current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1296 int poll_result= ::poll(poll_data.get_pollfd_ptr(), poll_data.get_num_pollfds(), timeout);
1298 ODOUT("poll -> " << poll_result);
1300 MilliTime current_time;
1301 get_current_monotonic_time(current_time);
1302 ODOUT(" current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1305 if (poll_result < 0)
1307 //TODO poll error handling (signals ?!)
1312 // step 3.1: mark io objects (if necessary)
1313 if (poll_result > 0)
1315 for(internal_io::PollVector::iterator itPollItem = poll_data.m_poll_vector.begin();
1316 itPollItem != poll_data.m_poll_vector.end();
1319 ODOUT(" fd=" << itPollItem->fd << ", events=" << itPollItem->events << ", revents=" << itPollItem->revents);
1320 if ( 0 == (itPollItem->revents))
1321 { // preliminary continuation if nothing is to handle for this item(/fd)...
1324 if ( 0!= (itPollItem->revents & (POLLIN|POLLHUP)))
1326 IOImplementation *io= poll_data.m_read_fd_io_map[ itPollItem->fd ];
1327 if (io && io->m_read_fd==itPollItem->fd)
1329 FODOUT(io,"marked for reading");
1330 io->m_marked_for_reading= true;
1333 if ( 0!= (itPollItem->revents & POLLOUT))
1335 IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
1336 if (io && io->m_write_fd==itPollItem->fd)
1338 io->m_marked_for_writing= true;
1341 if ( 0!= (itPollItem->revents & POLLERR))
1343 IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
1344 if (0!= (itPollItem->events & POLLOUT))
1346 if (io && io->m_write_fd==itPollItem->fd)
1348 io->m_marked_for_writing= false;
1349 //io->close( Direction::out );
1353 // TODO error handling (POLLERR, POLLHUP, POLLNVAL)
1357 //Step 3.2: mark timer objects
1359 MilliTime current_time;
1361 get_current_monotonic_time(current_time);
1362 ODOUT(" current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1364 for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1365 it_timer != internal_io::g_timer_list().end();
1368 ODOUT(" check timer " << *it_timer);
1369 if (! *it_timer) continue; // skip NULL entries
1370 if (! (*it_timer)->m_active) continue; // skip if not enabled
1371 if ( (*it_timer)->m_when <= current_time)
1374 (*it_timer)->m_marked = true;
1382 // step 4.1: execute io
1383 ODOUT("execute stage");
1384 for(internal_io::IOList::iterator it_io = internal_io::g_io_list().begin();
1385 it_io != internal_io::g_io_list().end();
1388 ODOUT(" check obj " << *it_io);
1389 if (NULL == *it_io) continue;
1390 if ((*it_io)->m_marked_for_writing)
1392 FODOUT((*it_io),"exec doWrite");
1393 (*it_io)->doWrite();
1394 if ((*it_io) == NULL) continue; // skip remaining if we lost the object
1395 if ((*it_io)->m_errno)
1400 if ((*it_io)->m_marked_for_reading)
1402 FODOUT((*it_io),"exec doRead");
1404 if ((*it_io) == NULL) continue; // skip remaining if we lost the object
1405 if ((*it_io)->m_errno)
1412 // step 4.2: execute timer events
1414 for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1415 it_timer != internal_io::g_timer_list().end();
1418 if (! *it_timer) continue; // skip NULL entries
1419 if (! (*it_timer)->m_active) continue; // skip if not enabled
1420 if (! (*it_timer)->m_marked) continue; // skip if not marked
1422 // reset the mark and deactivate object now since the execute() method might activate it again
1423 (*it_timer)->m_marked= false;
1424 (*it_timer)->m_active= false;
1426 // now execute the event:
1427 (*it_timer)->execute();
1434 // clean up our counter
1435 --m_count_active_steps;
1436 // and forward the exception
1440 if ( 0 == --m_count_active_steps)
1442 internal_io::g_io_list().clean_list();
1443 internal_io::g_timer_list().clean_list();
1446 return had_active_object;
1447 } // eo Backend::doOneStep
1451 * enters a backend loop.
1453 * Calls @a Backend::doOneStep within a loop until @a Backend::stop was called or there are no more
1454 * active objects (io objects or timer objects).
1458 ++m_count_active_loops;
1463 if (!doOneStep(c_max_poll_wait))
1465 // stop if there are no more active objects.
1471 // clean up our counter
1472 --m_count_active_loops;
1473 // and forward the exception
1477 while (0 == m_count_stop_requests);
1478 --m_count_active_loops;
1479 --m_count_stop_requests;
1480 } // eo Backend::run
1484 * @brief stops the latest loop currently run by Backend::run().
1485 * @see Backend::run()
1487 void Backend::stop()
1489 if (m_count_active_loops)
1491 ++m_count_stop_requests;
1493 } // eo Backend::stop()
1497 } // eo namespace AsyncIo