X-Git-Url: http://developer.intra2net.com/git/?p=libt2n;a=blobdiff_plain;f=src%2Fsocket_server.cpp;fp=src%2Fsocket_server.cpp;h=f0e573b3101e8c6e36133f10b5032116b1d2e29f;hp=e9bdde001affb3f0cfef867e9892bea7c901d077;hb=aa499d2034964a4c125794b7e8ea768cb7471411;hpb=04e6b2712bf0fdfdb6a74cf6d26f02e6a8d37ae2 diff --git a/src/socket_server.cpp b/src/socket_server.cpp index e9bdde0..f0e573b 100644 --- a/src/socket_server.cpp +++ b/src/socket_server.cpp @@ -33,6 +33,8 @@ #include #include +#include + #include "socket_server.hxx" #include "t2n_exception.hxx" @@ -330,6 +332,17 @@ void socket_server::fill_buffer(long long usec_timeout) return; } +void socket_server::fill_connection_buffers() +{ + std::map::iterator ie=connections.end(); + for(std::map::iterator i=connections.begin(); i != ie; i++) + if (!i->second->is_closed()) + { + socket_connection* cp=dynamic_cast(i->second); + cp->fill_buffer(connection_set); + } +} + socket_connection::socket_connection(int _sock, int _timeout) : connection(_timeout) { @@ -341,14 +354,124 @@ void socket_connection::close() } -void socket_connection::fill_buffer(void) +void socket_connection::fill_buffer(fd_set &cur_fdset) { + bool try_again=false; + + if (is_closed() || !FD_ISSET (sock, &cur_fdset)) + return; // no data pending or connection closed + + // data pending -> go and get it + char socket_buffer[recv_buffer_size]; + + int nbytes = read (sock, socket_buffer, recv_buffer_size); + if (nbytes < 0) + { + if (errno == EAGAIN) + { + if (my_server) + my_server->log(server::error,"read error: no data (EAGAIN) for connection "+get_id_string()); + return; + } + else if (errno == EINTR) + { + // interrupted, try again + try_again=true; + } + else + { + if (my_server) + my_server->log(server::error,"error reading from socket of connection "+get_id_string()+": "+strerror(errno)); + return; + } + } + + // End-of-file + if (nbytes == 0 && !try_again) + { + close(); + return; + } + + // Data read -> store it + if (nbytes > 0) + buffer.assign(socket_buffer,nbytes); + // more data waiting? + fd_set active_fd_set; + struct timeval tval; + + FD_ZERO (&active_fd_set); + FD_SET (sock, &active_fd_set); + + /* no waiting */ + tval.tv_sec=0; + tval.tv_usec=0; + + if (select (FD_SETSIZE, &active_fd_set, NULL, NULL, &tval) >0) + { + /* more data waiting -> recurse */ + fill_buffer(active_fd_set); + } + + reset_timeout(); + + return; } void socket_connection::write(const std::string& data) { + static const unsigned int write_block_size=4096; + + if (is_closed()) + return; + + // prepend packet size to data + packet_size_indicator psize=data.size(); + string send_data(data); + send_data.insert(0,(char*)psize,sizeof(packet_size_indicator)); + + int offset = 0; + while (offset < send_data.size()) + { + unsigned int write_size=write_block_size; + + if (offset+write_size > send_data.size()) + write_size = send_data.size()-offset; + int rtn; + while ((rtn=::write(sock, send_data.data()+offset, write_size)) && + rtn == -1 && (errno == EAGAIN || errno == EINTR)) + { + usleep (80000); + if (my_server) + my_server->log(server::debug,"resuming write() call after EAGAIN or EINTR for connection "+get_id_string()); + } + + if (rtn == -1) + { + if (my_server) + my_server->log(server::error,"write() error on connection "+get_id_string()+": "+strerror(errno)); + } + else if (rtn != write_size) + { + if (my_server) + { + ostringstream msg; + msg << "write() error on connection " << get_id() + << ": wrote " << rtn << " bytes, should have been " + << write_size << " (complete: " << send_data.size() << ")"; + + my_server->log(server::error,msg.str()); + } + } + + offset += write_size; + } + + reset_timeout(); + + return; } }