Commit | Line | Data |
---|---|---|
8c15b8c7 TJ |
1 | /* |
2 | The software in this package is distributed under the GNU General | |
3 | Public License version 2 (with a special exception described below). | |
4 | ||
5 | A copy of GNU General Public License (GPL) is included in this distribution, | |
6 | in the file COPYING.GPL. | |
7 | ||
8 | As a special exception, if other files instantiate templates or use macros | |
9 | or inline functions from this file, or you compile this file and link it | |
10 | with other works to produce a work based on this file, this file | |
11 | does not by itself cause the resulting work to be covered | |
12 | by the GNU General Public License. | |
13 | ||
14 | However the source code for this file must still be made available | |
15 | in accordance with section (3) of the GNU General Public License. | |
16 | ||
17 | This exception does not invalidate any other reasons why a work based | |
18 | on this file might be covered by the GNU General Public License. | |
19 | */ | |
5c8a3d40 | 20 | /** @file |
5c8a3d40 | 21 | * @copyright Copyright © 2007-2008 by Intra2net AG |
5c8a3d40 RP |
22 | */ |
23 | ||
24 | //#define NOISEDEBUG | |
25 | ||
42b7c46d | 26 | #include "async_io.hpp" |
c78245a3 | 27 | #include <asyncio_config.hpp> |
5c8a3d40 RP |
28 | |
29 | #include <list> | |
30 | #include <vector> | |
31 | #include <map> | |
32 | #include <algorithm> | |
33 | #include <utility> | |
34 | ||
35 | #include <sys/poll.h> | |
36 | #include <sys/time.h> | |
37 | #include <sys/socket.h> | |
38 | #include <unistd.h> | |
39 | #include <errno.h> | |
40 | #include <fcntl.h> | |
41 | ||
42 | #include <boost/bind.hpp> | |
43 | ||
74786da6 | 44 | #include <asyncio_signalfunc.hpp> |
5c8a3d40 RP |
45 | |
46 | #include <iostream> | |
47 | ||
// Debug output helpers.
// With NOISEDEBUG defined the macros print to std::cout; otherwise they expand to an
// empty statement, so call sites compile unchanged and their arguments are not evaluated.
#ifdef NOISEDEBUG
#include <iostream>
#include <iomanip>
#include <typeinfo> // typeid() (used by FODOUT) requires this header
#define DOUT(msg) std::cout << msg << std::endl
// obj is parenthesized so that expressions like "&x" or "p+1" expand as intended
#define FODOUT(obj,msg) std::cout << typeid(*(obj)).name() << "[" << (obj) << "]:" << msg << std::endl
//#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl
#define ODOUT(msg) std::cout << __PRETTY_FUNCTION__ << "[" << this << "]:" << msg << std::endl
#else
#define DOUT(msg) do {} while (0)
#define FODOUT(obj,msg) do {} while (0)
#define ODOUT(msg) do {} while (0)
#endif
60 | ||
aba4c34d | 61 | |
5c8a3d40 RP |
62 | namespace |
63 | { | |
64 | ||
aba4c34d | 65 | using namespace AsyncIo::Utils; |
5c8a3d40 RP |
66 | |
67 | /* | |
68 | * configuration: | |
69 | */ | |
70 | ||
71 | ||
// upper bound for a single poll wait while in the backend loop: 10 minutes, given in milliseconds
const int c_max_poll_wait = 10 /* min */ * 60 /* sec */ * 1000 /* msec */;
73 | ||
74 | ||
75 | /** | |
76 | * contains internal helper structs and functions for io handling. | |
77 | */ | |
78 | namespace internal_io | |
79 | { | |
80 | ||
81 | /** | |
82 | * extends struct pollfd with some convenience functions | |
83 | */ | |
struct PollFd : public ::pollfd
{
    /**
     * creates an unused entry with cleared event masks.
     * The descriptor is -1 (and not 0, which is stdin): poll() ignores entries
     * with a negative descriptor, so an unused entry can never report events.
     */
    PollFd()
    {
        fd= -1;
        events= revents= 0;
    } // eo PollFd


    /**
     * initializes the struct with a given file descriptor and clears the event mask(s).
     * @param _fd the file descriptor this entry refers to.
     */
    PollFd(int _fd)
    {
        fd= _fd;
        events= revents= 0;
    } // eo PollFd


    /**
     * set that we want to be notified about new incoming data
     */
    void setPOLLIN() { events |= POLLIN; }

    /**
     * set that we want to be notified if we can send (more) data.
     */
    void setPOLLOUT() { events |= POLLOUT; }

}; // eo struct PollFd
115 | ||
116 | ||
117 | typedef std::vector<PollFd> PollVector; | |
118 | typedef std::map<int,PollVector::size_type> FdPollMap; | |
42b7c46d | 119 | typedef std::map<int,AsyncIo::IOImplementation*> FdIOMap; |
5c8a3d40 RP |
120 | |
121 | ||
122 | /** | |
123 | * struct for interfacing our local structures with poll() | |
124 | */ | |
125 | struct PollDataCluster | |
126 | { | |
127 | PollVector m_poll_vector; | |
128 | FdPollMap m_fd_poll_map; | |
129 | FdIOMap m_read_fd_io_map; | |
130 | FdIOMap m_write_fd_io_map; | |
131 | ||
42b7c46d RP |
132 | void add_read_fd( int fd, AsyncIo::IOImplementation* io); |
133 | void add_write_fd( int fd, AsyncIo::IOImplementation* io); | |
5c8a3d40 RP |
134 | |
135 | pollfd* get_pollfd_ptr(); | |
136 | unsigned int get_num_pollfds() const; | |
137 | ||
138 | }; // eo struct PollDataCluster | |
139 | ||
140 | ||
aba4c34d RP |
141 | typedef PtrList< AsyncIo::IOImplementation, true > IOList; |
142 | typedef PtrList< AsyncIo::TimerBase, true > TimerList; | |
5c8a3d40 | 143 | |
aba4c34d RP |
144 | template<> int IOList::GlobalCountType::InstanceCount= 0; |
145 | template<> int TimerList::GlobalCountType::InstanceCount= 0; | |
5c8a3d40 RP |
146 | |
147 | ||
148 | /** | |
149 | * the (internal) global list of io objects (object pointers) | |
150 | */ | |
151 | IOList& g_io_list() | |
152 | { | |
153 | static IOList _the_io_list; | |
154 | return _the_io_list; | |
155 | }; | |
156 | ||
157 | ||
158 | /** | |
159 | * the (internal) global list of timer objects (object pointers) | |
160 | */ | |
161 | TimerList& g_timer_list() | |
162 | { | |
163 | static TimerList _the_timer_list; | |
164 | return _the_timer_list; | |
165 | } | |
166 | ||
167 | /* | |
168 | * implementation of PollDataCluster | |
169 | */ | |
170 | ||
171 | ||
172 | /** | |
173 | * add a new file descriptor to the read list. | |
174 | * | |
175 | * @param fd the file descriptor. | |
176 | * @param io the io object which uses the fd for reading. | |
177 | */ | |
42b7c46d | 178 | void PollDataCluster::add_read_fd( int fd, AsyncIo::IOImplementation* io) |
5c8a3d40 RP |
179 | { |
180 | FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd); | |
181 | if (itPollMap != m_fd_poll_map.end()) | |
182 | { | |
183 | m_poll_vector[ itPollMap->second ].setPOLLIN(); | |
184 | } | |
185 | else | |
186 | { | |
187 | PollFd item(fd); | |
188 | item.setPOLLIN(); | |
189 | m_fd_poll_map[fd] = m_poll_vector.size(); | |
190 | m_poll_vector.push_back( item ); | |
191 | } | |
192 | m_read_fd_io_map[fd]= io; | |
193 | } // eo PollDataCluster::add_read_fd | |
194 | ||
195 | ||
196 | /** | |
197 | * add a new file descriptor to the write list. | |
198 | * | |
199 | * @param fd the file descriptor. | |
200 | * @param io the io object which uses the fd for writing. | |
201 | */ | |
42b7c46d | 202 | void PollDataCluster::add_write_fd( int fd, AsyncIo::IOImplementation* io) |
5c8a3d40 RP |
203 | { |
204 | FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd); | |
205 | if (itPollMap != m_fd_poll_map.end()) | |
206 | { | |
207 | m_poll_vector[ itPollMap->second ].setPOLLOUT(); | |
208 | } | |
209 | else | |
210 | { | |
211 | PollFd item(fd); | |
212 | item.setPOLLOUT(); | |
213 | m_fd_poll_map[fd] = m_poll_vector.size(); | |
214 | m_poll_vector.push_back( item ); | |
215 | } | |
216 | m_write_fd_io_map[fd]= io; | |
217 | } // eo PollDataCluster::add_write_fd | |
218 | ||
219 | ||
220 | /** | |
221 | * returns a pointer to a pollfd array; suitable for passing to poll() | |
222 | * | |
223 | * @return pointer to pollfd array | |
224 | */ | |
225 | pollfd* PollDataCluster::get_pollfd_ptr() | |
226 | { | |
227 | return m_poll_vector.empty() ? NULL : &m_poll_vector.front(); | |
228 | } // eo get_pollfd_ptr | |
229 | ||
230 | ||
231 | /** | |
232 | * returns the number of entries in the pollfd array; suitable for passing to poll() | |
233 | * | |
234 | * @return the number of entries in the pollfd array | |
235 | */ | |
236 | unsigned int PollDataCluster::get_num_pollfds() const | |
237 | { | |
238 | return m_poll_vector.size(); | |
239 | } // eo get_num_pollfds | |
240 | ||
241 | ||
242 | ||
243 | } // eo namespace internal_io | |
244 | ||
245 | ||
246 | /* | |
247 | * some internal tool functions and structures | |
248 | */ | |
249 | ||
250 | struct FilterMatch { | |
42b7c46d | 251 | AsyncIo::FilterBasePtr m_filter; |
5c8a3d40 | 252 | |
42b7c46d | 253 | FilterMatch(AsyncIo::FilterBasePtr filter) |
5c8a3d40 RP |
254 | : m_filter(filter) |
255 | {} | |
256 | ||
42b7c46d | 257 | bool operator () (const AsyncIo::FilterBasePtr& item) |
5c8a3d40 RP |
258 | { |
259 | return item && item == m_filter; | |
260 | } | |
261 | ||
262 | }; // eo struct FilterMatch | |
263 | ||
264 | ||
5c8a3d40 RP |
265 | |
266 | } // eo anonymous namespace | |
267 | ||
268 | ||
269 | ||
270 | ||
42b7c46d | 271 | namespace AsyncIo |
5c8a3d40 RP |
272 | { |
273 | ||
274 | ||
5c8a3d40 RP |
275 | /* |
276 | * implementation of TimerBase | |
277 | */ | |
278 | ||
279 | /** | |
280 | * constructor. Adds the object to the internal timer list. | |
281 | */ | |
282 | TimerBase::TimerBase() | |
283 | : m_active(false) | |
284 | , m_marked(false) | |
285 | { | |
286 | internal_io::g_timer_list().add_item(this); | |
287 | } // eo TimerBase::TimerBase | |
288 | ||
289 | ||
290 | /** | |
291 | * destructor. Removes the object from the internal timer list. | |
292 | */ | |
293 | TimerBase::~TimerBase() | |
294 | { | |
295 | ODOUT("enter"); | |
aba4c34d | 296 | if (internal_io::TimerList::Instances()) |
5c8a3d40 RP |
297 | { |
298 | ODOUT("remove from list"); | |
299 | internal_io::g_timer_list().remove_item(this); | |
300 | } | |
301 | } // eo TimerBase::~TimerBase | |
302 | ||
303 | ||
304 | /** | |
305 | * @brief returns the point in time when the timer is executed in real time. | |
306 | * @return the point in time when the timer is to be executed. | |
307 | */ | |
308 | MilliTime TimerBase::getRealWhenTime() const | |
309 | { | |
310 | MilliTime mono_time; | |
311 | MilliTime real_time; | |
312 | get_current_monotonic_time(mono_time); | |
313 | get_current_real_time(real_time); | |
314 | MilliTime result= m_when - mono_time + real_time; | |
315 | return result; | |
316 | } // eo TimerBase::getRealWhenTime() const | |
317 | ||
318 | ||
319 | /** | |
320 | * sets the time when the event should be executed. | |
321 | * @param sec the seconds part of the point in time. | |
322 | * @param msec the milliseconds part of the point in time. | |
323 | */ | |
324 | void TimerBase::setWhenTime(long sec, long msec) | |
325 | { | |
326 | m_when.set(sec,msec); | |
327 | m_marked= false; | |
328 | } // eo TimerBase::setWhenTime | |
329 | ||
330 | ||
331 | /** | |
332 | * sets the time when the event should be executed. | |
333 | * @param mt the point in time. | |
334 | */ | |
335 | void TimerBase::setWhenTime(const MilliTime& mt) | |
336 | { | |
337 | m_when= mt; | |
338 | m_marked= false; | |
339 | } // eo TimerBase::setWhenTime | |
340 | ||
341 | ||
342 | /** | |
343 | * sets the time delta measured from current time when the event should be executed. | |
344 | * @param sec the seconds of the time delta | |
345 | * @param msec the milli seconds of the time delta | |
346 | */ | |
347 | void TimerBase::setDeltaWhenTime(long sec, long msec) | |
348 | { | |
349 | setDeltaWhenTime( MilliTime(sec,msec) ); | |
350 | } // eo TimerBase::setWhenTime | |
351 | ||
352 | ||
353 | ||
354 | /** | |
355 | * sets the time delta measured from current time when the event should be executed. | |
356 | * @param mt the time delta | |
357 | */ | |
358 | void TimerBase::setDeltaWhenTime(const MilliTime& mt) | |
359 | { | |
360 | get_current_monotonic_time(m_when); | |
361 | m_when+= mt; | |
362 | m_marked= false; | |
363 | } // eo TimerBase::setWhenTime | |
364 | ||
365 | ||
366 | /** | |
367 | * set the active state of the timer event. | |
368 | * @param active determines if the object should be active (default: yes). | |
369 | */ | |
370 | void TimerBase::activate(bool active) | |
371 | { | |
372 | m_active = active; | |
373 | if (!active) | |
374 | { | |
375 | // clear the mark if we are not active. | |
376 | m_marked= false; | |
377 | } | |
378 | } // eo TimerBase::activate | |
379 | ||
380 | ||
381 | /** @fn void TimerBase::deactivate() | |
382 | * deactivates the event by clearing the active state. | |
383 | */ | |
384 | ||
385 | ||
386 | /** | |
387 | * called when the timer event occurred. | |
388 | */ | |
389 | void TimerBase::execute() | |
390 | { | |
391 | } // eo TimerBase::execute | |
392 | ||
393 | ||
394 | /* | |
395 | * implementation of FilterBase class | |
396 | */ | |
397 | ||
398 | ||
399 | FilterBase::FilterBase() | |
400 | : m_io(NULL) | |
401 | { | |
402 | } // eo FilterBase::FilterBase() | |
403 | ||
404 | ||
405 | /** | |
406 | * injects incoming data. | |
407 | * @param data the new data | |
408 | */ | |
409 | void FilterBase::injectIncomingData(const std::string& data) | |
410 | { | |
411 | if (m_io) | |
412 | { | |
413 | FilterBasePtr ptr= get_ptr_as< FilterBase >(); | |
414 | if (ptr) | |
415 | { | |
416 | m_io->injectIncomingData(ptr,data); | |
417 | } | |
418 | } | |
419 | } // FilterBase::injectIncomingData(const std::string&) | |
420 | ||
421 | ||
422 | /** | |
423 | * injects outgoing data. | |
424 | * @param data the new data | |
425 | */ | |
426 | void FilterBase::injectOutgoingData(const std::string& data) | |
427 | { | |
428 | if (m_io) | |
429 | { | |
430 | FilterBasePtr ptr= get_ptr_as< FilterBase >(); | |
431 | if (ptr) | |
432 | { | |
433 | m_io->injectOutgoingData(ptr,data); | |
434 | } | |
435 | } | |
436 | } // eo FilterBase::injectOutgoingData(const std::string&) | |
437 | ||
438 | ||
439 | ||
440 | /** | |
441 | * called when EOF detected on incoming channel (or incoming channel closed) | |
442 | */ | |
443 | void FilterBase::endOfIncomingData() | |
444 | { | |
445 | } // eo FilterBase::endOfIncomingData() | |
446 | ||
447 | /** | |
448 | * called when the filter should reset. | |
449 | * This is used when a new channel is opened or when the filter is taken out of a filter chain. | |
450 | */ | |
451 | void FilterBase::reset() | |
452 | { | |
453 | } // eo FilterBase::reset() | |
454 | ||
455 | ||
456 | /* | |
457 | * implementation of IOImplementation class | |
458 | */ | |
459 | ||
460 | ||
461 | /** | |
462 | * constructor for the base io class. | |
463 | * | |
464 | * Also adds the object to internal list of io objects (which is used by the backend). | |
465 | * | |
466 | * @param read_fd the file descriptor which should be used for reading (default -1 for no value) | |
467 | * @param write_fd the file descriptor which should be used for writing (default -1 for no value) | |
468 | */ | |
469 | IOImplementation::IOImplementation(int read_fd, int write_fd) | |
470 | : m_read_fd(-1) | |
471 | , m_write_fd(-1) | |
472 | , m_eof(false) | |
473 | , m_not_writable(false) | |
474 | , m_input_buffer() | |
475 | , m_output_buffer() | |
476 | , m_marked_for_reading(false) | |
477 | , m_marked_for_writing(false) | |
478 | { | |
479 | internal_io::g_io_list().add_item(this); | |
480 | if (read_fd >= 0) | |
481 | { | |
482 | setReadFd( read_fd ); | |
483 | } | |
484 | if (write_fd >= 0) | |
485 | { | |
486 | setWriteFd( write_fd ); | |
487 | } | |
488 | } // eo IOImplementation::IOImplementation | |
489 | ||
490 | ||
491 | /** | |
492 | * destructor of the base io class. | |
493 | * | |
494 | * Removes the object from the internal list of io objects. | |
495 | */ | |
496 | IOImplementation::~IOImplementation() | |
497 | { | |
498 | close(); | |
aba4c34d | 499 | if (internal_io::IOList::Instances()) |
5c8a3d40 RP |
500 | { |
501 | internal_io::g_io_list().remove_item(this); | |
502 | } | |
503 | // now clear the filters: | |
504 | while (! m_filter_chain.empty() ) | |
505 | { | |
506 | FilterChain::iterator it = m_filter_chain.begin(); | |
507 | (*it)->reset(); | |
508 | (*it)->m_io= NULL; | |
509 | //TODO: signal the filter that it is removed ?! | |
510 | m_filter_chain.erase(it); | |
511 | } | |
512 | } // eo IOImplementation::~IOImplementation | |
513 | ||
514 | ||
515 | /** | |
516 | * adds another filter to the filter chain. | |
517 | * @param filter pointer to the new filter. | |
518 | */ | |
519 | void IOImplementation::addFilter | |
520 | ( | |
521 | FilterBasePtr filter | |
522 | ) | |
523 | { | |
524 | if (!filter) | |
525 | { | |
526 | return; // nothing to do | |
527 | } | |
528 | if (filter->m_io) | |
529 | { | |
530 | filter->m_io->removeFilter(filter); | |
531 | } | |
532 | m_filter_chain.push_back( filter ); | |
533 | } // eo IOImplementation::addFilter | |
534 | ||
535 | ||
536 | /** | |
537 | * removes a filter from the filter chain. | |
538 | * @param filter the pointer to the filter which is removed. | |
539 | * @note if the filter is removed the class gives away the ownership; i.e. the caller is responsible for | |
540 | * deleting the filter if it was dynamically allocated. | |
541 | */ | |
542 | void IOImplementation::removeFilter | |
543 | ( | |
544 | FilterBasePtr filter | |
545 | ) | |
546 | { | |
547 | FilterChain::iterator it = | |
548 | std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(filter) ); | |
549 | if (it != m_filter_chain.end()) | |
550 | { | |
551 | filter->reset(); | |
552 | filter->m_io= NULL; | |
553 | //TODO: signal the filter that it is removed ?! | |
554 | m_filter_chain.erase(it); | |
555 | } | |
556 | } // eo IOImplementation::removeFilter | |
557 | ||
558 | ||
559 | /** | |
560 | * closes the file descriptors (/ the connection). | |
561 | * | |
562 | * @param direction the direction which should be closed (default: @a Direction::both for all). | |
563 | */ | |
564 | void IOImplementation::close(Direction direction) | |
565 | { | |
566 | bool had_read_fd= (m_read_fd >= 0); | |
567 | m_errno= 0; | |
568 | if (direction == Direction::unspecified) direction= Direction::both; | |
569 | if (direction != Direction::both && m_read_fd==m_write_fd && m_read_fd>=0) | |
570 | { // special case: half closed (socket) connections... | |
571 | // NOTE: for file descriptors m_errno will set to ENOTSOCK, but since we "forget" the desired part | |
572 | // (read_fd or write_fd) this class works as desired. | |
573 | switch(direction) | |
574 | { | |
575 | case Direction::in: | |
576 | { | |
577 | int res= ::shutdown(m_read_fd, SHUT_RD); | |
74786da6 | 578 | if (res<0) |
5c8a3d40 RP |
579 | { |
580 | m_errno= errno; | |
581 | } | |
582 | m_read_fd= -1; | |
583 | if (!m_eof) | |
584 | { | |
585 | for(FilterChain::iterator it= m_filter_chain.begin(); | |
586 | it != m_filter_chain.end(); | |
587 | ++it) | |
588 | { | |
589 | (*it)->endOfIncomingData(); | |
590 | } | |
591 | } | |
592 | } | |
593 | return; | |
594 | ||
595 | case Direction::out: | |
596 | { | |
597 | int res= ::shutdown(m_write_fd, SHUT_WR); | |
598 | if (res<0) | |
599 | { | |
600 | m_errno= errno; | |
601 | } | |
602 | m_write_fd= -1; | |
603 | m_output_buffer.clear(); | |
604 | } | |
605 | return; | |
606 | } | |
607 | } | |
608 | if (m_write_fd >= 0 && (direction & Direction::out) ) | |
609 | { | |
610 | int res1 = ::close(m_write_fd); | |
611 | if (m_write_fd == m_read_fd) | |
612 | { | |
613 | m_read_fd= -1; | |
614 | } | |
615 | m_write_fd= -1; | |
616 | m_output_buffer.clear(); | |
617 | if (res1<0) m_errno= errno; | |
618 | } | |
619 | if (m_read_fd >=0 && (direction & Direction::in) ) | |
620 | { | |
621 | int res1 = ::close(m_read_fd); | |
622 | m_read_fd= -1; | |
623 | if (res1<0) m_errno= errno; | |
624 | } | |
625 | if (had_read_fd && !m_eof && (m_read_fd<0)) | |
626 | { | |
627 | for(FilterChain::iterator it= m_filter_chain.begin(); | |
628 | it != m_filter_chain.end(); | |
629 | ++it) | |
630 | { | |
631 | (*it)->endOfIncomingData(); | |
632 | } | |
633 | } | |
634 | } // eo IOImplementation::close | |
635 | ||
636 | ||
637 | /** | |
638 | * determines if the io class wants to read data. | |
639 | * Default implementation checks only for a valid file descriptor value. | |
640 | * | |
641 | * @return @a true if the objects wants to read data | |
642 | */ | |
643 | bool IOImplementation::wantRead() | |
644 | { | |
645 | return (m_read_fd >= 0) && ! m_eof; | |
646 | } // eo IOImplementation::wantRead | |
647 | ||
648 | ||
649 | /** | |
650 | * determines if the io class wants to write data. | |
651 | * Default implementation checks for a valid file descriptor value and if the object | |
652 | * cannot write data immediately. | |
653 | * | |
654 | * @return @a true if the objects wants to write data | |
655 | */ | |
656 | bool IOImplementation::wantWrite() | |
657 | { | |
658 | return (m_write_fd >= 0) && ! m_marked_for_writing && ! m_not_writable; | |
659 | } // eo IOImplementation::wantWrite | |
660 | ||
661 | ||
662 | /** | |
663 | * delivers if opened. | |
664 | * The default returns @a true if at least one file descriptor (read or write) is valid. | |
665 | * @return @a true if opened. | |
666 | */ | |
667 | bool IOImplementation::opened() const | |
668 | { | |
669 | return (m_read_fd>=0) || (m_write_fd>=0); | |
670 | } // eo IOImplementation::opened() const | |
671 | ||
672 | ||
673 | /** | |
674 | * returns if the read side detected an end of file (EOF). | |
675 | * @return @a true if end of file was detected on read file descriptor (or read file descriptor isn't valid). | |
676 | */ | |
677 | bool IOImplementation::eof() const | |
678 | { | |
679 | return (m_read_fd < 0) || m_eof; | |
680 | } // eo IOImplementatio::eof() const | |
681 | ||
682 | ||
683 | /** | |
684 | * @brief returns if the write side has not detected that it cannot write. | |
685 | * @return @a true if we can write. | |
686 | */ | |
687 | bool IOImplementation::writable() const | |
688 | { | |
689 | return (m_write_fd >=0 ) and not m_not_writable; | |
690 | } // eo IOImplementation::writable() const | |
691 | ||
692 | ||
693 | /** | |
694 | * returns if the output buffer is empty. | |
74786da6 | 695 | * @return |
5c8a3d40 RP |
696 | */ |
697 | bool IOImplementation::empty() const | |
698 | { | |
699 | return m_output_buffer.empty(); | |
700 | } // eo IOImplementation::empty | |
701 | ||
702 | /** | |
703 | * puts data into the output buffer and sends it immediately if possible, | |
704 | * | |
705 | * The data is passed through the filter chain before it's stored in the output buffer | |
706 | * (i.e. the output buffer contains data as it should be send directly to the descriptor). | |
707 | * @param _data the data which should be send. | |
708 | */ | |
709 | void IOImplementation::lowSend(const std::string& _data) | |
710 | { | |
711 | std::string data(_data); | |
712 | ||
713 | for(FilterChain::reverse_iterator it_filter= m_filter_chain.rbegin(); | |
714 | it_filter!= m_filter_chain.rend(); | |
715 | ++it_filter) | |
716 | { | |
717 | data= (*it_filter)->filterOutgoingData(data); | |
718 | } | |
719 | m_output_buffer+= data; | |
720 | ||
721 | // if we can send immediately, do it: | |
722 | if (! m_output_buffer.empty() && m_marked_for_writing) | |
723 | { | |
724 | doWrite(); | |
725 | } | |
726 | } // eo IOImplementation::lowSend | |
727 | ||
728 | ||
729 | /** | |
730 | * called by the backend when there is data to read for this object. | |
731 | * | |
732 | * Reads the data from the connection (read file descriptor) and passes the data through the filter chain. | |
733 | * The final data is appended to the input buffer and the signal @a m_signal_read() is called. | |
734 | * | |
735 | * If EOF is detected (i.e. no data was received) then the signal @a m_signal_eof() is called. | |
736 | * | |
737 | * @note overload this method only when You know what You are doing! | |
738 | * (overloading is necessary when handling server sockets.) | |
739 | */ | |
740 | void IOImplementation::doRead() | |
741 | { | |
742 | // static read buffer; should be ok as long as we don't use threads | |
743 | static char buffer[8*1024]; // 8 KiB | |
744 | ||
745 | m_errno = 0; | |
746 | if (m_read_fd<0 || !m_marked_for_reading) | |
747 | { | |
748 | ODOUT("exit0; read_fd="<<m_read_fd << " mark=" << m_marked_for_reading); | |
749 | return; | |
750 | } | |
751 | ||
752 | // reset the mark: | |
753 | m_marked_for_reading = false; | |
754 | ||
755 | // now read the data: | |
756 | ssize_t count; | |
757 | count = ::read(m_read_fd, buffer, sizeof(buffer)); | |
758 | ||
759 | ODOUT("::read -> " << count); | |
760 | ||
761 | // interpret what we got: | |
762 | if (count < 0) // error | |
763 | { | |
764 | m_errno = errno; | |
765 | int fd= m_read_fd; | |
766 | ||
767 | switch(m_errno) | |
768 | { | |
769 | case EINVAL: | |
770 | case EBADF: | |
771 | case ECONNRESET: | |
772 | case ENETRESET: | |
773 | if (fd == m_read_fd) | |
774 | { | |
775 | close( (m_read_fd == m_write_fd) ? Direction::both : Direction::in ); | |
776 | } | |
777 | break; | |
778 | } | |
779 | } | |
780 | else if (count==0) // EOF | |
781 | { | |
782 | // remember the read fd: | |
783 | int fd = m_read_fd; | |
784 | // remember the EOF: | |
785 | m_eof= true; | |
786 | // signal EOF | |
787 | m_signal_eof(); | |
788 | // if the fd is still the same: close it. | |
789 | if (fd == m_read_fd) | |
790 | { | |
791 | close( Direction::in ); | |
792 | } | |
793 | } | |
794 | else // we have valid data | |
795 | { | |
796 | std::string data(buffer,count); | |
797 | ODOUT(" got \"" << data << "\""); | |
798 | for(FilterChain::iterator it_filter= m_filter_chain.begin(); | |
799 | it_filter != m_filter_chain.end(); | |
800 | ++it_filter) | |
801 | { | |
802 | data= (*it_filter)->filterIncomingData(data); | |
803 | } | |
804 | m_input_buffer+= data; | |
805 | m_signal_read(); | |
806 | } | |
807 | } // eo IOImplementation::doRead | |
808 | ||
809 | ||
810 | /** | |
811 | * interface for filter classes to inject data into the filter chain (emulating new incoming data). | |
812 | * @param from_filter the filter which injects the new data. | |
813 | * @param _data the new data. | |
814 | */ | |
815 | void IOImplementation::injectIncomingData(FilterBasePtr from_filter, const std::string& _data) | |
816 | { | |
817 | FilterChain::iterator it_filter = | |
818 | std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(from_filter) ); | |
819 | if (it_filter == m_filter_chain.end()) | |
820 | { | |
821 | // dont accept data inject from a unknown filter | |
822 | return; | |
823 | } | |
824 | // well: pass the data through the remaining filters: | |
825 | // NOTE: processing is (nearly) the same as in IOImplementation::doRead() | |
826 | std::string data(_data); | |
827 | for(++it_filter; | |
828 | it_filter != m_filter_chain.end(); | |
829 | ++it_filter) | |
830 | { | |
831 | data= (*it_filter)->filterIncomingData(data); | |
832 | } | |
833 | m_input_buffer+= data; | |
834 | m_signal_read(); | |
835 | } // eo IOImplementation::injectIncomingData(FilterBase*,const std::string&) | |
836 | ||
837 | ||
838 | /** | |
839 | * interface for filter classes to inject data into the filter chain (emulating new outgoing data). | |
840 | * @param from_filter the filter which injects the new data. | |
841 | * @param _data the new data. | |
842 | */ | |
843 | void IOImplementation::injectOutgoingData(FilterBasePtr from_filter, const std::string& _data) | |
844 | { | |
845 | FilterChain::reverse_iterator it_filter = | |
846 | std::find_if( m_filter_chain.rbegin(), m_filter_chain.rend(), FilterMatch(from_filter) ); | |
847 | if (it_filter == m_filter_chain.rend()) | |
848 | { | |
849 | // dont accept data inject from a unknown filter | |
850 | return; | |
851 | } | |
852 | // well: pass the data through the remaining filters: | |
853 | // NOTE: processing is (nearly) the same as in IOImplementation::lowSend() | |
854 | std::string data(_data); | |
855 | for(++it_filter; | |
856 | it_filter!= m_filter_chain.rend(); | |
857 | ++it_filter) | |
858 | { | |
859 | data= (*it_filter)->filterOutgoingData(data); | |
860 | } | |
861 | m_output_buffer+= data; | |
862 | ||
863 | // if we can send immediately, do it: | |
864 | if (! m_output_buffer.empty() && m_marked_for_writing) | |
865 | { | |
866 | doWrite(); | |
867 | } | |
868 | } // eo IOImplementation::injectOutgoingData(FilterBase*,const std::string&) | |
869 | ||
870 | ||
871 | /** | |
872 | * set the read file descriptor. | |
873 | * Although a derived class can also set the read fd directly; this method should be used | |
874 | * for this task since it updates some flags on the fd for async operation. | |
875 | * @param fd the new read file descriptor. | |
876 | */ | |
877 | void IOImplementation::setReadFd(int fd) | |
878 | { | |
879 | // test if we already have a valid descriptor (and may have to close it): | |
880 | if (m_read_fd >=0 ) | |
881 | { | |
882 | if (m_read_fd == fd) | |
883 | { | |
884 | // fd was already right; consider it to be ok. | |
885 | return; | |
886 | } | |
887 | close(Direction::in); | |
888 | } | |
889 | // reset our errno: | |
890 | m_errno= 0; | |
891 | // if the new descriptor looks valid, set some flags: | |
892 | if (fd >= 0) | |
893 | { | |
894 | long flags= ::fcntl(fd, F_GETFL); | |
895 | if (flags != -1) | |
896 | { | |
897 | // set the flags for non blocking, async operation | |
898 | flags |= O_NONBLOCK|O_ASYNC; | |
899 | ::fcntl(fd,F_SETFL, flags); | |
900 | } | |
901 | else if ( errno == EBADF ) | |
902 | { | |
903 | // well, we seemed to be fed with an invalid descriptor...: | |
904 | m_errno = errno; | |
905 | fd= -1; | |
906 | } | |
907 | } | |
908 | if (fd >= 0) // if still valid: | |
909 | { | |
910 | // set the close-on-exec flag | |
911 | ::fcntl(fd,F_SETFD, FD_CLOEXEC); | |
912 | } | |
913 | m_read_fd= fd; | |
914 | m_marked_for_reading= false; | |
915 | m_eof= false; | |
916 | } // eo IOImplementation::setReadFd(int) | |
917 | ||
918 | ||
919 | ||
920 | /** | |
921 | * set the write file descriptor. | |
922 | * Although a derived class can also set the write fd directly; this method should be used | |
923 | * for this task since it updates some flags on the fd for async operation. | |
924 | * @param fd the new write file descriptor. | |
925 | */ | |
926 | void IOImplementation::setWriteFd(int fd) | |
927 | { | |
928 | if (m_write_fd >=0 ) | |
929 | { | |
930 | if (m_write_fd == fd) | |
931 | { | |
932 | // fd was already right; consider it to be ok. | |
933 | return; | |
934 | } | |
935 | close(Direction::out); | |
936 | } | |
937 | // reset our errno: | |
938 | m_errno= 0; | |
939 | // if the new descriptor looks valid, set some flags: | |
940 | if (fd >= 0) | |
941 | { | |
942 | long flags= ::fcntl(fd, F_GETFL); | |
943 | if (flags != -1) | |
944 | { | |
945 | // set the flags for non blocking, async operation | |
946 | flags |= O_NONBLOCK|O_ASYNC; | |
947 | ::fcntl(fd,F_SETFL, flags); | |
948 | } | |
949 | else if (errno == EBADF) | |
950 | { | |
951 | // well, we seemed to be fed with an invalid descriptor...: | |
952 | m_errno = errno; | |
953 | fd= -1; | |
954 | } | |
955 | } | |
956 | if (fd >= 0) // if still valid: | |
957 | { | |
958 | // set the close-on-exec flag | |
959 | ::fcntl(fd,F_SETFD, FD_CLOEXEC); | |
960 | } | |
961 | m_write_fd = fd; | |
962 | m_marked_for_writing= false; | |
963 | m_not_writable= false; | |
964 | } // eo IOImplementation::setWriteFd(int) | |
965 | ||
966 | ||
967 | ||
968 | /** | |
969 | * called by the backend when this object can write data. | |
970 | * | |
971 | * If some data was sent, the signal @a m_signal_write is called. | |
972 | * | |
973 | * @internal tries to write all buffered data to output; if this succeeds, | |
974 | * the connection is assumed to be still able to accept more data. | |
975 | * (i.e. the internal write mark is kept!) | |
976 | * | |
977 | * @note overload this method only when You know what You are doing! | |
978 | */ | |
979 | void IOImplementation::doWrite() | |
980 | { | |
981 | m_errno = 0; | |
982 | if ( m_write_fd<0 || !m_marked_for_writing || m_output_buffer.empty()) | |
983 | { | |
984 | return; | |
985 | } | |
986 | ||
987 | ODOUT("doWrite, d=\"" << m_output_buffer << "\""); | |
988 | ||
989 | //reset mark: | |
990 | m_marked_for_writing= false; | |
991 | ||
992 | // now write the data | |
993 | ssize_t count= ::write( m_write_fd, m_output_buffer.data(), m_output_buffer.size()); | |
994 | ||
995 | ODOUT("::write -> " << count); | |
996 | ||
997 | if (count < 0) // error | |
998 | { | |
999 | m_errno= errno; | |
1000 | int fd= m_write_fd; | |
1001 | ||
1002 | switch(m_errno) | |
1003 | { | |
1004 | case EPIPE: | |
1005 | m_not_writable= true; | |
1006 | // emit a signal | |
1007 | m_signal_not_writable(); | |
1008 | // fall through | |
1009 | case EINVAL: | |
1010 | case EBADF: | |
1011 | case ECONNRESET: | |
1012 | case ENETRESET: | |
1013 | if (fd == m_write_fd) | |
1014 | { | |
1015 | close( (m_write_fd == m_read_fd) ? Direction::both : Direction::out ); | |
1016 | } | |
1017 | break; | |
1018 | } | |
1019 | } | |
1020 | else | |
1021 | { | |
1022 | m_output_buffer.erase(0, count); | |
1023 | if (m_output_buffer.empty()) | |
1024 | { | |
1025 | // special case: if we were able to send all the data, we keep the write mark: | |
1026 | m_marked_for_writing= true; | |
1027 | } | |
1028 | } | |
1029 | if (count > 0) | |
1030 | { | |
1031 | m_signal_write(); | |
1032 | } | |
1033 | } // eo IOImplementation::doWrite | |
1034 | ||
1035 | ||
1036 | /* | |
1037 | * implementation of SimpleIO | |
1038 | */ | |
1039 | ||
1040 | ||
1041 | SimpleIO::SimpleIO(int read_fd, int write_fd) | |
1042 | : inherited(read_fd, write_fd) | |
1043 | { | |
1044 | m_signal_read.connect(boost::bind(&SimpleIO::slotReceived,this)); | |
1045 | } // eo SimpleIO::SimpleIO() | |
1046 | ||
1047 | ||
1048 | SimpleIO::~SimpleIO() | |
1049 | { | |
1050 | } // eo SimpleIO::~SimpleIO() | |
1051 | ||
1052 | ||
1053 | /** | |
1054 | * sends a string. | |
1055 | * @param data the string. | |
1056 | */ | |
1057 | void SimpleIO::sendString(const std::string& data) | |
1058 | { | |
1059 | lowSend(data); | |
1060 | } // eo SimpleIO::sendString(const std::string&) | |
1061 | ||
1062 | ||
1063 | /** | |
1064 | * emits the signal signalReceived with the received data. | |
1065 | * This slot is connected to IOImplementation::m_signal_read. | |
1066 | */ | |
1067 | void SimpleIO::slotReceived() | |
1068 | { | |
1069 | std::string data; | |
1070 | data.swap(m_input_buffer); | |
1071 | signal_received_string(data); | |
1072 | } // eo SimpleIO::slotReceived() | |
1073 | ||
1074 | ||
1075 | ||
1076 | /* | |
1077 | * implementation of SimpleIO2 | |
1078 | */ | |
1079 | ||
1080 | ||
1081 | SimpleIO2::SimpleIO2(int read_fd, int write_fd) | |
1082 | : inherited(read_fd, write_fd) | |
1083 | { | |
1084 | m_signal_read.connect(boost::bind(&SimpleIO2::slotReceived,this)); | |
1085 | } // eo SimpleIO2::SimpleIO2() | |
1086 | ||
1087 | ||
1088 | SimpleIO2::~SimpleIO2() | |
1089 | { | |
1090 | } // eo SimpleIO2::~SimpleIO2() | |
1091 | ||
1092 | ||
1093 | /** | |
1094 | * sends a string. | |
1095 | * @param data the string. | |
1096 | */ | |
1097 | void SimpleIO2::sendString(const std::string& data) | |
1098 | { | |
1099 | lowSend(data); | |
1100 | } // eo SimpleIO2::sendString(const std::string&) | |
1101 | ||
1102 | ||
1103 | /** | |
1104 | * emits the signal signalReceived with the received data. | |
1105 | * This slot is connected to IOImplementation::m_signal_read. | |
1106 | */ | |
1107 | void SimpleIO2::slotReceived() | |
1108 | { | |
1109 | std::string data; | |
1110 | data.swap(m_input_buffer); | |
1111 | signal_received_string(data); | |
1112 | } // eo SimpleIO2::slotReceived() | |
1113 | ||
1114 | ||
1115 | ||
1116 | /* | |
1117 | * implementation of class Backend (singleton) | |
1118 | */ | |
1119 | ||
1120 | Backend* Backend::g_backend= NULL; | |
1121 | ||
1122 | int Backend::m_count_active_steps=0; | |
1123 | ||
1124 | ||
1125 | Backend::Backend() | |
1126 | : m_count_active_loops(0) | |
1127 | , m_count_stop_requests(0) | |
1128 | { | |
74786da6 | 1129 | SystemTools::ignore_signal( SystemTools::Signal::PIPE ); |
5c8a3d40 RP |
1130 | } // eo Backend::Backend |
1131 | ||
1132 | ||
1133 | Backend::~Backend() | |
1134 | { | |
1135 | SystemTools::restore_signal_handler( SystemTools::Signal::PIPE ); | |
1136 | } // eo Backend::~Backend() | |
1137 | ||
1138 | /** | |
1139 | * delivers pointer to the current backend, instantiating a new backend if there was no current one. | |
1140 | * | |
1141 | * This should be the only way to access the backend which should be a singleton. | |
1142 | * | |
1143 | * @return the pointer to the current backend. | |
1144 | */ | |
1145 | Backend* Backend::getBackend() | |
1146 | { | |
1147 | if (!g_backend) | |
1148 | { | |
1149 | g_backend = new Backend(); | |
1150 | } | |
1151 | return g_backend; | |
1152 | } // eo Backend::getBackend | |
1153 | ||
1154 | ||
1155 | ||
1156 | ||
1157 | /** | |
1158 | * performs one backend cycle. | |
1159 | * | |
1160 | * Collects all file descriptors from the active io objects which should be selected for reading and/or writing. | |
1161 | * Also determines the timer events which become due and adjusts the timeout. | |
1162 | * Constructs the necessary structures and calls poll(). | |
1163 | * Finally interprets the results from poll() (i.e. performs the reading/writing/timer events) | |
1164 | * | |
1165 | * @param timeout maximal wait value in milliseconds; negative value waits until at least one event occured. | |
1166 | * @return @a true if there was at least one active object; otherwise @a false | |
1167 | * | |
1168 | * @note this method is a little beast. | |
1169 | * | |
74786da6 | 1170 | * @internal |
5c8a3d40 RP |
1171 | * The cycle is divided into four steps: collecting; poll; mark and execute. |
1172 | * The "mark" step is necessary to avoid some bad side effects when method calls in the execution stage | |
1173 | * are calling @a Backup::doOneStep or open their own local backend loop. | |
1174 | * | |
1175 | * @todo handle some more error cases. | |
1176 | * @todo provide a plugin interface for external handler. | |
1177 | * (currently inclusion of external handler is possible by (ab)using timer classes) | |
1178 | */ | |
1179 | bool Backend::doOneStep(int timeout) | |
1180 | { | |
1181 | ODOUT( "timeout=" << timeout ); | |
1182 | internal_io::PollDataCluster poll_data; | |
1183 | bool had_active_object = false; | |
1184 | ||
1185 | ++m_count_active_steps; | |
1186 | ||
1187 | try { | |
1188 | ||
1189 | FdSetType local_read_fds; | |
1190 | FdSetType local_write_fds; | |
1191 | ||
1192 | // step 1 ; collect | |
1193 | ||
1194 | { // step 1.1: collect fds for read/write operations | |
1195 | for(internal_io::IOList::iterator itIOList = internal_io::g_io_list().begin(); | |
1196 | itIOList != internal_io::g_io_list().end(); | |
1197 | ++itIOList) | |
1198 | { | |
1199 | if (! *itIOList) continue; // skip NULL entries | |
1200 | int read_fd = (*itIOList)->m_read_fd; | |
1201 | int write_fd = (*itIOList)->m_write_fd; | |
1202 | bool want_read = (read_fd >= 0) and (*itIOList)->wantRead(); | |
1203 | bool want_write = (write_fd >= 0) and (*itIOList)->wantWrite(); | |
1204 | if (read_fd >= 0 ) | |
1205 | { | |
1206 | local_read_fds.insert( read_fd ); | |
1207 | } | |
1208 | if (write_fd >= 0) | |
1209 | { | |
1210 | local_write_fds.insert( write_fd ); | |
1211 | } | |
1212 | if (!want_read && !want_write) continue; | |
1213 | if (want_read) | |
1214 | { | |
1215 | FODOUT( (*itIOList), "wants to read (fd=" << read_fd << ")"); | |
1216 | poll_data.add_read_fd(read_fd, *itIOList); | |
1217 | } | |
1218 | if (want_write) | |
1219 | { | |
1220 | FODOUT( (*itIOList), "wants to write (fd=" << write_fd << ")"); | |
1221 | poll_data.add_write_fd(write_fd, *itIOList); | |
1222 | } | |
1223 | had_active_object= true; | |
1224 | } | |
1225 | } | |
1226 | ||
1227 | { // step 1.2: collect timer events | |
1228 | MilliTime current_time; | |
1229 | MilliTime min_event_time; | |
1230 | ||
1231 | get_current_monotonic_time(current_time); | |
1232 | bool min_event_time_set; | |
1233 | ||
1234 | if (timeout >= 0) | |
1235 | { | |
1236 | min_event_time = current_time + MilliTime(0,timeout); | |
1237 | min_event_time_set= true; | |
1238 | } | |
1239 | else | |
1240 | { | |
1241 | min_event_time = current_time + MilliTime(86400,0); | |
1242 | min_event_time_set= false; | |
1243 | } | |
1244 | // TODO | |
1245 | ||
1246 | for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin(); | |
74786da6 | 1247 | it_timer != internal_io::g_timer_list().end() |
5c8a3d40 RP |
1248 | && (!had_active_object || !min_event_time_set || current_time < min_event_time); |
1249 | ++ it_timer) | |
1250 | { | |
1251 | if (! *it_timer) continue; // skip NULL entries | |
1252 | if (! (*it_timer)->m_active) continue; // skip if not enabled | |
1253 | if ( !min_event_time_set || (*it_timer)->m_when < min_event_time) | |
1254 | { | |
1255 | min_event_time = (*it_timer)->m_when; | |
1256 | min_event_time_set= true; | |
1257 | } | |
1258 | had_active_object= true; | |
1259 | } | |
1260 | ||
1261 | if (min_event_time_set) | |
1262 | { // we have at a minimal event time, so (re)compute the timeout value: | |
1263 | MilliTime delta= (min_event_time - current_time); | |
1264 | long long delta_ms = std::min( delta.get_milliseconds(), 21600000LL); // max 6h | |
1265 | if (delta_ms <= 0L) | |
1266 | { | |
1267 | timeout= 0L; | |
1268 | } | |
1269 | else | |
1270 | { | |
1271 | timeout= delta_ms + (delta_ms<5 ? 1 : 3); | |
1272 | } | |
1273 | } | |
1274 | } | |
1275 | ||
1276 | // step 2 : poll | |
1277 | ODOUT(" poll timeout is " << timeout); | |
1278 | { | |
1279 | MilliTime current_time; | |
1280 | get_current_monotonic_time(current_time); | |
1281 | ODOUT(" current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec); | |
1282 | } | |
1283 | int poll_result= ::poll(poll_data.get_pollfd_ptr(), poll_data.get_num_pollfds(), timeout); | |
1284 | ||
1285 | ODOUT("poll -> " << poll_result); | |
1286 | { | |
1287 | MilliTime current_time; | |
1288 | get_current_monotonic_time(current_time); | |
1289 | ODOUT(" current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec); | |
1290 | } | |
1291 | ||
1292 | if (poll_result < 0) | |
1293 | { | |
1294 | //TODO poll error handling (signals ?!) | |
1295 | } | |
1296 | ||
1297 | // step 3 : mark | |
1298 | ||
1299 | // step 3.1: mark io objects (if necessary) | |
1300 | if (poll_result > 0) | |
1301 | { | |
1302 | for(internal_io::PollVector::iterator itPollItem = poll_data.m_poll_vector.begin(); | |
1303 | itPollItem != poll_data.m_poll_vector.end(); | |
1304 | ++itPollItem) | |
1305 | { | |
1306 | ODOUT(" fd=" << itPollItem->fd << ", events=" << itPollItem->events << ", revents=" << itPollItem->revents); | |
1307 | if ( 0 == (itPollItem->revents)) | |
1308 | { // preliminary continuation if nothing is to handle for this item(/fd)... | |
1309 | continue; | |
1310 | } | |
1311 | if ( 0!= (itPollItem->revents & (POLLIN|POLLHUP))) | |
1312 | { | |
1313 | IOImplementation *io= poll_data.m_read_fd_io_map[ itPollItem->fd ]; | |
1314 | if (io && io->m_read_fd==itPollItem->fd) | |
1315 | { | |
1316 | FODOUT(io,"marked for reading"); | |
1317 | io->m_marked_for_reading= true; | |
1318 | } | |
1319 | } | |
1320 | if ( 0!= (itPollItem->revents & POLLOUT)) | |
1321 | { | |
1322 | IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ]; | |
1323 | if (io && io->m_write_fd==itPollItem->fd) | |
1324 | { | |
1325 | io->m_marked_for_writing= true; | |
1326 | } | |
1327 | } | |
1328 | if ( 0!= (itPollItem->revents & POLLERR)) | |
1329 | { | |
1330 | IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ]; | |
1331 | if (0!= (itPollItem->events & POLLOUT)) | |
1332 | { | |
1333 | if (io && io->m_write_fd==itPollItem->fd) | |
1334 | { | |
1335 | io->m_marked_for_writing= false; | |
1336 | //io->close( Direction::out ); | |
1337 | } | |
1338 | } | |
1339 | } | |
1340 | // TODO error handling (POLLERR, POLLHUP, POLLNVAL) | |
1341 | } | |
1342 | } | |
1343 | ||
1344 | //Step 3.2: mark timer objects | |
1345 | { | |
1346 | MilliTime current_time; | |
1347 | ||
1348 | get_current_monotonic_time(current_time); | |
1349 | ODOUT(" current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec); | |
1350 | ||
1351 | for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin(); | |
1352 | it_timer != internal_io::g_timer_list().end(); | |
1353 | ++ it_timer) | |
1354 | { | |
1355 | ODOUT(" check timer " << *it_timer); | |
1356 | if (! *it_timer) continue; // skip NULL entries | |
1357 | if (! (*it_timer)->m_active) continue; // skip if not enabled | |
1358 | if ( (*it_timer)->m_when <= current_time) | |
1359 | { | |
1360 | ODOUT(" ==> MARK"); | |
1361 | (*it_timer)->m_marked = true; | |
1362 | } | |
1363 | } | |
1364 | } | |
1365 | ||
1366 | ||
1367 | // step 4 : execute | |
1368 | ||
1369 | // step 4.1: execute io | |
1370 | ODOUT("execute stage"); | |
1371 | for(internal_io::IOList::iterator it_io = internal_io::g_io_list().begin(); | |
1372 | it_io != internal_io::g_io_list().end(); | |
1373 | ++ it_io) | |
1374 | { | |
1375 | ODOUT(" check obj " << *it_io); | |
1376 | if (NULL == *it_io) continue; | |
1377 | if ((*it_io)->m_marked_for_writing) | |
1378 | { | |
1379 | FODOUT((*it_io),"exec doWrite"); | |
1380 | (*it_io)->doWrite(); | |
1381 | if ((*it_io) == NULL) continue; // skip remaining if we lost the object | |
1382 | if ((*it_io)->m_errno) | |
1383 | { | |
1384 | continue; | |
1385 | } | |
1386 | } | |
1387 | if ((*it_io)->m_marked_for_reading) | |
1388 | { | |
1389 | FODOUT((*it_io),"exec doRead"); | |
1390 | (*it_io)->doRead(); | |
1391 | if ((*it_io) == NULL) continue; // skip remaining if we lost the object | |
1392 | if ((*it_io)->m_errno) | |
1393 | { | |
1394 | continue; | |
1395 | } | |
1396 | } | |
1397 | } | |
1398 | ||
1399 | // step 4.2: execute timer events | |
1400 | { | |
1401 | for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin(); | |
1402 | it_timer != internal_io::g_timer_list().end(); | |
1403 | ++ it_timer) | |
1404 | { | |
1405 | if (! *it_timer) continue; // skip NULL entries | |
1406 | if (! (*it_timer)->m_active) continue; // skip if not enabled | |
1407 | if (! (*it_timer)->m_marked) continue; // skip if not marked | |
1408 | ||
1409 | // reset the mark and deactivate object now since the execute() method might activate it again | |
1410 | (*it_timer)->m_marked= false; | |
1411 | (*it_timer)->m_active= false; | |
1412 | ||
1413 | // now execute the event: | |
1414 | (*it_timer)->execute(); | |
1415 | } | |
1416 | } | |
1417 | ||
1418 | } // eo try | |
1419 | catch(...) | |
1420 | { | |
1421 | // clean up our counter | |
1422 | --m_count_active_steps; | |
1423 | // and forward the exception | |
1424 | throw; | |
1425 | } | |
1426 | ||
1427 | if ( 0 == --m_count_active_steps) | |
1428 | { | |
1429 | internal_io::g_io_list().clean_list(); | |
1430 | internal_io::g_timer_list().clean_list(); | |
1431 | } | |
1432 | ||
1433 | return had_active_object; | |
1434 | } // eo Backend::doOneStep | |
1435 | ||
1436 | ||
1437 | /** | |
1438 | * enters a backend loop. | |
1439 | * | |
1440 | * Calls @a Backend::doOneStep within a loop until @a Backend::stop was called or there are no more | |
1441 | * active objects (io objects or timer objects). | |
1442 | */ | |
1443 | void Backend::run() | |
1444 | { | |
1445 | ++m_count_active_loops; | |
1446 | do | |
1447 | { | |
1448 | try | |
1449 | { | |
1450 | if (!doOneStep(c_max_poll_wait)) | |
1451 | { | |
1452 | // stop if there are no more active objects. | |
1453 | stop(); | |
1454 | } | |
1455 | } | |
1456 | catch(...) | |
1457 | { | |
1458 | // clean up our counter | |
1459 | --m_count_active_loops; | |
1460 | // and forward the exception | |
1461 | throw; | |
1462 | } | |
74786da6 | 1463 | } |
5c8a3d40 RP |
1464 | while (0 == m_count_stop_requests); |
1465 | --m_count_active_loops; | |
1466 | --m_count_stop_requests; | |
1467 | } // eo Backend::run | |
1468 | ||
1469 | ||
1470 | /** | |
1471 | * @brief stops the latest loop currently run by Backend::run(). | |
1472 | * @see Backend::run() | |
1473 | */ | |
1474 | void Backend::stop() | |
1475 | { | |
1476 | if (m_count_active_loops) | |
1477 | { | |
1478 | ++m_count_stop_requests; | |
1479 | } | |
1480 | } // eo Backend::stop() | |
1481 | ||
1482 | ||
1483 | ||
42b7c46d | 1484 | } // eo namespace AsyncIo |