class ConnectionWrapper
{
private:
- long long command_timeout_usec;
- long long hello_timeout_usec;
+ int command_timeout_millisec;
+ int hello_timeout_millisec;
protected:
log_level_values log_level;
public:
ConnectionWrapper()
- : command_timeout_usec(command_client::command_timeout_usec_default),
- hello_timeout_usec(command_client::hello_timeout_usec_default),
+ : command_timeout_millisec(command_client::command_timeout_millisec_default),
+ hello_timeout_millisec(command_client::hello_timeout_millisec_default),
log_level(none),
logstream(NULL)
{ }
return true;
}
- long long get_command_timeout_usec(void)
- { return command_timeout_usec; }
+ int get_command_timeout_millisec() const
+ { return command_timeout_millisec; }
- void set_command_timeout_usec(long long _command_timeout_usec)
- { command_timeout_usec=_command_timeout_usec; }
+ void set_command_timeout_millisec(int _command_timeout_millisec)
+ { command_timeout_millisec=_command_timeout_millisec; }
- long long get_hello_timeout_usec(void)
- { return hello_timeout_usec; }
+ int get_hello_timeout_millisec() const
+ { return hello_timeout_millisec; }
- void set_hello_timeout_usec(long long _hello_timeout_usec)
- { hello_timeout_usec=_hello_timeout_usec; }
+ void set_hello_timeout_millisec(int _hello_timeout_millisec)
+ { hello_timeout_millisec=_hello_timeout_millisec; }
virtual void set_logging(std::ostream *_logstream, log_level_values _log_level);
throw std::logic_error(NotInitializedMessage);
std::auto_ptr<Client> stub(new Client(WrappedConnection->get_connection(),
- WrappedConnection->get_command_timeout_usec(),
- WrappedConnection->get_hello_timeout_usec()));
+ WrappedConnection->get_command_timeout_millisec(),
+ WrappedConnection->get_hello_timeout_millisec()));
SingletonObject=std::auto_ptr<T2nSingletonWrapper>(new T2nSingletonWrapper(stub));
}
}
/// return a pointer to the ConnectionWrapper currently in use
- static ConnectionWrapper* get_connection_wrapper(void)
+ static ConnectionWrapper* get_connection_wrapper()
{ return WrappedConnection.get(); }
/// manually establish the connection without actually executing a call
- static void ensure_singleton_there(void)
+ static void ensure_singleton_there()
{
if (SingletonObject.get() == NULL)
init();
#include <boost/bind.hpp>
#include "command_client.hxx"
+#include "monotonic_clock.hxx"
#include <config.h>
/**
* Constructor
* @param _c connection for this command. Ownership of the pointer is outside.
- * @param _command_timeout_usec timeout until the command has to be completed
- * @param _hello_timeout_usec timeout until hello has to be received
+ * @param _command_timeout_millisec timeout until the command has to be completed
+ * @param _hello_timeout_millisec timeout until hello has to be received
*/
-command_client::command_client(client_connection* _c, long long _command_timeout_usec, long long _hello_timeout_usec)
+command_client::command_client(client_connection* _c, int _command_timeout_millisec, int _hello_timeout_millisec)
: c(_c)
, constructorException(NULL)
{
- command_timeout_usec=_command_timeout_usec;
- hello_timeout_usec=_hello_timeout_usec;
+ command_timeout_millisec=_command_timeout_millisec;
+ hello_timeout_millisec=_hello_timeout_millisec;
// for reconnects
c->add_callback(new_connection,bind(&command_client::read_hello, boost::ref(*this)));
}
/** @brief return a complete packet
- @param usec_timeout maximum microseconds to wait until the packet is complete
+ @param millisec_timeout maximum microseconds to wait until the packet is complete
@retval packet data as std::string
@note throws a t2n_transfer_error if the timeout is exceeded
*/
-std::string command_client::read_packet(const long long &usec_timeout)
+std::string command_client::read_packet(const int &millisec_timeout)
{
string resultpacket;
bool got_packet=false;
- long long my_timeout=usec_timeout;
+ int my_timeout=millisec_timeout, timeout_checkpoint;
+
while(!(got_packet=c->get_packet(resultpacket)) && my_timeout > 0 && !c->is_closed())
- c->fill_buffer(my_timeout,&my_timeout);
+ {
+ timeout_checkpoint=monotonic_clock_gettime_sec();
+ c->fill_buffer(millisec_timeout);
+ my_timeout -= abs(monotonic_clock_gettime_sec() - timeout_checkpoint);
+ }
+
if (!got_packet)
throw t2n_transfer_error("timeout exceeded");
{
string resultpacket;
bool got_packet=false;
- long long my_timeout=hello_timeout_usec;
+ int my_timeout=hello_timeout_millisec, timeout_checkpoint;
while(!(got_packet=c->get_packet(resultpacket)) && my_timeout > 0 && !c->is_closed())
{
- c->fill_buffer(my_timeout,&my_timeout);
+ timeout_checkpoint=monotonic_clock_gettime_sec();
+ c->fill_buffer(my_timeout);
+ my_timeout -= abs(monotonic_clock_gettime_sec() - timeout_checkpoint);
c->peek_packet(resultpacket);
check_hello(resultpacket); // will throw before timeout if wrong data received
c->write(ofs.str());
- istringstream ifs(read_packet(command_timeout_usec));
+ istringstream ifs(read_packet(command_timeout_millisec));
boost::archive::binary_iarchive ia(ifs);
try
class command_client
{
public:
- static const long long command_timeout_usec_default=90000000;
- static const long long hello_timeout_usec_default=30000000;
+ static const int command_timeout_millisec_default=90000;
+ static const int hello_timeout_millisec_default=30000;
private:
client_connection *c;
- long long hello_timeout_usec;
- long long command_timeout_usec;
+ int hello_timeout_millisec;
+ int command_timeout_millisec;
+
+ t2n_exception *constructorException;
void read_hello();
- std::string read_packet(const long long &usec_timeout);
+ std::string read_packet(const int &millisec_timeout);
bool check_hello(const std::string& hellostr);
- t2n_exception *constructorException;
-
public:
- command_client(client_connection* _c,
- long long _command_timeout_usec=command_timeout_usec_default,
- long long _hello_timeout_usec=hello_timeout_usec_default);
+ explicit command_client(client_connection* _c,
+ int _command_timeout_millisec=command_timeout_millisec_default,
+ int _hello_timeout_millisec=hello_timeout_millisec_default);
+
virtual ~command_client();
void replace_connection(client_connection* _c);
void send_command(command* cmd, result_container &res);
- void set_command_timeout_usec(long long _command_timeout_usec=command_timeout_usec_default)
- { command_timeout_usec=_command_timeout_usec; }
- void set_hello_timeout_usec(long long _hello_timeout_usec=hello_timeout_usec_default)
- { hello_timeout_usec=_hello_timeout_usec; }
- long long get_command_timeout_usec(void)
- { return command_timeout_usec; }
- long long get_hello_timeout_usec(void)
- { return hello_timeout_usec; }
- bool is_connection_closed(void)
+ void set_command_timeout_millisec(int _command_timeout_millisec=command_timeout_millisec_default)
+ { command_timeout_millisec=_command_timeout_millisec; }
+
+ void set_hello_timeout_millisec(int _hello_timeout_millisec=hello_timeout_millisec_default)
+ { hello_timeout_millisec=_hello_timeout_millisec; }
+
+ int get_command_timeout_millisec() const
+ { return command_timeout_millisec; }
+
+ int get_hello_timeout_millisec() const
+ { return hello_timeout_millisec; }
+
+ bool is_connection_closed()
{ return c->is_closed(); }
- t2n_exception* get_constuctor_exception(void)
+
+ t2n_exception* get_constructor_exception()
{ return constructorException; }
};
}
/** @brief handle incoming commands
- @param[in,out] usec_timeout wait until new data is found, max timeout usecs.
+ @param[in,out] millisec_timeout wait until new data is found, max timeout millisecs.
-1: wait endless, 0: instant return
- @param[out] usec_timeout_remaining microseconds from the timeout that were not used
*/
-void command_server::handle(long long usec_timeout, long long* usec_timeout_remaining)
+void command_server::handle(int millisec_timeout)
{
guard_handle++;
try
{
- if (s.fill_buffer(usec_timeout,usec_timeout_remaining))
+ if (s.fill_buffer(millisec_timeout))
{
std::string packet;
unsigned int conn_id = 0;
command_server(server& _s);
~command_server();
- void handle(long long usec_timeout=-1, long long* usec_timeout_remaining=NULL);
+ void handle(int millisec_timeout=-1);
void send_hello(unsigned int conn_id);
void do_callbacks(callback_event_type event);
- void reopen(void);
+ void reopen();
void remove_incomplete_packets();
public:
virtual ~connection();
/// is this connection closed or not
- bool is_closed()
+ bool is_closed() const
{ return closed; }
/// close this connection
virtual void close();
/** @brief look for new data and store it in the local buffer
- @param usec_timeout wait until new data is found, max timeout usecs.
+ @param millisec_timeout wait until new data is found, max timeout secs.
-1: wait endless
0: return instantly
- @param usec_timeout_remaining if non-NULL the function will write the
- not used time to the given target
@retval true if new data was found (does not mean that the received data
is a complete packet though)
*/
- virtual bool fill_buffer(long long usec_timeout=-1,long long* usec_timeout_remaining=NULL)=0;
+ virtual bool fill_buffer(int millisec_timeout)=0;
bool get_packet(std::string& data);
server::server()
: callbacks(__events_end)
{
- set_default_timeout(30);
+ set_default_timeout(30000);
set_logging(NULL,none);
next_id=1;
}
for (i=callbacks[e].begin(); i != ie; i++)
newconn->add_callback(static_cast<callback_event_type>(e),bind(*i,cid));
}
-
LOGSTREAM(debug,"new connection accepted, id: " << cid);
do_callbacks(new_connection,cid);
/**
@brief Gets a connection by id
-
@param conn_id Connection ID
-
@retval Pointer to connection object
*/
server_connection* server::get_connection(unsigned int conn_id)
private:
int timeout;
int last_action_time;
+
unsigned int connection_id;
void set_server(server* _my_server)
protected:
server *my_server;
- server_connection(int _timeout);
+ explicit server_connection(int _timeout);
virtual ~server_connection();
std::ostream* get_logstream(log_level_values level);
{ timeout=_timeout; }
/// get the id of this connection within the server object
- unsigned int get_id()
+ unsigned int get_id() const
{ return connection_id; }
void add_callback(callback_event_type event, const boost::function<void ()>& func);
server();
- virtual bool fill_connection_buffers(void)=0;
+ virtual bool fill_connection_buffers()=0;
unsigned int add_connection(server_connection* newconn);
{ default_timeout=_default_timeout; }
/// get the current default timeout for client connections
- int get_default_timeout(void)
+ int get_default_timeout() const
{ return default_timeout; }
void set_logging(std::ostream *_logstream, log_level_values _log_level);
void add_callback(callback_event_type event, const boost::function<void (unsigned int)>& func);
/** @brief look for new data and store it in the local buffer
- @param usec_timeout wait until new data is found, max timeout usecs.
+ @param millisec_timeout wait until new data is found, max timeout millisecs.
-1: wait endless
0: return instantly
- @param usec_timeout_remaining if non-NULL the function will write the
- not used time to the given target
@retval true if new data was found (does not mean that the received data
is a complete packet though)
*/
- virtual bool fill_buffer(long long usec_timeout=-1, long long* usec_timeout_remaining=NULL)=0;
+ virtual bool fill_buffer(int millisec_timeout=-1)=0;
void close();
on this file might be covered by the GNU General Public License.
*/
-#include <stdio.h>
-#include <errno.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <sys/types.h>
+#include <cerrno>
#include <sys/socket.h>
#include <sys/un.h>
-#include <sys/time.h>
+#include <sys/epoll.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
-#include <time.h>
-
-#include <pwd.h>
-#include <grp.h>
+#include <ctime>
#include <sstream>
+#include "monotonic_clock.hxx"
#include "socket_client.hxx"
#include "t2n_exception.hxx"
#include "log.hxx"
{
/// returns a closed connection if connection could not be established, call get_last_error_msg() for details
-socket_client_connection::socket_client_connection(int _port, const std::string& _server,
- long long _connect_timeout_usec, int _max_retries,
- std::ostream *_logstream, log_level_values _log_level)
+socket_client_connection::socket_client_connection(int _port, const std::string& _server,
+ int _connect_timeout_millisec, int _max_retries,
+ std::ostream *_logstream, log_level_values _log_level)
: client_connection(), socket_handler(0,tcp_s)
{
max_retries=_max_retries;
- connect_timeout_usec=_connect_timeout_usec;
+ connect_timeout_millisec=_connect_timeout_millisec;
server=_server;
port=_port;
}
/// returns a closed connection if connection could not be established, call get_last_error_msg() for details
-socket_client_connection::socket_client_connection(const std::string& _path,
- long long _connect_timeout_usec, int _max_retries,
- std::ostream *_logstream, log_level_values _log_level)
+socket_client_connection::socket_client_connection(const std::string& _path, int _connect_timeout_millisec, int _max_retries,
+ std::ostream *_logstream, log_level_values _log_level)
: client_connection(), socket_handler(0,unix_s)
{
max_retries=_max_retries;
- connect_timeout_usec=_connect_timeout_usec;
+ connect_timeout_millisec=_connect_timeout_millisec;
path=_path;
if (!sock)
throw t2n_connect_error(string("socket() error: ")+strerror(errno));
+ socket_handler::create_epoll();
+
try
{
connect_with_timeout((struct sockaddr *) &sock_addr,sizeof(sock_addr));
if (!sock)
throw t2n_connect_error(string("socket() error: ")+strerror(errno));
+ socket_handler::create_epoll();
+
try
{
connect_with_timeout((struct sockaddr *) &unix_addr, sizeof(unix_addr));
else
throw t2n_connect_error("no more retries left after connect error");
}
+
}
/// execute a connect on a prepared socket (tcp or unix) respecting timeouts
{
LOGSTREAM(debug,"connect_with_timeout(): EINPROGRESS");
- /* set timeout */
- struct timeval tval;
- struct timeval *timeout_ptr;
-
- if (connect_timeout_usec == -1)
- timeout_ptr = NULL;
- else
- {
- timeout_ptr = &tval;
+ int nfds;
+ while ((nfds=epoll_wait(epoll_fd,epoll_events,EPOLL_MAX_EVENTS,timeout)) &&
+ nfds < 0 && errno==EINTR);
- // convert timeout from long long usec to int sec + int usec
- tval.tv_sec = connect_timeout_usec / 1000000;
- tval.tv_usec = connect_timeout_usec % 1000000;
- }
-
- fd_set connect_socket_set;
- FD_ZERO(&connect_socket_set);
- FD_SET(sock,&connect_socket_set);
-
- int ret;
- while ((ret=select(FD_SETSIZE, NULL, &connect_socket_set, NULL, timeout_ptr)) &&
- ret < 0 && errno==EINTR);
-
- if (ret < 0)
+ if (nfds < 0)
throw t2n_connect_error(string("connect() error (select): ")+strerror(errno));
socklen_t sopt=sizeof(int);
}
}
-/** @brief try to reconnect the current connection with the same connection credentials (host and port or path)
-
- @note will throw an exeption if reconnecting not possible
+/**
+ * @brief try to reconnect the current connection with the same connection credentials (host and port or path)
+ * @note will throw an exception if reconnecting not possible
*/
void socket_client_connection::reconnect()
{
else if (type == unix_s)
unix_connect(max_retries);
- // connection is open now, otherwise an execption would have been thrown
+ // connection is open now, otherwise an exception would have been thrown
reopen();
LOGSTREAM(debug,"reconnect() done, client_connection::is_closed() now " << client_connection::is_closed());
namespace libt2n
{
/** @brief a connection from client to server using sockets.
-
Use this class to connect from a client to a server.
*/
class socket_client_connection : public client_connection, public socket_handler
{
public:
- static const int max_retries_default=3;
- static const long long connect_timeout_usec_default=30000000;
+ static const int max_retries_default=5;
+ static const int connect_timeout_millisec_default=30000;
private:
void real_write(const std::string& data)
void connect_with_timeout(struct sockaddr *sock_addr,unsigned int sockaddr_size);
int max_retries;
- long long connect_timeout_usec;
+ int connect_timeout_millisec;
std::string path;
std::string server;
{ return client_connection::get_logstream(level); }
public:
- socket_client_connection(int _port, const std::string& _server="127.0.0.1",
- long long _connect_timeout_usec=connect_timeout_usec_default,
- int _max_retries=max_retries_default,
- std::ostream *_logstream=NULL, log_level_values _log_level=none);
- socket_client_connection(const std::string& _path,
- long long _connect_timeout_usec=connect_timeout_usec_default,
+ explicit socket_client_connection(int _port, const std::string& _server="127.0.0.1",
+ int _connect_timeout_millisec=connect_timeout_millisec_default,
+ int _max_retries=max_retries_default, std::ostream *_logstream=NULL,
+ log_level_values _log_level=none);
+
+ explicit socket_client_connection(const std::string& _path,
+ int _connect_timeout_millisec=connect_timeout_millisec_default,
int _max_retries=max_retries_default,
std::ostream *_logstream=NULL, log_level_values _log_level=none);
~socket_client_connection();
/** @brief read data from the socket and copy it into buffer
- @param usec_timeout wait until new data is found, max timeout usecs.
- -1: wait endless
- 0: return instantly
- @param usec_timeout_remaining if non-NULL the function will write the
- not used time to the given target
@retval true if new data was found (does not mean that the received data
is a complete packet though)
*/
- bool fill_buffer(long long usec_timeout=-1, long long *usec_timeout_remaining=NULL)
- { return socket_handler::fill_buffer(buffer,usec_timeout,usec_timeout_remaining); }
+ bool fill_buffer(int millisec_timeout)
+ { return socket_handler::fill_buffer(buffer, millisec_timeout); }
virtual void close();
void reconnect();
- std::string get_last_error_msg(void)
+ std::string get_last_error_msg()
{ return lastErrorMsg; }
};
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/time.h>
+#include <sys/epoll.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netdb.h>
{
socket_handler::socket_handler(int _sock, socket_type_value _socket_type)
-: sock(_sock)
-, recv_buffer_size( default_recv_buffer_size )
-, write_block_size( default_write_block_size )
-, write_timeout( default_write_timeout )
-, socket_type(_socket_type)
+ : sock(_sock)
+ , recv_buffer_size( default_recv_buffer_size )
+ , write_block_size( default_write_block_size )
+ , write_timeout( default_write_timeout )
+ , socket_type(_socket_type)
{
}
if (sock != -1)
{
shutdown(sock,SHUT_RDWR);
+ remove_from_epoll(sock);
::close(sock);
sock = -1;
/// by the connection class you are using.
void socket_handler::close()
{
- LOGSTREAM(debug,"close connection");
- // graceful shutdown
- shutdown(sock,SHUT_RDWR);
- ::close(sock);
+ if (sock != -1)
+ {
+ LOGSTREAM(debug, "close connection");
+ // graceful shutdown
+ shutdown(sock, SHUT_RDWR);\
+ //taking it out of the epoll pool
- sock = -1;
+ remove_from_epoll(sock);
+ ::close(sock);
+
+ sock = -1;
+ }
}
/// set options like fast reuse and keepalive every socket should have
/**
- * @brief set new size for the data chunks when writeing.
+ * @brief set new size for the data chunks when writing.
* @param new_write_block_size the new chunk size.
*
* The write block size determines the amount of data which is tried to write
- * to the socket when data needs to be sended.
- * Since writing data is done in a loop, this does not limit the amunt of data which can
+ * to the socket when data needs to be sent.
+ * Since writing data is done in a loop, this does not limit the amount of data which can
* be written.
*
* The value is normalized to be at least 512 bytes and at max 32K bytes.
/** @brief check if new data is waiting on the raw socket
- @param[in,out] usec_timeout wait until new data is found, max timeout usecs.
+ @param[in,out] timeout wait until new data is found, max timeout millisecs.
-1: wait endless
0: return instantly
- @param[out] usec_timeout_remaining microseconds from the timeout that were not used
*/
-bool socket_handler::data_waiting(long long usec_timeout,long long* usec_timeout_remaining)
+bool socket_handler::data_waiting(int timeout)
{
- // just our socket
- fd_set active_fd_set;
- FD_ZERO (&active_fd_set);
- FD_SET (sock, &active_fd_set);
-
- /* set timeout */
- struct timeval tval;
- struct timeval *timeout_ptr;
-
- if (usec_timeout == -1)
- timeout_ptr = NULL;
- else
- {
- timeout_ptr = &tval;
-
- // convert timeout from long long usec to int sec + int usec
- tval.tv_sec = usec_timeout / 1000000;
- tval.tv_usec = usec_timeout % 1000000;
- }
-
- int ret=select (FD_SETSIZE, &active_fd_set, NULL, NULL, timeout_ptr);
-
- // return the timeout we did not use
- // todo: this is linux specific according to man 2 select
- if (usec_timeout > 0 && usec_timeout_remaining != NULL)
- *usec_timeout_remaining=(tval.tv_sec*1000000)+tval.tv_usec;
+ int ret = epoll_wait(epoll_fd,epoll_events,EPOLL_MAX_EVENTS,timeout);
if (ret > 0)
return true;
/** @brief read data from the raw socket and copy it into the provided buffer
@param buffer the buffer where to append the new data
- @param[in,out] usec_timeout wait until new data is found, max timeout usecs.
+ @param[in,out] timeout wait until new data is found, max timeout millisecs.
-1: wait endless
0: return instantly
- @param[out] usec_timeout_remaining microseconds from the timeout that were not used
*/
-bool socket_handler::fill_buffer(std::string& buffer, long long usec_timeout, long long *usec_timeout_remaining)
+bool socket_handler::fill_buffer(std::string& buffer,int timeout)
{
// fast path for timeout==0
- if (usec_timeout==0 || data_waiting(usec_timeout,usec_timeout_remaining))
+ if (timeout == 0 || data_waiting(timeout))
return fill_buffer(buffer);
else
return false;
while ((rtn=::write(sock, data.data()+offset, write_size)) == -1 &&
(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
{
- wait_ready_to_write(sock,write_timeout);
+ wait_ready_to_write(write_timeout);
LOGSTREAM(debug,"resuming write() call after EAGAIN or EINTR or EWOULDBLOCK");
}
}
LOGSTREAM(debug,"wrote " << data.size() << " bytes");
-
- return;
} // eo socket_handler::socket_write(const std::string&)
/// wait until the socket is ready to write again
-void socket_handler::wait_ready_to_write(int socket, long long write_block_timeout)
+void socket_handler::wait_ready_to_write(int write_block_timeout)
{
- // prepare socket sets
- fd_set write_set[1];
- fd_set except_set[1];
- FD_ZERO(write_set);
- FD_ZERO(except_set);
- FD_SET(socket, write_set);
- FD_SET(socket, except_set);
-
- // prepare timeout struct
- struct timeval tval;
- struct timeval *timeout_ptr;
-
- if (write_block_timeout == -1)
- timeout_ptr = NULL;
- else
- {
- timeout_ptr = &tval;
-
- // convert timeout from long long usec to int sec + int usec
- tval.tv_sec = write_block_timeout / 1000000;
- tval.tv_usec = write_block_timeout % 1000000;
- }
-
// let's wait for the socket to become writable again...
int rtn;
- while ((rtn=::select(socket+1, NULL, write_set, except_set, timeout_ptr)) ==-1 && errno == EINTR);
+ while ((rtn=::epoll_wait(epoll_fd,epoll_events,EPOLL_MAX_EVENTS,write_block_timeout)) ==-1 && errno == EINTR);
- if (rtn > 0 && (!FD_ISSET(socket,write_set)) && FD_ISSET(socket, except_set))
+ if (rtn > 0)
{
- // if we are selected but cannot write and have an exception
- // we have serious trouble...
- EXCEPTIONSTREAM(error,t2n_transfer_error,"exception on socket; cannot write any more.");
+ for (int i=0; i < rtn; ++i)
+ {
+ // Something very wrong happened
+ if(epoll_events[i].events != EPOLLOUT && epoll_events[i].events == EPOLLERR)
+ EXCEPTIONSTREAM(error,t2n_transfer_error,"exception on socket; cannot write any more.");
+ }
}
+ else if (rtn == 0)
+ EXCEPTIONSTREAM(error,t2n_transfer_error,"timeout on epoll_wait() for write");
+ else if (rtn == -1)
+ EXCEPTIONSTREAM(error,t2n_transfer_error,"cannot find socket to write: " << strerror(errno));
+}
+
+/**
+ * @brief initialize the epoll pool and add the primary socket to it
+ * assuming the socket is already created!
+ */
+void socket_handler::create_epoll()
+{
+ struct epoll_event ev = {};
+ ev.events = EPOLLIN;
+ epoll_fd = epoll_create1(EPOLL_CLOEXEC);
- if (rtn==0)
- EXCEPTIONSTREAM(error,t2n_transfer_error,"timeout on select() for write");
+ if (epoll_fd == -1)
+ EXCEPTIONSTREAM(error,t2n_server_error, "error opening epoll: " << strerror(errno));
- if (rtn==-1)
- EXCEPTIONSTREAM(error,t2n_transfer_error,"cannot select() for write: " << strerror(errno));
+ //adding sock (fd) to the set of events that epoll API will monitor
+ if(sock != -1)
+ add_to_epoll(sock,&sock);
+ else
+ EXCEPTIONSTREAM(error,t2n_server_error, "socket not previously created impossible to add to epoll");
}
+/**
+ * @brief this adds one file descriptor to the epoll pool to monitor events
+ * assuming the epoll was previously created
+ * @param fd the file descriptor we want to monitor
+ */
+void socket_handler::add_to_epoll(int fd,void* ptr)
+{
+ struct epoll_event ev;
+ ev.events = EPOLLIN;
+ ev.data.ptr = ptr;
+ //adding newsock (client fd) to the set of events that epoll API will monitor
+ if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd,&ev) == -1)
+ EXCEPTIONSTREAM(error,t2n_server_error,"error adding server socket to epoll(): " << strerror(errno));
+
+}
+
+/**
+ * @brief this removes one file descriptor from the epoll pool
+ * assuming the epoll was previously created and the fd was
+ * previously added somewhere
+ * @param fd the file descriptor we want to monitor
+ */
+void socket_handler::remove_from_epoll(int fd)
+{
+ struct epoll_event ev;
+ //removing (client fd) from the set of events that epoll API monitors
+ if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd,&ev) == -1)
+ LOGSTREAM(debug,"error deleting server socket to epoll(): " << strerror(errno) << " ...ignoring"<<endl);
+}
}
#define __LIBT2N_SOCKET_HANDLER
#include <iostream>
+#include <sys/epoll.h>
#include "types.hxx"
+#define EPOLL_MAX_EVENTS 32
+
namespace libt2n
{
private:
static const unsigned int default_recv_buffer_size=2048;
static const unsigned int default_write_block_size=4096;
- static const long long default_write_timeout=30000000;
-
+ static const int default_write_timeout=30000;
socket_type_value socket_type;
- bool data_waiting(long long usec_timeout,long long *usec_timeout_remaining=NULL);
- void wait_ready_to_write(int socket, long long write_block_timeout);
+ bool data_waiting(int timeout);
+ void wait_ready_to_write(int write_block_timeout);
protected:
+ int epoll_fd;
+ struct epoll_event epoll_events[EPOLL_MAX_EVENTS];
+
int sock;
unsigned int recv_buffer_size;
unsigned int write_block_size;
- long long write_timeout;
+ time_t write_timeout;
+ time_t timeout;
socket_handler(int _sock, socket_type_value _socket_type);
~socket_handler();
virtual void close();
- bool fill_buffer(std::string& buffer, long long usec_timeout, long long* usec_timeout_remaining=NULL);
+ bool fill_buffer(std::string& buffer, int timeout);
bool fill_buffer(std::string& buffer);
+ void create_epoll();
+ void remove_from_epoll(int fd);
+ void add_to_epoll(int fd,void* ptr);
+
public:
/// is this a tcp or unix socket connection
socket_type_value get_type()
{ return socket_type; }
- int get_socket()
+ int get_socket() const
{ return sock; }
bool is_closed();
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
-#include <time.h>
#include <pwd.h>
#include <grp.h>
-
#include <sstream>
+#include "monotonic_clock.hxx"
#include "socket_server.hxx"
#include "t2n_exception.hxx"
#include "log.hxx"
EXCEPTIONSTREAM(error,t2n_server_error,"error binding socket: " << strerror(errno));
}
+ create_epoll();
+
start_listening();
}
EXCEPTIONSTREAM(error,t2n_server_error,"error changing socket ownership: " << strerror(errno));
}
+ socket_handler::create_epoll();
+
start_listening();
+
}
/**
server::close();
// server socket will be closed by destructor of socket_handler
-
if (get_type()==unix_s)
unlink(unix_path.c_str());
EXCEPTIONSTREAM(error,t2n_server_error,"error listening to socket: " << strerror(errno));
/* clear & insert server sock into the fd_tab to prepare select */
- FD_ZERO(&connection_set);
- FD_SET (sock, &connection_set);
sockets_set.insert(sock);
}
EXCEPTIONSTREAM(error,t2n_server_error,"fatal error accepting connection: " << strerror(errno));
}
- FD_SET (newsock, &connection_set);
sockets_set.insert(newsock);
+
socket_server_connection *nc=new socket_server_connection(newsock, get_type(), get_default_timeout());
nc->set_socket_options(newsock);
add_connection(nc);
-
- return;
+ add_to_epoll(newsock,nc);
}
/** @brief look for new connections and new data in any of the existing connections
- @param usec_timeout wait until new data is found, max timeout usecs.
+ @param timeout wait until new data is found, max timeout millisecs.
-1: wait endless
0: return instantly
- @param usec_timeout_remaining if non-NULL the function will write the
- not used time to the given target
- @retval true if new data was found (does not mean that the received data
- is a complete packet though)
*/
-bool socket_server::fill_buffer(long long usec_timeout,long long* usec_timeout_remaining)
+bool socket_server::fill_buffer(int timeout)
{
- fd_set used_fdset=connection_set;
-
- /* set timeout */
- struct timeval tval;
- struct timeval *timeout_ptr;
-
- if (usec_timeout == -1)
- timeout_ptr = NULL;
- else
- {
- timeout_ptr = &tval;
-
- // timeout von long long usec in int sec + int usec umrechnen
- tval.tv_sec = usec_timeout / 1000000;
- tval.tv_usec = usec_timeout % 1000000;
- }
-
- int ret=select (FD_SETSIZE, &used_fdset, NULL, NULL, timeout_ptr);
-
- // return the timeout we did not use
- if (usec_timeout > 0 && usec_timeout_remaining != NULL)
- *usec_timeout_remaining=(tval.tv_sec*1000000)+tval.tv_usec;
+ int nfds = epoll_wait(epoll_fd, epoll_events, EPOLL_MAX_EVENTS, timeout);
- if (ret < 0)
+ if (nfds < 0)
{
if (errno == EINTR)
{
// select interrupted by signal
- ret=0;
+ nfds=0;
}
else
- EXCEPTIONSTREAM(error,t2n_server_error,"select error: " << strerror(errno));
+ EXCEPTIONSTREAM(error,t2n_server_error,"epoll error: " << strerror(errno));
}
- if (ret > 0)
+ if (nfds > 0)
{
// we have data pending
-
// check for new connection
- if (FD_ISSET (sock, &used_fdset))
+ for (int n=0; n < nfds; ++n)
{
- new_connection();
+ if(sock != epoll_events[n].data.fd)
+ {
+ new_connection();
+ continue;
+ }
+ socket_server_connection *ready_client = static_cast<socket_server_connection*>(epoll_events[n].data.ptr);
+ if(ready_client != NULL)
+ ready_client->fill_buffer(timeout);
}
-
// check all connections for pending data
return fill_connection_buffers();
}
/// remove the socket of a connection after the connection has been closed
void socket_server::remove_connection_socket(int sock)
{
- FD_CLR(sock, &connection_set);
- sockets_set.erase(sock);
+ int r = sockets_set.erase(sock);
+ if (r)
+ remove_from_epoll(sock);
}
/**
}
}
-bool socket_server_connection::fill_buffer(long long usec_timeout,long long* usec_timeout_remaining)
+bool socket_server_connection::fill_buffer(int timeout)
{
- bool new_data = socket_handler::fill_buffer(buffer,usec_timeout,usec_timeout_remaining);
+ bool new_data = socket_handler::fill_buffer(buffer, timeout);
if (new_data)
reset_timeout();
return new_data;
#ifndef __LIBT2N_SOCKET_SERVER
#define __LIBT2N_SOCKET_SERVER
+#include <sys/epoll.h>
#include <sys/types.h>
#include <string>
#include <set>
friend class socket_server_connection;
private:
- fd_set connection_set;
std::string unix_path;
std::set<int> sockets_set;
void new_connection();
- bool fill_connection_buffers();
void remove_connection_socket(int sock);
protected:
~socket_server();
- bool fill_buffer(long long usec_timeout=-1,long long* usec_timeout_remaining=NULL);
+ /** @brief look for new connections and new data in any of the existing connections
+ @param timeout wait until new data is found, max timeout millisecs.
+ -1: wait endless
+ 0: return instantly
+ */
+ bool fill_buffer(int timeout);
+ bool fill_connection_buffers();
std::set<int> get_sockets_set()
{ return sockets_set; };
};
private:
socket_server_connection(int _sock, socket_type_value _stype, int _timeout)
- : socket_handler(_sock,_stype), server_connection(_timeout)
- { }
+ : socket_handler(_sock,_stype), server_connection(_timeout)
+ { }
~socket_server_connection();
{ socket_write(data); }
public:
- bool fill_buffer(long long usec_timeout=-1,long long* usec_timeout_remaining=NULL);
+ bool fill_buffer(int timeout);
virtual void close();
+
};
}
{
if (socket_type == tcp_s)
c=std::auto_ptr<socket_client_connection>
- (new socket_client_connection(port,server,connect_timeout_usec,max_retries,logstream,log_level));
+ (new socket_client_connection(port, server, connect_timeout_millisec, max_retries, logstream, log_level));
else if (socket_type == unix_s)
c=std::auto_ptr<socket_client_connection>
- (new socket_client_connection(path,connect_timeout_usec,max_retries,logstream,log_level));
+ (new socket_client_connection(path, connect_timeout_millisec, max_retries, logstream, log_level));
}
return c.get();
}
/// return active connection, return a dummy-connection if we can't establish one
-client_connection* ReconnectIgnoreFailureSocketWrapper::get_connection(void)
+client_connection* ReconnectIgnoreFailureSocketWrapper::get_connection()
{
client_connection* tmp=BasicSocketWrapper::get_connection();
/** @brief a basic implementation of ConnectionWrapper
This is a basic version of a ConnectionWrapper which does not do any fancy
- error handling or anything, it justs executes the regular calls. Use this
+ error handling or anything, it just executes the regular calls. Use this
wrapper if you only want to use the singleton-feature of T2nSingletonWrapper.
*/
class BasicSocketWrapper : public ConnectionWrapper
std::string server;
int port;
- long long connect_timeout_usec;
+ int connect_timeout_millisec;
int max_retries;
std::auto_ptr<socket_client_connection> c;
public:
BasicSocketWrapper(int _port, const std::string& _server="127.0.0.1",
- long long _connect_timeout_usec=socket_client_connection::connect_timeout_usec_default,
+ int _connect_timeout_millisec=socket_client_connection::connect_timeout_millisec_default,
int _max_retries=socket_client_connection::max_retries_default)
: ConnectionWrapper(),
socket_type(tcp_s),
server(_server),
port(_port),
- connect_timeout_usec(_connect_timeout_usec),
+ connect_timeout_millisec(_connect_timeout_millisec),
max_retries(_max_retries)
{ }
BasicSocketWrapper(const std::string& _path,
- long long _connect_timeout_usec=socket_client_connection::connect_timeout_usec_default,
+ int _connect_timeout_millisec=socket_client_connection::connect_timeout_millisec_default,
int _max_retries=socket_client_connection::max_retries_default)
: ConnectionWrapper(),
socket_type(unix_s),
path(_path),
- connect_timeout_usec(_connect_timeout_usec),
+ connect_timeout_millisec(_connect_timeout_millisec),
max_retries(_max_retries)
{ }
- client_connection* get_connection(void);
+ client_connection* get_connection();
- bool connection_established(void)
+ bool connection_established()
{ return (c.get() != NULL); }
void set_logging(std::ostream *_logstream, log_level_values _log_level);
class ReconnectSocketWrapper : public BasicSocketWrapper
{
public:
- ReconnectSocketWrapper(int _port, const std::string& _server="127.0.0.1",
- long long _connect_timeout_usec=socket_client_connection::connect_timeout_usec_default,
+ explicit ReconnectSocketWrapper(int _port, const std::string& _server="127.0.0.1",
+ int _connect_timeout_millisec=socket_client_connection::connect_timeout_millisec_default,
int _max_retries=socket_client_connection::max_retries_default)
- : BasicSocketWrapper(_port,_server,_connect_timeout_usec,_max_retries)
+ : BasicSocketWrapper(_port, _server, _connect_timeout_millisec,
+ _max_retries)
{ }
- ReconnectSocketWrapper(const std::string& _path,
- long long _connect_timeout_usec=socket_client_connection::connect_timeout_usec_default,
+ explicit ReconnectSocketWrapper(const std::string& _path,
+ int _connect_timeout_millisec=socket_client_connection::connect_timeout_millisec_default,
int _max_retries=socket_client_connection::max_retries_default)
- : BasicSocketWrapper(_path,_connect_timeout_usec,_max_retries)
+ : BasicSocketWrapper(_path, _connect_timeout_millisec,
+ _max_retries)
{ }
bool handle(command_client* stubBase, boost::function< void() > f);
: client_connection()
{ close(); }
- bool fill_buffer(long long usec_timeout=-1, long long *usec_timeout_remaining=NULL)
+ bool fill_buffer(int millisec_timeout=-1)
{ return false; }
};
dummy_client_connection dc;
public:
- ReconnectIgnoreFailureSocketWrapper(int _port, const std::string& _server="127.0.0.1",
- long long _connect_timeout_usec=socket_client_connection::connect_timeout_usec_default,
+ __attribute__((unused)) explicit ReconnectIgnoreFailureSocketWrapper(int _port, const std::string& _server="127.0.0.1",
+ int _connect_timeout_millisec=socket_client_connection::connect_timeout_millisec_default,
int _max_retries=socket_client_connection::max_retries_default)
- : ReconnectSocketWrapper(_port,_server,_connect_timeout_usec,_max_retries)
+ : ReconnectSocketWrapper(_port, _server, _connect_timeout_millisec, _max_retries)
{ }
- ReconnectIgnoreFailureSocketWrapper(const std::string& _path,
- long long _connect_timeout_usec=socket_client_connection::connect_timeout_usec_default,
+ explicit ReconnectIgnoreFailureSocketWrapper(const std::string& _path,
+ int _connect_timeout_millisec=socket_client_connection::connect_timeout_millisec_default,
int _max_retries=socket_client_connection::max_retries_default)
- : ReconnectSocketWrapper(_path,_connect_timeout_usec,_max_retries)
+ : ReconnectSocketWrapper(_path, _connect_timeout_millisec, _max_retries)
{ }
client_connection* get_connection(void);
command_client cc(&sc);
- t2n_exception* ep=cc.get_constuctor_exception();
+ t2n_exception* ep=cc.get_constructor_exception();
string errormsg;
if (ep)
command_client cc(&sc);
- t2n_exception* ep=cc.get_constuctor_exception();
+ t2n_exception* ep=cc.get_constructor_exception();
string errormsg;
if (ep)
command_client cc(&sc);
- t2n_exception* ep=cc.get_constuctor_exception();
+ t2n_exception* ep=cc.get_constructor_exception();
string errormsg;
if (ep)
command_client cc(&sc);
- t2n_exception* ep=cc.get_constuctor_exception();
+ t2n_exception* ep=cc.get_constructor_exception();
string errormsg;
if (ep)
command_client cc(&sc);
- t2n_exception* ep=cc.get_constuctor_exception();
+ t2n_exception* ep=cc.get_constructor_exception();
string errormsg;
if (ep)
command_client cc(&sc);
- t2n_exception* ep=cc.get_constuctor_exception();
+ t2n_exception* ep=cc.get_constructor_exception();
string errormsg;
if (ep)
long long maxtime=1000000;
while(maxtime > 0)
- cs.handle(maxtime,&maxtime);
+ cs.handle(maxtime);
}
global_server = NULL;
socket_client_connection sc("./socket");
command_client cc(&sc,1000000,1000000);
- t2n_exception* ep=cc.get_constuctor_exception();
+ t2n_exception* ep=cc.get_constructor_exception();
string errormsg;
if (ep)
socket_client_connection sc("./socket");
command_client cc(&sc,1000000,1000000);
- t2n_exception* ep=cc.get_constuctor_exception();
+ t2n_exception* ep=cc.get_constructor_exception();
string errormsg;
if (ep)
{
public:
cmd_group_x_client(libt2n::client_connection *_c,
- long long _command_timeout_usec=command_timeout_usec_default,
- long long _hello_timeout_usec=hello_timeout_usec_default)
+ long long _command_timeout_usec=command_timeout_millisec_default,
+ long long _hello_timeout_usec=hello_timeout_millisec_default)
: libt2n::command_client(_c,_command_timeout_usec,_hello_timeout_usec)
{}
wraptype::set_connection(auto_ptr<ConnectionWrapper>
(new ReconnectSocketWrapper("./socket")));
- wraptype::get_connection_wrapper()->set_command_timeout_usec(3000000);
- wraptype::get_connection_wrapper()->set_hello_timeout_usec(3000000);
+ wraptype::get_connection_wrapper()->set_command_timeout_millisec(3000000);
+ wraptype::get_connection_wrapper()->set_hello_timeout_millisec(3000000);
// 42 closes connection on the server side
t2n_exec(&cmd_group_x_client::serverfunc)(42);
wraptype::set_connection(auto_ptr<ConnectionWrapper>
(new ReconnectIgnoreFailureSocketWrapper("./socket")));
- wraptype::get_connection_wrapper()->set_command_timeout_usec(3000000);
- wraptype::get_connection_wrapper()->set_hello_timeout_usec(3000000);
+ wraptype::get_connection_wrapper()->set_command_timeout_millisec(3000000);
+ wraptype::get_connection_wrapper()->set_hello_timeout_millisec(3000000);
// 42 closes connection on the server side
t2n_exec(&cmd_group_x_client::serverfunc)(42);
// parent
{
// wait till server is up
- sleep(1);
+ sleep(2);
}
}