* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
***************************************************************************/
+#include <sstream>
+
#include "server.hxx"
namespace libt2n
last_action_time=time(NULL);
}
+std::string connection::get_id_string()
+{
+ std::ostringstream os;
+ os << get_id();
+ return os.str();
+}
+
bool connection::get_packet(std::string& data, unsigned int& conn_id)
{
// max packet size is unsigned int
int server::add_connection(connection* newconn)
{
- connections[next_id]=newconn;
- return next_id++;
+ unsigned int cid=next_id++;
+ newconn->set_id(cid);
+ newconn->set_server(this);
+ connections[cid]=newconn;
+ return cid;
}
/**
i->second->check_timeout();
}
-void server::fill_connection_buffers(void)
-{
- std::map<unsigned int, connection*>::iterator ie=connections.end();
- for(std::map<unsigned int, connection*>::iterator i=connections.begin(); i != ie; i++)
- if (!i->second->is_closed())
- i->second->fill_buffer();
-}
-
bool server::get_packet(std::string& data, unsigned int& conn_id)
{
// todo: this is somehow unfair: the first connections in the map get checked more
namespace libt2n
{
+class server;
+
/**
Basic connection class
*/
int timeout;
int last_action_time;
bool closed;
- std::string buffer;
+ unsigned int connection_id;
protected:
connection(int _timeout)
set_timeout(_timeout);
reset_timeout();
closed=false;
+ connection_id=0;
+ my_server=0;
}
+ server *my_server;
+ std::string buffer;
+
+ typedef unsigned int packet_size_indicator;
+
public:
~connection()
{ this->close(); }
bool is_closed()
{ return closed; }
+ void set_server(server* _my_server)
+ { my_server=_my_server; }
+
+ void set_id(unsigned int _connection_id)
+ { connection_id=_connection_id; }
+ unsigned int get_id()
+ { return connection_id; }
+ std::string get_id_string();
+
virtual void close()
{ closed=true; }
- virtual void fill_buffer(void)=0;
-
bool get_packet(std::string& data, unsigned int& conn_id);
virtual void write(const std::string& data)=0;
};
std::ostream *logstream;
unsigned int next_id;
- std::map<unsigned int, connection*> connections;
protected:
+ std::map<unsigned int, connection*> connections;
+
server()
{
set_default_timeout(30);
set_logging(NULL,none);
- next_id=0;
+ next_id=1;
}
int add_connection(connection* newconn);
bool get_packet(std::string& data, unsigned int& conn_id);
- void fill_connection_buffers(void);
+ virtual void fill_connection_buffers(void)=0;
- protected:
void log(log_level_values level, const std::string& message)
{ log(level,message.c_str()); }
void log(log_level_values level, const char* message);
#include <pwd.h>
#include <grp.h>
+#include <sstream>
+
#include "socket_server.hxx"
#include "t2n_exception.hxx"
return;
}
+void socket_server::fill_connection_buffers()
+{
+ std::map<unsigned int, connection*>::iterator ie=connections.end();
+ for(std::map<unsigned int, connection*>::iterator i=connections.begin(); i != ie; i++)
+ if (!i->second->is_closed())
+ {
+ socket_connection* cp=dynamic_cast<socket_connection*>(i->second);
+ cp->fill_buffer(connection_set);
+ }
+}
+
socket_connection::socket_connection(int _sock, int _timeout)
: connection(_timeout)
{
}
-void socket_connection::fill_buffer(void)
+void socket_connection::fill_buffer(fd_set &cur_fdset)
{
+ bool try_again=false;
+
+ if (is_closed() || !FD_ISSET (sock, &cur_fdset))
+ return; // no data pending or connection closed
+
+ // data pending -> go and get it
+ char socket_buffer[recv_buffer_size];
+
+ int nbytes = read (sock, socket_buffer, recv_buffer_size);
+ if (nbytes < 0)
+ {
+ if (errno == EAGAIN)
+ {
+ if (my_server)
+ my_server->log(server::error,"read error: no data (EAGAIN) for connection "+get_id_string());
+ return;
+ }
+ else if (errno == EINTR)
+ {
+ // interrupted, try again
+ try_again=true;
+ }
+ else
+ {
+ if (my_server)
+ my_server->log(server::error,"error reading from socket of connection "+get_id_string()+": "+strerror(errno));
+ return;
+ }
+ }
+
+ // End-of-file
+ if (nbytes == 0 && !try_again)
+ {
+ close();
+ return;
+ }
+
+ // Data read -> store it
+ if (nbytes > 0)
+ buffer.assign(socket_buffer,nbytes);
+ // more data waiting?
+ fd_set active_fd_set;
+ struct timeval tval;
+
+ FD_ZERO (&active_fd_set);
+ FD_SET (sock, &active_fd_set);
+
+ /* no waiting */
+ tval.tv_sec=0;
+ tval.tv_usec=0;
+
+ if (select (FD_SETSIZE, &active_fd_set, NULL, NULL, &tval) >0)
+ {
+ /* more data waiting -> recurse */
+ fill_buffer(active_fd_set);
+ }
+
+ reset_timeout();
+
+ return;
}
void socket_connection::write(const std::string& data)
{
+ static const unsigned int write_block_size=4096;
+
+ if (is_closed())
+ return;
+
+ // prepend packet size to data
+ packet_size_indicator psize=data.size();
+ string send_data(data);
+ send_data.insert(0,(char*)psize,sizeof(packet_size_indicator));
+
+ int offset = 0;
+ while (offset < send_data.size())
+ {
+ unsigned int write_size=write_block_size;
+
+ if (offset+write_size > send_data.size())
+ write_size = send_data.size()-offset;
+ int rtn;
+ while ((rtn=::write(sock, send_data.data()+offset, write_size)) &&
+ rtn == -1 && (errno == EAGAIN || errno == EINTR))
+ {
+ usleep (80000);
+ if (my_server)
+ my_server->log(server::debug,"resuming write() call after EAGAIN or EINTR for connection "+get_id_string());
+ }
+
+ if (rtn == -1)
+ {
+ if (my_server)
+ my_server->log(server::error,"write() error on connection "+get_id_string()+": "+strerror(errno));
+ }
+ else if (rtn != write_size)
+ {
+ if (my_server)
+ {
+ ostringstream msg;
+ msg << "write() error on connection " << get_id()
+ << ": wrote " << rtn << " bytes, should have been "
+ << write_size << " (complete: " << send_data.size() << ")";
+
+ my_server->log(server::error,msg.str());
+ }
+ }
+
+ offset += write_size;
+ }
+
+ reset_timeout();
+
+ return;
}
}