X-Git-Url: http://developer.intra2net.com/git/?p=libt2n;a=blobdiff_plain;f=src%2Fsocket_server.cpp;h=f0e573b3101e8c6e36133f10b5032116b1d2e29f;hp=6ef93177413fa0d5324eaf35eeaffb71002bab34;hb=5816531135da09aceb7a026eeade735cbd20ef03;hpb=0cf4dc9bf7fa527751fd7dc425f882fc86888132 diff --git a/src/socket_server.cpp b/src/socket_server.cpp index 6ef9317..f0e573b 100644 --- a/src/socket_server.cpp +++ b/src/socket_server.cpp @@ -33,6 +33,8 @@ #include #include +#include + #include "socket_server.hxx" #include "t2n_exception.hxx" @@ -201,7 +203,79 @@ socket_server::~socket_server() void socket_server::new_connection() { + struct sockaddr_un clientname; + + unsigned int size = sizeof (clientname); + int newsock = accept (sock,(struct sockaddr *) &clientname,&size); + if (newsock < 0) + { + if (errno == EAGAIN) + { + log(error, "accept error (EAGAIN): no connection waiting"); + return; + } + + /* default: break */ + string err="error accepting connection: "; + err+=strerror(errno); + log(error, err); + throw t2n_server_error(err); + } + + FD_SET (newsock, &connection_set); + int i=1; + + /* keepalive enable */ + if (setsockopt(newsock,SOL_SOCKET, SO_KEEPALIVE, &i, sizeof(i)) < 0) + { + string err="error setting socket option: "; + err+=strerror(errno); + log(error, err); + throw t2n_server_error(err); + } + + /* close on exec */ + int fdflags; + fdflags=fcntl(newsock,F_GETFD, 0); + if (fdflags < 0) + { + string err="fcntl error on socket: "; + err+=strerror(errno); + log(error, err); + throw t2n_server_error(err); + } + fdflags |= FD_CLOEXEC; + if (fcntl(newsock,F_SETFD,fdflags) < 0) + { + string err="fcntl error on socket: "; + err+=strerror(errno); + log(error, err); + throw t2n_server_error(err); + } + + /* non-blocking mode */ + int flflags; + flflags=fcntl(newsock,F_GETFL,0); + if (flflags < 0) + { + string err="fcntl error on socket: "; + err+=strerror(errno); + log(error, err); + throw t2n_server_error(err); + } + flflags |= O_NONBLOCK; + if (fcntl(newsock,F_SETFL,flflags) < 0) + { + string err="fcntl error on socket: "; + err+=strerror(errno); + log(error, err); + throw t2n_server_error(err); + } + + add_connection(new socket_connection(newsock, get_default_timeout())); + + return; } void socket_server::fill_buffer(long long usec_timeout) @@ -258,10 +332,21 @@ void socket_server::fill_buffer(long long usec_timeout) return; } -socket_connection::socket_connection(int _socket, int _timeout) - : connection(_timeout) +void socket_server::fill_connection_buffers() { + std::map::iterator ie=connections.end(); + for(std::map::iterator i=connections.begin(); i != ie; i++) + if (!i->second->is_closed()) + { + socket_connection* cp=dynamic_cast(i->second); + cp->fill_buffer(connection_set); + } +} +socket_connection::socket_connection(int _sock, int _timeout) + : connection(_timeout) +{ + sock=_sock; } void socket_connection::close() @@ -269,14 +354,124 @@ void socket_connection::close() } -void socket_connection::fill_buffer(void) +void socket_connection::fill_buffer(fd_set &cur_fdset) { + bool try_again=false; + + if (is_closed() || !FD_ISSET (sock, &cur_fdset)) + return; // no data pending or connection closed + + // data pending -> go and get it + char socket_buffer[recv_buffer_size]; + + int nbytes = read (sock, socket_buffer, recv_buffer_size); + if (nbytes < 0) + { + if (errno == EAGAIN) + { + if (my_server) + my_server->log(server::error,"read error: no data (EAGAIN) for connection "+get_id_string()); + return; + } + else if (errno == EINTR) + { + // interrupted, try again + try_again=true; + } + else + { + if (my_server) + my_server->log(server::error,"error reading from socket of connection "+get_id_string()+": "+strerror(errno)); + return; + } + } + + // End-of-file + if (nbytes == 0 && !try_again) + { + close(); + return; + } + + // Data read -> store it + if (nbytes > 0) + buffer.assign(socket_buffer,nbytes); + // more data waiting? + fd_set active_fd_set; + struct timeval tval; + + FD_ZERO (&active_fd_set); + FD_SET (sock, &active_fd_set); + + /* no waiting */ + tval.tv_sec=0; + tval.tv_usec=0; + + if (select (FD_SETSIZE, &active_fd_set, NULL, NULL, &tval) >0) + { + /* more data waiting -> recurse */ + fill_buffer(active_fd_set); + } + + reset_timeout(); + + return; } void socket_connection::write(const std::string& data) { + static const unsigned int write_block_size=4096; + + if (is_closed()) + return; + + // prepend packet size to data + packet_size_indicator psize=data.size(); + string send_data(data); + send_data.insert(0,(char*)psize,sizeof(packet_size_indicator)); + + int offset = 0; + while (offset < send_data.size()) + { + unsigned int write_size=write_block_size; + if (offset+write_size > send_data.size()) + write_size = send_data.size()-offset; + + int rtn; + while ((rtn=::write(sock, send_data.data()+offset, write_size)) && + rtn == -1 && (errno == EAGAIN || errno == EINTR)) + { + usleep (80000); + if (my_server) + my_server->log(server::debug,"resuming write() call after EAGAIN or EINTR for connection "+get_id_string()); + } + + if (rtn == -1) + { + if (my_server) + my_server->log(server::error,"write() error on connection "+get_id_string()+": "+strerror(errno)); + } + else if (rtn != write_size) + { + if (my_server) + { + ostringstream msg; + msg << "write() error on connection " << get_id() + << ": wrote " << rtn << " bytes, should have been " + << write_size << " (complete: " << send_data.size() << ")"; + + my_server->log(server::error,msg.str()); + } + } + + offset += write_size; + } + + reset_timeout(); + + return; } }