Implement getline() for IOImplementation.
[libasyncio] / asyncio / async_io.cpp
CommitLineData
8c15b8c7
TJ
1/*
2The software in this package is distributed under the GNU General
3Public License version 2 (with a special exception described below).
4
5A copy of GNU General Public License (GPL) is included in this distribution,
6in the file COPYING.GPL.
7
8As a special exception, if other files instantiate templates or use macros
9or inline functions from this file, or you compile this file and link it
10with other works to produce a work based on this file, this file
11does not by itself cause the resulting work to be covered
12by the GNU General Public License.
13
14However the source code for this file must still be made available
15in accordance with section (3) of the GNU General Public License.
16
17This exception does not invalidate any other reasons why a work based
18on this file might be covered by the GNU General Public License.
19*/
5c8a3d40 20/** @file
5c8a3d40 21 * @copyright Copyright © 2007-2008 by Intra2net AG
5c8a3d40
RP
22 */
23
24//#define NOISEDEBUG
25
42b7c46d 26#include "async_io.hpp"
c78245a3 27#include <asyncio_config.hpp>
5c8a3d40
RP
28
29#include <list>
30#include <vector>
31#include <map>
32#include <algorithm>
33#include <utility>
34
35#include <sys/poll.h>
36#include <sys/time.h>
37#include <sys/socket.h>
38#include <unistd.h>
39#include <errno.h>
40#include <fcntl.h>
41
42#include <boost/bind.hpp>
43
74786da6 44#include <asyncio_signalfunc.hpp>
5c8a3d40
RP
45
46#include <iostream>
47
#ifdef NOISEDEBUG
// Debug tracing is enabled: the macros below print to std::cout.
#include <iostream>
#include <iomanip>
#include <typeinfo> // required before any use of typeid (see FODOUT)
// plain message
#define DOUT(msg) std::cout << msg << std::endl
// message prefixed with the dynamic type name and address of obj
#define FODOUT(obj,msg) std::cout << typeid(*obj).name() << "[" << obj << "]:" << msg << std::endl
//#define ODOUT(msg) std::cout << typeid(*this).name() << "[" << this << "]:" << msg << std::endl
// message prefixed with the enclosing function and the address of *this (member functions only)
#define ODOUT(msg) std::cout << __PRETTY_FUNCTION__ << "[" << this << "]:" << msg << std::endl
#else
// Debug tracing is disabled: the macros expand to empty statements and
// their arguments are not evaluated.
#define DOUT(msg) do {} while (0)
#define FODOUT(obj,msg) do {} while (0)
#define ODOUT(msg) do {} while (0)
#endif
60
aba4c34d 61
5c8a3d40
RP
62namespace
63{
64
aba4c34d 65using namespace AsyncIo::Utils;
5c8a3d40
RP
66
67/*
68 * configuration:
69 */
70
71
72const int c_max_poll_wait= 10*60*1000; // maximal poll wait (while in backend loop): 10 min
73
74
75/**
76 * contains internal helper structs and functions for io handling.
77 */
78namespace internal_io
79{
80
/**
 * extends struct pollfd with some convenience functions
 */
struct PollFd : public ::pollfd
{
    /**
     * creates an entry which refers to no descriptor and has cleared event mask(s).
     *
     * The fd is set to -1 (not 0, which would be stdin): poll() ignores entries
     * with a negative fd, so a default constructed entry can never report events.
     */
    PollFd()
    {
        fd= -1;
        events= revents= 0;
    } // eo PollFd


    /**
     * initializes the struct with a given file descriptor and clears the event mask(s).
     * @param _fd the file descriptor the entry refers to.
     */
    PollFd(int _fd)
    {
        fd= _fd;
        events= revents= 0;
    } // eo PollFd


    /**
     * set that we want to be notified about new incoming data
     */
    void setPOLLIN() { events |= POLLIN; }

    /**
     * set that we want to be notified if we can send (more) data.
     */
    void setPOLLOUT() { events |= POLLOUT; }

}; // eo struct PollFd
115
116
117typedef std::vector<PollFd> PollVector;
118typedef std::map<int,PollVector::size_type> FdPollMap;
42b7c46d 119typedef std::map<int,AsyncIo::IOImplementation*> FdIOMap;
5c8a3d40
RP
120
121
122/**
123 * struct for interfacing our local structures with poll()
124 */
125struct PollDataCluster
126{
127 PollVector m_poll_vector;
128 FdPollMap m_fd_poll_map;
129 FdIOMap m_read_fd_io_map;
130 FdIOMap m_write_fd_io_map;
131
42b7c46d
RP
132 void add_read_fd( int fd, AsyncIo::IOImplementation* io);
133 void add_write_fd( int fd, AsyncIo::IOImplementation* io);
5c8a3d40
RP
134
135 pollfd* get_pollfd_ptr();
136 unsigned int get_num_pollfds() const;
137
138}; // eo struct PollDataCluster
139
140
aba4c34d
RP
141typedef PtrList< AsyncIo::IOImplementation, true > IOList;
142typedef PtrList< AsyncIo::TimerBase, true > TimerList;
5c8a3d40 143
aba4c34d
RP
144template<> int IOList::GlobalCountType::InstanceCount= 0;
145template<> int TimerList::GlobalCountType::InstanceCount= 0;
5c8a3d40
RP
146
147
148/**
149 * the (internal) global list of io objects (object pointers)
150 */
151IOList& g_io_list()
152{
153 static IOList _the_io_list;
154 return _the_io_list;
155};
156
157
158/**
159 * the (internal) global list of timer objects (object pointers)
160 */
161TimerList& g_timer_list()
162{
163 static TimerList _the_timer_list;
164 return _the_timer_list;
165}
166
167/*
168 * implementation of PollDataCluster
169 */
170
171
172/**
173 * add a new file descriptor to the read list.
174 *
175 * @param fd the file descriptor.
176 * @param io the io object which uses the fd for reading.
177 */
42b7c46d 178void PollDataCluster::add_read_fd( int fd, AsyncIo::IOImplementation* io)
5c8a3d40
RP
179{
180 FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
181 if (itPollMap != m_fd_poll_map.end())
182 {
183 m_poll_vector[ itPollMap->second ].setPOLLIN();
184 }
185 else
186 {
187 PollFd item(fd);
188 item.setPOLLIN();
189 m_fd_poll_map[fd] = m_poll_vector.size();
190 m_poll_vector.push_back( item );
191 }
192 m_read_fd_io_map[fd]= io;
193} // eo PollDataCluster::add_read_fd
194
195
196/**
197 * add a new file descriptor to the write list.
198 *
199 * @param fd the file descriptor.
200 * @param io the io object which uses the fd for writing.
201 */
42b7c46d 202void PollDataCluster::add_write_fd( int fd, AsyncIo::IOImplementation* io)
5c8a3d40
RP
203{
204 FdPollMap::iterator itPollMap = m_fd_poll_map.find(fd);
205 if (itPollMap != m_fd_poll_map.end())
206 {
207 m_poll_vector[ itPollMap->second ].setPOLLOUT();
208 }
209 else
210 {
211 PollFd item(fd);
212 item.setPOLLOUT();
213 m_fd_poll_map[fd] = m_poll_vector.size();
214 m_poll_vector.push_back( item );
215 }
216 m_write_fd_io_map[fd]= io;
217} // eo PollDataCluster::add_write_fd
218
219
220/**
221 * returns a pointer to a pollfd array; suitable for passing to poll()
222 *
223 * @return pointer to pollfd array
224 */
225pollfd* PollDataCluster::get_pollfd_ptr()
226{
227 return m_poll_vector.empty() ? NULL : &m_poll_vector.front();
228} // eo get_pollfd_ptr
229
230
231/**
232 * returns the number of entries in the pollfd array; suitable for passing to poll()
233 *
234 * @return the number of entries in the pollfd array
235 */
236unsigned int PollDataCluster::get_num_pollfds() const
237{
238 return m_poll_vector.size();
239} // eo get_num_pollfds
240
241
242
243} // eo namespace internal_io
244
245
246/*
247 * some internal tool functions and structures
248 */
249
250struct FilterMatch {
42b7c46d 251 AsyncIo::FilterBasePtr m_filter;
5c8a3d40 252
42b7c46d 253 FilterMatch(AsyncIo::FilterBasePtr filter)
5c8a3d40
RP
254 : m_filter(filter)
255 {}
256
42b7c46d 257 bool operator () (const AsyncIo::FilterBasePtr& item)
5c8a3d40
RP
258 {
259 return item && item == m_filter;
260 }
261
262}; // eo struct FilterMatch
263
264
5c8a3d40
RP
265
266} // eo anonymous namespace
267
268
269
270
42b7c46d 271namespace AsyncIo
5c8a3d40
RP
272{
273
274
5c8a3d40
RP
275/*
276 * implementation of TimerBase
277 */
278
279/**
280 * constructor. Adds the object to the internal timer list.
281 */
282TimerBase::TimerBase()
283: m_active(false)
284, m_marked(false)
285{
286 internal_io::g_timer_list().add_item(this);
287} // eo TimerBase::TimerBase
288
289
290/**
291 * destructor. Removes the object from the internal timer list.
292 */
293TimerBase::~TimerBase()
294{
295 ODOUT("enter");
aba4c34d 296 if (internal_io::TimerList::Instances())
5c8a3d40
RP
297 {
298 ODOUT("remove from list");
299 internal_io::g_timer_list().remove_item(this);
300 }
301} // eo TimerBase::~TimerBase
302
303
304/**
305 * @brief returns the point in time when the time is executed in real time.
306 * @return the point in time when the timer is to be executed.
307 */
308MilliTime TimerBase::getRealWhenTime() const
309{
310 MilliTime mono_time;
311 MilliTime real_time;
312 get_current_monotonic_time(mono_time);
313 get_current_real_time(real_time);
314 MilliTime result= m_when - mono_time + real_time;
315 return result;
316} // eo TimerBase::getRealWhenTime() const
317
318
319/**
320 * sets the time when the event should be executed.
321 * @param sec the seconds part of the point in time.
322 * @param msec the milliseconds part of the point in time.
323 */
324void TimerBase::setWhenTime(long sec, long msec)
325{
326 m_when.set(sec,msec);
327 m_marked= false;
328} // eo TimerBase::setWhenTime
329
330
331/**
332 * sets the time when the event should be executed.
333 * @param mt the point in time.
334 */
335void TimerBase::setWhenTime(const MilliTime& mt)
336{
337 m_when= mt;
338 m_marked= false;
339} // eo TimerBase::setWhenTime
340
341
342/**
343 * sets the time delta measured from current time when the event should be executed.
344 * @param sec the seconds of the time delta
345 * @param msec the milli seconds of the time delta
346 */
347void TimerBase::setDeltaWhenTime(long sec, long msec)
348{
349 setDeltaWhenTime( MilliTime(sec,msec) );
350} // eo TimerBase::setWhenTime
351
352
353
354/**
355 * sets the time delta measured from current time when the event should be executed.
356 * @param mt the time delta
357 */
358void TimerBase::setDeltaWhenTime(const MilliTime& mt)
359{
360 get_current_monotonic_time(m_when);
361 m_when+= mt;
362 m_marked= false;
363} // eo TimerBase::setWhenTime
364
365
366/**
367 * set the active state of the timer event.
368 * @param active determines if the object should be active (default: yes).
369 */
370void TimerBase::activate(bool active)
371{
372 m_active = active;
373 if (!active)
374 {
375 // clear the mark if we are not active.
376 m_marked= false;
377 }
378} // eo TimerBase::activate
379
380
381/** @fn void TimerBase::deactivate()
382 * deactivates the event by clearing the active state.
383 */
384
385
386/**
387 * called when the timer event occured.
388 */
389void TimerBase::execute()
390{
391} // eo TimerBase::execute
392
393
394/*
395 * implementation of FilterBase class
396 */
397
398
399FilterBase::FilterBase()
400: m_io(NULL)
401{
402} // eo FilterBase::FilterBase()
403
404
405/**
406 * injects incoming data.
407 * @param data the new data
408 */
409void FilterBase::injectIncomingData(const std::string& data)
410{
411 if (m_io)
412 {
413 FilterBasePtr ptr= get_ptr_as< FilterBase >();
414 if (ptr)
415 {
416 m_io->injectIncomingData(ptr,data);
417 }
418 }
419} // FilterBase::injectIncomingData(const std::string&)
420
421
422/**
423 * injects outgoing data.
424 * @param data the new data
425 */
426void FilterBase::injectOutgoingData(const std::string& data)
427{
428 if (m_io)
429 {
430 FilterBasePtr ptr= get_ptr_as< FilterBase >();
431 if (ptr)
432 {
433 m_io->injectOutgoingData(ptr,data);
434 }
435 }
436} // eo FilterBase::injectOutgoingData(const std::string&)
437
438
439
440/**
441 * called when EOF detected on incoming channel (or incoming channel closed)
442 */
443void FilterBase::endOfIncomingData()
444{
445} // eo FilterBase::endOfIncomingData()
446
447/**
448 * called when the filter should reset.
449 * This is used when a new channel is opened or when the filter is taken out of a filter chain.
450 */
451void FilterBase::reset()
452{
453} // eo FilterBase::reset()
454
455
456/*
457 * implementation of IOImplementation class
458 */
459
460
461/**
462 * constructor for the base io class.
463 *
464 * Also adds the object to internal list of io objects (which is used by the backend).
465 *
466 * @param read_fd the file descriptor which should be used for reading (default -1 for no value)
467 * @param write_fd the file descriptor which should be used for writing (default -1 for no value)
468 */
469IOImplementation::IOImplementation(int read_fd, int write_fd)
470: m_read_fd(-1)
471, m_write_fd(-1)
472, m_eof(false)
473, m_not_writable(false)
474, m_input_buffer()
475, m_output_buffer()
476, m_marked_for_reading(false)
477, m_marked_for_writing(false)
478{
479 internal_io::g_io_list().add_item(this);
480 if (read_fd >= 0)
481 {
482 setReadFd( read_fd );
483 }
484 if (write_fd >= 0)
485 {
486 setWriteFd( write_fd );
487 }
488} // eo IOImplementation::IOImplementation
489
490
491/**
492 * destructor of the base io class.
493 *
494 * Removes the object from the interal list of io objects.
495 */
496IOImplementation::~IOImplementation()
497{
498 close();
aba4c34d 499 if (internal_io::IOList::Instances())
5c8a3d40
RP
500 {
501 internal_io::g_io_list().remove_item(this);
502 }
503 // now clear the filters:
504 while (! m_filter_chain.empty() )
505 {
506 FilterChain::iterator it = m_filter_chain.begin();
507 (*it)->reset();
508 (*it)->m_io= NULL;
509 //TODO: signal the filter that it is removed ?!
510 m_filter_chain.erase(it);
511 }
512} // eo IOImplementation::~IOImplementation
513
514
515/**
516 * adds another filter to the filter chain.
517 * @param filter pointer to the new filter.
518 */
519void IOImplementation::addFilter
520(
521 FilterBasePtr filter
522)
523{
524 if (!filter)
525 {
526 return; // nothing to do
527 }
528 if (filter->m_io)
529 {
530 filter->m_io->removeFilter(filter);
531 }
532 m_filter_chain.push_back( filter );
533} // eo IOImplementation::addFilter
534
535
536/**
537 * removes a filter from the filter chain.
538 * @param filter the pointer to the filter which is removed.
539 * @note if the filter is removed the class gives away the ownership; i.,e. the caller is responsible for
540 * deleting the filter if it was dynamically allocated.
541 */
542void IOImplementation::removeFilter
543(
544 FilterBasePtr filter
545)
546{
547 FilterChain::iterator it =
548 std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(filter) );
549 if (it != m_filter_chain.end())
550 {
551 filter->reset();
552 filter->m_io= NULL;
553 //TODO: signal the filter that it is removed ?!
554 m_filter_chain.erase(it);
555 }
556} // eo IOImplementation::removeFilter
557
558
559/**
560 * closes the file descriptors (/ the connection).
561 *
562 * @param direction the direction which should be closed (default: @a Direction::both for all).
563 */
564void IOImplementation::close(Direction direction)
565{
566 bool had_read_fd= (m_read_fd >= 0);
567 m_errno= 0;
568 if (direction == Direction::unspecified) direction= Direction::both;
569 if (direction != Direction::both && m_read_fd==m_write_fd && m_read_fd>=0)
570 { // special case: half closed (socket) connections...
571 // NOTE: for file descriptors m_errno will set to ENOTSOCK, but since we "forget" the desired part
572 // (read_fd or write_fd) this class works as desired.
573 switch(direction)
574 {
575 case Direction::in:
576 {
577 int res= ::shutdown(m_read_fd, SHUT_RD);
74786da6 578 if (res<0)
5c8a3d40
RP
579 {
580 m_errno= errno;
581 }
582 m_read_fd= -1;
583 if (!m_eof)
584 {
585 for(FilterChain::iterator it= m_filter_chain.begin();
586 it != m_filter_chain.end();
587 ++it)
588 {
589 (*it)->endOfIncomingData();
590 }
591 }
592 }
593 return;
594
595 case Direction::out:
596 {
597 int res= ::shutdown(m_write_fd, SHUT_WR);
598 if (res<0)
599 {
600 m_errno= errno;
601 }
602 m_write_fd= -1;
603 m_output_buffer.clear();
604 }
605 return;
606 }
607 }
608 if (m_write_fd >= 0 && (direction & Direction::out) )
609 {
610 int res1 = ::close(m_write_fd);
611 if (m_write_fd == m_read_fd)
612 {
613 m_read_fd= -1;
614 }
615 m_write_fd= -1;
616 m_output_buffer.clear();
617 if (res1<0) m_errno= errno;
618 }
619 if (m_read_fd >=0 && (direction & Direction::in) )
620 {
621 int res1 = ::close(m_read_fd);
622 m_read_fd= -1;
623 if (res1<0) m_errno= errno;
624 }
625 if (had_read_fd && !m_eof && (m_read_fd<0))
626 {
627 for(FilterChain::iterator it= m_filter_chain.begin();
628 it != m_filter_chain.end();
629 ++it)
630 {
631 (*it)->endOfIncomingData();
632 }
633 }
634} // eo IOImplementation::close
635
636
637/**
638 * determines if the io class wants to read data.
639 * Default implementation checks only for a valid file descriptor value.
640 *
641 * @return @a true if the objects wants to read data
642 */
643bool IOImplementation::wantRead()
644{
645 return (m_read_fd >= 0) && ! m_eof;
646} // eo IOImplementation::wantRead
647
648
649/**
650 * determines if the io class wants to write data.
651 * Default implementation checks for a valid file descriptor value and if the object
652 * cannot write data immediately.
653 *
654 * @return @a true if the objects wants to write data
655 */
656bool IOImplementation::wantWrite()
657{
658 return (m_write_fd >= 0) && ! m_marked_for_writing && ! m_not_writable;
659} // eo IOImplementation::wantWrite
660
661
662/**
663 * delivers if opened.
664 * The default returns @a true if at least one file descriptor (read or write) is valid.
665 * @return @a true if opened.
666 */
667bool IOImplementation::opened() const
668{
669 return (m_read_fd>=0) || (m_write_fd>=0);
670} // eo IOImplementation::opened() const
671
672
673/**
674 * returns if the read side detected an end of file (EOF).
675 * @return @a true if end of file was detected on read file descriptor (or read file descriptor isn't valid).
676 */
677bool IOImplementation::eof() const
678{
679 return (m_read_fd < 0) || m_eof;
680} // eo IOImplementatio::eof() const
681
682
683/**
684 * @brief returns of the write side didn't detect that it cannot write.
685 * @return @a true if we can write.
686 */
687bool IOImplementation::writable() const
688{
689 return (m_write_fd >=0 ) and not m_not_writable;
690} // eo IOImplementation::writable() const
691
692
693/**
694 * returns if the output buffer is empty.
74786da6 695 * @return
5c8a3d40
RP
696 */
697bool IOImplementation::empty() const
698{
699 return m_output_buffer.empty();
700} // eo IOImplementation::empty
701
702/**
703 * puts data into the output buffer and sends it immediately if possible,
704 *
705 * The data is passed through the filter chain before it's stored in the output buffer
706 * (i.e. the output buffer contains data as it should be send directly to the descriptor).
707 * @param _data the data which should be send.
708 */
709void IOImplementation::lowSend(const std::string& _data)
710{
711 std::string data(_data);
712
713 for(FilterChain::reverse_iterator it_filter= m_filter_chain.rbegin();
714 it_filter!= m_filter_chain.rend();
715 ++it_filter)
716 {
717 data= (*it_filter)->filterOutgoingData(data);
718 }
719 m_output_buffer+= data;
720
721 // if we can send immediately, do it:
722 if (! m_output_buffer.empty() && m_marked_for_writing)
723 {
724 doWrite();
725 }
726} // eo IOImplementation::lowSend
727
728
729/**
730 * called by the backend when there is data to read for this object.
731 *
732 * Reads the data from the connection (read file descriptor) and passes the data through the filter chain.
733 * The final data is appended to the input buffer and the signal @a m_signal_read() is called.
734 *
735 * If EOF is detected (i,e, no data was received) then the signal @a m_signal_eof() is called.
736 *
737 * @note overload this method only when You know what You are doing!
738 * (overloading is necessary when handling server sockets.)
739 */
740void IOImplementation::doRead()
741{
742 // static read buffer; should be ok as long as we don't use threads
743 static char buffer[8*1024]; // 8 KiB
744
745 m_errno = 0;
746 if (m_read_fd<0 || !m_marked_for_reading)
747 {
748 ODOUT("exit0; read_fd="<<m_read_fd << " mark=" << m_marked_for_reading);
749 return;
750 }
751
752 // reset the mark:
753 m_marked_for_reading = false;
754
755 // now read the data:
756 ssize_t count;
757 count = ::read(m_read_fd, buffer, sizeof(buffer));
758
759 ODOUT("::read -> " << count);
760
761 // interpret what we got:
762 if (count < 0) // error
763 {
764 m_errno = errno;
765 int fd= m_read_fd;
766
767 switch(m_errno)
768 {
769 case EINVAL:
770 case EBADF:
771 case ECONNRESET:
772 case ENETRESET:
773 if (fd == m_read_fd)
774 {
775 close( (m_read_fd == m_write_fd) ? Direction::both : Direction::in );
776 }
777 break;
778 }
779 }
780 else if (count==0) // EOF
781 {
782 // remember the read fd:
783 int fd = m_read_fd;
784 // remember the EOF:
785 m_eof= true;
786 // signal EOF
787 m_signal_eof();
788 // if the fd is still the same: close it.
789 if (fd == m_read_fd)
790 {
791 close( Direction::in );
792 }
793 }
794 else // we have valid data
795 {
796 std::string data(buffer,count);
797 ODOUT(" got \"" << data << "\"");
798 for(FilterChain::iterator it_filter= m_filter_chain.begin();
799 it_filter != m_filter_chain.end();
800 ++it_filter)
801 {
802 data= (*it_filter)->filterIncomingData(data);
803 }
804 m_input_buffer+= data;
805 m_signal_read();
806 }
807} // eo IOImplementation::doRead
808
809
810/**
811 * interface for filter classes to inject data into the filter chain (emulating new incoming data).
812 * @param from_filter the filter which injects the new data.
813 * @param _data the new data.
814 */
815void IOImplementation::injectIncomingData(FilterBasePtr from_filter, const std::string& _data)
816{
817 FilterChain::iterator it_filter =
818 std::find_if( m_filter_chain.begin(), m_filter_chain.end(), FilterMatch(from_filter) );
819 if (it_filter == m_filter_chain.end())
820 {
821 // dont accept data inject from a unknown filter
822 return;
823 }
824 // well: pass the data through the remaining filters:
825 // NOTE: processing is (nearly) the same as in IOImplementation::doRead()
826 std::string data(_data);
827 for(++it_filter;
828 it_filter != m_filter_chain.end();
829 ++it_filter)
830 {
831 data= (*it_filter)->filterIncomingData(data);
832 }
833 m_input_buffer+= data;
834 m_signal_read();
835} // eo IOImplementation::injectIncomingData(FilterBase*,const std::string&)
836
837
838/**
839 * interface for filter classes to inject data into the filter chain (emulating new outgoing data).
840 * @param from_filter the filter which injects the new data.
841 * @param _data the new data.
842 */
843void IOImplementation::injectOutgoingData(FilterBasePtr from_filter, const std::string& _data)
844{
845 FilterChain::reverse_iterator it_filter =
846 std::find_if( m_filter_chain.rbegin(), m_filter_chain.rend(), FilterMatch(from_filter) );
847 if (it_filter == m_filter_chain.rend())
848 {
849 // dont accept data inject from a unknown filter
850 return;
851 }
852 // well: pass the data through the remaining filters:
853 // NOTE: processing is (nearly) the same as in IOImplementation::lowSend()
854 std::string data(_data);
855 for(++it_filter;
856 it_filter!= m_filter_chain.rend();
857 ++it_filter)
858 {
859 data= (*it_filter)->filterOutgoingData(data);
860 }
861 m_output_buffer+= data;
862
863 // if we can send immediately, do it:
864 if (! m_output_buffer.empty() && m_marked_for_writing)
865 {
866 doWrite();
867 }
868} // eo IOImplementation::injectOutgoingData(FilterBase*,const std::string&)
869
870
871/**
872 * set the read file descriptor.
873 * Although a derived class can also set the read fd directly; this method should be used
874 * for this task since it updates some flags on the fd for async operation.
875 * @param fd the new read file descriptor.
876 */
877void IOImplementation::setReadFd(int fd)
878{
879 // test if we already have a valid descriptor (and may have to close it):
880 if (m_read_fd >=0 )
881 {
882 if (m_read_fd == fd)
883 {
884 // fd was already right; consider it to be ok.
885 return;
886 }
887 close(Direction::in);
888 }
889 // reset our errno:
890 m_errno= 0;
891 // if the new descriptor looks valid, set some flags:
892 if (fd >= 0)
893 {
894 long flags= ::fcntl(fd, F_GETFL);
895 if (flags != -1)
896 {
897 // set the flags for non blocking, async operation
898 flags |= O_NONBLOCK|O_ASYNC;
899 ::fcntl(fd,F_SETFL, flags);
900 }
901 else if ( errno == EBADF )
902 {
903 // well, we seemed to be fed with an invalid descriptor...:
904 m_errno = errno;
905 fd= -1;
906 }
907 }
908 if (fd >= 0) // if still valid:
909 {
910 // set the close-on-exec flag
911 ::fcntl(fd,F_SETFD, FD_CLOEXEC);
912 }
913 m_read_fd= fd;
914 m_marked_for_reading= false;
915 m_eof= false;
916} // eo IOImplementation::setReadFd(int)
917
918
919
920/**
921 * set the write file descriptor.
922 * Although a derived class can also set the write fd directly; this method should be used
923 * for this task since it updates some flags on the fd for async operation.
924 * @param fd the new write file descriptor.
925 */
926void IOImplementation::setWriteFd(int fd)
927{
928 if (m_write_fd >=0 )
929 {
930 if (m_write_fd == fd)
931 {
932 // fd was already right; consider it to be ok.
933 return;
934 }
935 close(Direction::out);
936 }
937 // reset our errno:
938 m_errno= 0;
939 // if the new descriptor looks valid, set some flags:
940 if (fd >= 0)
941 {
942 long flags= ::fcntl(fd, F_GETFL);
943 if (flags != -1)
944 {
945 // set the flags for non blocking, async operation
946 flags |= O_NONBLOCK|O_ASYNC;
947 ::fcntl(fd,F_SETFL, flags);
948 }
949 else if (errno == EBADF)
950 {
951 // well, we seemed to be fed with an invalid descriptor...:
952 m_errno = errno;
953 fd= -1;
954 }
955 }
956 if (fd >= 0) // if still valid:
957 {
958 // set the close-on-exec flag
959 ::fcntl(fd,F_SETFD, FD_CLOEXEC);
960 }
961 m_write_fd = fd;
962 m_marked_for_writing= false;
963 m_not_writable= false;
964} // eo IOImplementation::setWriteFd(int)
965
966
967
968/**
969 * called by the backend when this object can write data.
970 *
971 * If some data was sended, the signal @a m_signal_write is called.
972 *
973 * @internal tries to write all buffered data to output; if this succeeds,
974 * the connection is assumed to be still able to accept more data.
975 * (i.e. the internal write mark is kept!)
976 *
977 * @note overload this method only when You know what You are doing!
978*/
979void IOImplementation::doWrite()
980{
981 m_errno = 0;
982 if ( m_write_fd<0 || !m_marked_for_writing || m_output_buffer.empty())
983 {
984 return;
985 }
986
987 ODOUT("doWrite, d=\"" << m_output_buffer << "\"");
988
989 //reset mark:
990 m_marked_for_writing= false;
991
992 // now write the data
993 ssize_t count= ::write( m_write_fd, m_output_buffer.data(), m_output_buffer.size());
994
995 ODOUT("::write -> " << count);
996
997 if (count < 0) // error
998 {
999 m_errno= errno;
1000 int fd= m_write_fd;
1001
1002 switch(m_errno)
1003 {
1004 case EPIPE:
1005 m_not_writable= true;
1006 // emit a signal
1007 m_signal_not_writable();
1008 // fall through
1009 case EINVAL:
1010 case EBADF:
1011 case ECONNRESET:
1012 case ENETRESET:
1013 if (fd == m_write_fd)
1014 {
1015 close( (m_write_fd == m_read_fd) ? Direction::both : Direction::out );
1016 }
1017 break;
1018 }
1019 }
1020 else
1021 {
1022 m_output_buffer.erase(0, count);
1023 if (m_output_buffer.empty())
1024 {
1025 // special case: if we were able to send all the data, we keep the write mark:
1026 m_marked_for_writing= true;
1027 }
1028 }
1029 if (count > 0)
1030 {
1031 m_signal_write();
1032 }
1033} // eo IOImplementation::doWrite
1034
b9c8a8e5
GE
1035/**
1036 * Return a copy of the input buffer.
1037 */
1038std::string IOImplementation::getInput() const
1039{
1040 return m_input_buffer;
1041}
1042
1043/**
1044 * Return the input buffer and clear it.
1045 */
1046std::string IOImplementation::getInputClear()
1047{
1048 std::string retbuf;
1049
1050 retbuf.swap(m_input_buffer);
1051
1052 return retbuf;
1053}
1054
1055/**
1056 * Return true if there are bytes available in the input buffer.
1057 */
1058bool IOImplementation::inputAvailable() const
1059{
1060 return !m_input_buffer.empty();
1061}
1062
1063/**
1064 * Cut off the first len bytes from the input buffer.
1065 * @returns The number of bytes actually shortened
1066 */
1067std::string::size_type IOImplementation::shortenInput(std::string::size_type len)
1068{
1069 std::string::size_type real_len=len;
1070 if (real_len > m_input_buffer.size())
1071 real_len=m_input_buffer.size();
1072
1073 if (real_len > 0)
1074 m_input_buffer.erase(0,real_len);
1075
1076 return real_len;
1077}
5c8a3d40 1078
054f19f4
GE
1079/**
1080 * Read and remove a line from the input buffer.
1081 * @param delim the line ending character, \\n by default.
1082 * @returns A full line including the line ending if available, an empty string otherwise.
1083 */
1084std::string IOImplementation::getline(char delim)
1085{
1086 std::string line;
1087 std::string::size_type pos=m_input_buffer.find(delim);
1088
1089 // no line ending in the buffer?
1090 if (pos==std::string::npos)
1091 return line;
1092
1093 // copy the line including the delimiter
1094 line=m_input_buffer.substr(0,pos+1);
1095
1096 m_input_buffer.erase(0,pos+1);
1097
1098 return line;
1099}
1100
1101
5c8a3d40
RP
1102/*
1103 * implementation of SimpleIO
1104 */
1105
1106
1107SimpleIO::SimpleIO(int read_fd, int write_fd)
1108: inherited(read_fd, write_fd)
1109{
1110 m_signal_read.connect(boost::bind(&SimpleIO::slotReceived,this));
1111} // eo SimpleIO::SimpleIO()
1112
1113
1114SimpleIO::~SimpleIO()
1115{
1116} // eo SimpleIO::~SimpleIO()
1117
1118
1119/**
1120 * sends a string.
1121 * @param data the string.
1122 */
1123void SimpleIO::sendString(const std::string& data)
1124{
1125 lowSend(data);
1126} // eo SimpleIO::sendString(const std::string&)
1127
1128
1129/**
1130 * emits the signal signalReceived with the received data.
1131 * This slot is connected to IOImplementation::m_signal_read.
1132 */
1133void SimpleIO::slotReceived()
1134{
1135 std::string data;
1136 data.swap(m_input_buffer);
1137 signal_received_string(data);
1138} // eo SimpleIO::slotReceived()
1139
1140
1141
1142/*
1143 * implementation of SimpleIO2
1144 */
1145
1146
1147SimpleIO2::SimpleIO2(int read_fd, int write_fd)
1148: inherited(read_fd, write_fd)
1149{
1150 m_signal_read.connect(boost::bind(&SimpleIO2::slotReceived,this));
1151} // eo SimpleIO2::SimpleIO2()
1152
1153
1154SimpleIO2::~SimpleIO2()
1155{
1156} // eo SimpleIO2::~SimpleIO2()
1157
1158
1159/**
1160 * sends a string.
1161 * @param data the string.
1162 */
1163void SimpleIO2::sendString(const std::string& data)
1164{
1165 lowSend(data);
1166} // eo SimpleIO2::sendString(const std::string&)
1167
1168
1169/**
1170 * emits the signal signalReceived with the received data.
1171 * This slot is connected to IOImplementation::m_signal_read.
1172 */
1173void SimpleIO2::slotReceived()
1174{
1175 std::string data;
1176 data.swap(m_input_buffer);
1177 signal_received_string(data);
1178} // eo SimpleIO2::slotReceived()
1179
1180
1181
1182/*
1183 * implementation of class Backend (singleton)
1184 */
1185
1186Backend* Backend::g_backend= NULL;
1187
1188int Backend::m_count_active_steps=0;
1189
1190
1191Backend::Backend()
1192: m_count_active_loops(0)
1193, m_count_stop_requests(0)
1194{
74786da6 1195 SystemTools::ignore_signal( SystemTools::Signal::PIPE );
5c8a3d40
RP
1196} // eo Backend::Backend
1197
1198
1199Backend::~Backend()
1200{
1201 SystemTools::restore_signal_handler( SystemTools::Signal::PIPE );
1202} // eo Backend::~Backend()
1203
1204/**
1205 * delivers pointer to the current backend, instantiating a new backend if there was no current one.
1206 *
1207 * This should be the only way to access the backend which should be a singleton.
1208 *
1209 * @return the pointer to the current backend.
1210 */
1211Backend* Backend::getBackend()
1212{
1213 if (!g_backend)
1214 {
1215 g_backend = new Backend();
1216 }
1217 return g_backend;
1218} // eo Backend::getBackend
1219
1220
1221
1222
1223/**
1224 * performs one backend cycle.
1225 *
1226 * Collects all file descriptors from the active io objects which should be selected for reading and/or writing.
1227 * Also determines the timer events which become due and adjusts the timeout.
1228 * Constructs the necessary structures and calls poll().
1229 * Finally interprets the results from poll() (i.e. performs the reading/writing/timer events)
1230 *
1231 * @param timeout maximal wait value in milliseconds; negative value waits until at least one event occurred.
1232 * @return @a true if there was at least one active object; otherwise @a false
1233 *
1234 * @note this method is a little beast.
1235 *
74786da6 1236 * @internal
5c8a3d40
RP
1237 * The cycle is divided into four steps: collecting; poll; mark and execute.
1238 * The "mark" step is necessary to avoid some bad side effects when method calls in the execution stage
1239 * are calling @a Backup::doOneStep or open their own local backend loop.
1240 *
1241 * @todo handle some more error cases.
1242 * @todo provide a plugin interface for external handler.
1243 * (currently inclusion of external handler is possible by (ab)using timer classes)
1244 */
1245bool Backend::doOneStep(int timeout)
1246{
1247 ODOUT( "timeout=" << timeout );
1248 internal_io::PollDataCluster poll_data;
1249 bool had_active_object = false;
1250
1251 ++m_count_active_steps;
1252
1253 try {
1254
1255 FdSetType local_read_fds;
1256 FdSetType local_write_fds;
1257
1258 // step 1 ; collect
1259
1260 { // step 1.1: collect fds for read/write operations
1261 for(internal_io::IOList::iterator itIOList = internal_io::g_io_list().begin();
1262 itIOList != internal_io::g_io_list().end();
1263 ++itIOList)
1264 {
1265 if (! *itIOList) continue; // skip NULL entries
1266 int read_fd = (*itIOList)->m_read_fd;
1267 int write_fd = (*itIOList)->m_write_fd;
1268 bool want_read = (read_fd >= 0) and (*itIOList)->wantRead();
1269 bool want_write = (write_fd >= 0) and (*itIOList)->wantWrite();
1270 if (read_fd >= 0 )
1271 {
1272 local_read_fds.insert( read_fd );
1273 }
1274 if (write_fd >= 0)
1275 {
1276 local_write_fds.insert( write_fd );
1277 }
1278 if (!want_read && !want_write) continue;
1279 if (want_read)
1280 {
1281 FODOUT( (*itIOList), "wants to read (fd=" << read_fd << ")");
1282 poll_data.add_read_fd(read_fd, *itIOList);
1283 }
1284 if (want_write)
1285 {
1286 FODOUT( (*itIOList), "wants to write (fd=" << write_fd << ")");
1287 poll_data.add_write_fd(write_fd, *itIOList);
1288 }
1289 had_active_object= true;
1290 }
1291 }
1292
1293 { // step 1.2: collect timer events
1294 MilliTime current_time;
1295 MilliTime min_event_time;
1296
1297 get_current_monotonic_time(current_time);
1298 bool min_event_time_set;
1299
1300 if (timeout >= 0)
1301 {
1302 min_event_time = current_time + MilliTime(0,timeout);
1303 min_event_time_set= true;
1304 }
1305 else
1306 {
1307 min_event_time = current_time + MilliTime(86400,0);
1308 min_event_time_set= false;
1309 }
1310 // TODO
1311
1312 for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
74786da6 1313 it_timer != internal_io::g_timer_list().end()
5c8a3d40
RP
1314 && (!had_active_object || !min_event_time_set || current_time < min_event_time);
1315 ++ it_timer)
1316 {
1317 if (! *it_timer) continue; // skip NULL entries
1318 if (! (*it_timer)->m_active) continue; // skip if not enabled
1319 if ( !min_event_time_set || (*it_timer)->m_when < min_event_time)
1320 {
1321 min_event_time = (*it_timer)->m_when;
1322 min_event_time_set= true;
1323 }
1324 had_active_object= true;
1325 }
1326
1327 if (min_event_time_set)
1328 { // we have at a minimal event time, so (re)compute the timeout value:
1329 MilliTime delta= (min_event_time - current_time);
1330 long long delta_ms = std::min( delta.get_milliseconds(), 21600000LL); // max 6h
1331 if (delta_ms <= 0L)
1332 {
1333 timeout= 0L;
1334 }
1335 else
1336 {
1337 timeout= delta_ms + (delta_ms<5 ? 1 : 3);
1338 }
1339 }
1340 }
1341
1342 // step 2 : poll
1343 ODOUT(" poll timeout is " << timeout);
1344 {
1345 MilliTime current_time;
1346 get_current_monotonic_time(current_time);
1347 ODOUT(" current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1348 }
1349 int poll_result= ::poll(poll_data.get_pollfd_ptr(), poll_data.get_num_pollfds(), timeout);
1350
1351 ODOUT("poll -> " << poll_result);
1352 {
1353 MilliTime current_time;
1354 get_current_monotonic_time(current_time);
1355 ODOUT(" current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1356 }
1357
1358 if (poll_result < 0)
1359 {
1360 //TODO poll error handling (signals ?!)
1361 }
1362
1363 // step 3 : mark
1364
1365 // step 3.1: mark io objects (if necessary)
1366 if (poll_result > 0)
1367 {
1368 for(internal_io::PollVector::iterator itPollItem = poll_data.m_poll_vector.begin();
1369 itPollItem != poll_data.m_poll_vector.end();
1370 ++itPollItem)
1371 {
1372 ODOUT(" fd=" << itPollItem->fd << ", events=" << itPollItem->events << ", revents=" << itPollItem->revents);
1373 if ( 0 == (itPollItem->revents))
1374 { // preliminary continuation if nothing is to handle for this item(/fd)...
1375 continue;
1376 }
1377 if ( 0!= (itPollItem->revents & (POLLIN|POLLHUP)))
1378 {
1379 IOImplementation *io= poll_data.m_read_fd_io_map[ itPollItem->fd ];
1380 if (io && io->m_read_fd==itPollItem->fd)
1381 {
1382 FODOUT(io,"marked for reading");
1383 io->m_marked_for_reading= true;
1384 }
1385 }
1386 if ( 0!= (itPollItem->revents & POLLOUT))
1387 {
1388 IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
1389 if (io && io->m_write_fd==itPollItem->fd)
1390 {
1391 io->m_marked_for_writing= true;
1392 }
1393 }
1394 if ( 0!= (itPollItem->revents & POLLERR))
1395 {
1396 IOImplementation *io= poll_data.m_write_fd_io_map[ itPollItem->fd ];
1397 if (0!= (itPollItem->events & POLLOUT))
1398 {
1399 if (io && io->m_write_fd==itPollItem->fd)
1400 {
1401 io->m_marked_for_writing= false;
1402 //io->close( Direction::out );
1403 }
1404 }
1405 }
1406 // TODO error handling (POLLERR, POLLHUP, POLLNVAL)
1407 }
1408 }
1409
1410 //Step 3.2: mark timer objects
1411 {
1412 MilliTime current_time;
1413
1414 get_current_monotonic_time(current_time);
1415 ODOUT(" current time is sec="<<current_time.mt_sec << ", msec=" << current_time.mt_msec);
1416
1417 for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1418 it_timer != internal_io::g_timer_list().end();
1419 ++ it_timer)
1420 {
1421 ODOUT(" check timer " << *it_timer);
1422 if (! *it_timer) continue; // skip NULL entries
1423 if (! (*it_timer)->m_active) continue; // skip if not enabled
1424 if ( (*it_timer)->m_when <= current_time)
1425 {
1426 ODOUT(" ==> MARK");
1427 (*it_timer)->m_marked = true;
1428 }
1429 }
1430 }
1431
1432
1433 // step 4 : execute
1434
1435 // step 4.1: execute io
1436 ODOUT("execute stage");
1437 for(internal_io::IOList::iterator it_io = internal_io::g_io_list().begin();
1438 it_io != internal_io::g_io_list().end();
1439 ++ it_io)
1440 {
1441 ODOUT(" check obj " << *it_io);
1442 if (NULL == *it_io) continue;
1443 if ((*it_io)->m_marked_for_writing)
1444 {
1445 FODOUT((*it_io),"exec doWrite");
1446 (*it_io)->doWrite();
1447 if ((*it_io) == NULL) continue; // skip remaining if we lost the object
1448 if ((*it_io)->m_errno)
1449 {
1450 continue;
1451 }
1452 }
1453 if ((*it_io)->m_marked_for_reading)
1454 {
1455 FODOUT((*it_io),"exec doRead");
1456 (*it_io)->doRead();
1457 if ((*it_io) == NULL) continue; // skip remaining if we lost the object
1458 if ((*it_io)->m_errno)
1459 {
1460 continue;
1461 }
1462 }
1463 }
1464
1465 // step 4.2: execute timer events
1466 {
1467 for(internal_io::TimerList::iterator it_timer= internal_io::g_timer_list().begin();
1468 it_timer != internal_io::g_timer_list().end();
1469 ++ it_timer)
1470 {
1471 if (! *it_timer) continue; // skip NULL entries
1472 if (! (*it_timer)->m_active) continue; // skip if not enabled
1473 if (! (*it_timer)->m_marked) continue; // skip if not marked
1474
1475 // reset the mark and deactivate object now since the execute() method might activate it again
1476 (*it_timer)->m_marked= false;
1477 (*it_timer)->m_active= false;
1478
1479 // now execute the event:
1480 (*it_timer)->execute();
1481 }
1482 }
1483
1484 } // eo try
1485 catch(...)
1486 {
1487 // clean up our counter
1488 --m_count_active_steps;
1489 // and forward the exception
1490 throw;
1491 }
1492
1493 if ( 0 == --m_count_active_steps)
1494 {
1495 internal_io::g_io_list().clean_list();
1496 internal_io::g_timer_list().clean_list();
1497 }
1498
1499 return had_active_object;
1500} // eo Backend::doOneStep
1501
1502
1503/**
1504 * enters a backend loop.
1505 *
1506 * Calls @a Backend::doOneStep within a loop until @a Backend::stop was called or there are no more
1507 * active objects (io objects or timer objects).
1508 */
1509void Backend::run()
1510{
1511 ++m_count_active_loops;
1512 do
1513 {
1514 try
1515 {
1516 if (!doOneStep(c_max_poll_wait))
1517 {
1518 // stop if there are no more active objects.
1519 stop();
1520 }
1521 }
1522 catch(...)
1523 {
1524 // clean up our counter
1525 --m_count_active_loops;
1526 // and forward the exception
1527 throw;
1528 }
74786da6 1529 }
5c8a3d40
RP
1530 while (0 == m_count_stop_requests);
1531 --m_count_active_loops;
1532 --m_count_stop_requests;
1533} // eo Backend::run
1534
1535
1536/**
1537 * @brief stops the latest loop currently run by Backend::run().
1538 * @see Backend::run()
1539 */
1540void Backend::stop()
1541{
1542 if (m_count_active_loops)
1543 {
1544 ++m_count_stop_requests;
1545 }
1546} // eo Backend::stop()
1547
1548
1549
42b7c46d 1550} // eo namespace AsyncIo