int main(int argc, char** argv)
{
- // use a local socket (a.k.a "unix domain socket")
- // if you want to connect to a tcp/ip server you pass the port and server name to the constructor
- libt2n::socket_client_connection sc("./socket");
- // this generated class has a method for each of our exported procedures
- cmd_group_t2nexample_client cc(sc);
+ // use a local socket (a.k.a "unix domain socket")
+ // if you want to connect to a tcp/ip server you pass the port and server name to the constructor
+ libt2n::socket_client_connection sc("./socket");
+ // this generated class has a method for each of our exported procedures
+ cmd_group_t2nexample_client cc(sc);
- bool throwok=false;
+ bool throwok=false;
- // exceptions are passed back to the client transparently
- try
- {
- // call the remote procedure (we pass "throw" to cause a exception to be thrown)
- cc.testfunc("throw");
- }catch(libt2n::t2n_runtime_error &e){
- throwok=(std::string(e.what())=="throw me around");
- }
+ // exceptions are passed back to the client transparently
+ try
+ {
+ // call the remote procedure (we pass "throw" to cause a exception to be thrown)
+ cc.testfunc("throw");
+ }
+ catch(libt2n::t2n_runtime_error &e)
+ {
+ throwok=(std::string(e.what())=="throw me around");
+ }
- // call remote procedure and check the return value is correct
- return ( throwok && ( cc.testfunc("hello") == "hello, testfunc() was here" ) )
- ? EXIT_SUCCESS : EXIT_FAILURE;
+ // call remote procedure and check the return value is correct
+ return ( throwok && ( cc.testfunc("hello") == "hello, testfunc() was here" ) )
+ ? EXIT_SUCCESS : EXIT_FAILURE;
}
// the automatically generated server side code (cmd_group_t2nexample class)
#include "t2nexample_server.hxx"
-int main(int argc, char** argv) {
- // create local socket server (a.k.a "unix domain socket")
- // if you want to to create a tcp/ip server you pass the port to the constructor (for details take a look at the socket_server class documentation)
- libt2n::socket_server ss("./socket");
- libt2n::group_command_server<cmd_group_t2nexample> cs(ss);
+int main(int argc, char** argv)
+{
+ // don't kill the server on broken pipe
+ signal(SIGPIPE, SIG_IGN);
- // handle requests
- while(true)
- cs.handle();
+ // create local socket server (a.k.a "unix domain socket")
+ // if you want to to create a tcp/ip server you pass the port to the constructor
+ // (for details take a look at the socket_server class documentation)
+ libt2n::socket_server ss("./socket");
+ libt2n::group_command_server<cmd_group_t2nexample> cs(ss);
- return 0;
+ // handle requests
+ while(true)
+ cs.handle();
+
+ return 0;
}
using namespace libt2n;
-int main(int argc, char** argv) {
- socket_server ss("./socket");
- socket_server ss_other("./socket_other");
- group_command_server<cmd_group_default> cs(ss);
- group_command_server<cmd_group_other> cs_other(ss_other);
-
- // handle requests
- while(true) {
- cs.handle(1000);
- cs_other.handle(1000);
- }
-
- return 0;
+int main(int argc, char** argv)
+{
+ // don't kill the server on broken pipe
+ signal(SIGPIPE, SIG_IGN);
+
+ socket_server ss("./socket");
+ socket_server ss_other("./socket_other");
+ group_command_server<cmd_group_default> cs(ss);
+ group_command_server<cmd_group_other> cs_other(ss_other);
+
+ // handle requests
+ while(true)
+ {
+ cs.handle(1000);
+ cs_other.handle(1000);
+ }
+
+ return 0;
}
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
+#include <signal.h>
+
#include <string>
#include "minimalistic-stub.hxx"
using namespace libt2n;
-int main(int argc, char** argv) {
- extended_type_info_test();
+int main(int argc, char** argv)
+{
+ extended_type_info_test();
+
+ // don't kill us on broken pipe
+ signal(SIGPIPE, SIG_IGN);
- socket_server ss("./socket");
- group_command_server<cmd_group_example> cs(ss);
+ socket_server ss("./socket");
+ group_command_server<cmd_group_example> cs(ss);
- // handle requests (without timeout)
- while(true)
- cs.handle();
+ // handle requests (without timeout)
+ while(true)
+ cs.handle();
- return 0;
+ return 0;
}
unsigned int conn_id;
while (s.get_packet(packet,conn_id))
- handle_packet(packet,s.get_connection(conn_id));
+ {
+ server_connection* conn=s.get_connection(conn_id);
+ if (!conn)
+ EXCEPTIONSTREAM(error,logic_error,"illegal connection id " << conn_id << " received");
+
+ try
+ { handle_packet(packet,conn); }
+ catch (t2n_transfer_error &e)
+ {
+ // shut down a connection with transfer errors (usually write errors)
+ conn->close();
+ }
+ catch(...)
+ { throw; }
+ }
}
s.cleanup();
}
void handle(long long usec_timeout=-1, long long* usec_timeout_remaining=NULL);
void send_hello(unsigned int conn_id);
+
+ std::ostream* get_logstream(log_level_values level)
+ { return s.get_logstream(level); }
};
template<class T, class B> struct Derived_from {
: sock(_sock)
, recv_buffer_size( default_recv_buffer_size )
, write_block_size( default_write_block_size )
+, write_timeout( default_write_timeout )
, socket_type(_socket_type)
{
}
* @brief set new size for the data chunks when writeing.
* @param new_write_block_size the new chunk size.
*
- * The write block size determines the amound of data which is tried to write
+ * The write block size determines the amount of data which is tried to write
* to the socket when data needs to be sended.
- * Since writeing data is done in a loop, this does not limit the amunt of data which can
+ * Since writing data is done in a loop, this does not limit the amunt of data which can
* be written.
*
* The value is normalized to be at least 512 bytes and at max 32K bytes.
} //
+/**
+ * @brief set new timeout for writing a block
+ * @param new_write_timeout the new timeout in usecs, -1: wait endless
+ *
+ * The write timeout determines the maximum amount of time that is waited
+ * between writing each block. If the timeout is exceeded, write will
+ * throw t2n_transfer_error
+ */
+void socket_handler::set_write_timeout(long long new_write_timeout)
+{
+ write_block_size=new_write_timeout;
+} //
+
+
/** @brief check if new data is waiting on the raw socket
@param[in,out] usec_timeout wait until new data is found, max timeout usecs.
-1: wait endless
try_again=true;
}
else
- {
- LOGSTREAM(error,"error reading from socket : " << strerror(errno));
- // TODO: exception?
- return false;
- }
+ EXCEPTIONSTREAM(error,t2n_transfer_error,"error reading from socket : " << strerror(errno));
}
// End-of-file
write_size = data.size()-offset;
int rtn;
- while ((rtn=::write(sock, data.data()+offset, write_size)) &&
- rtn == -1 && (errno == EAGAIN || errno == EINTR))
+ while ((rtn=::write(sock, data.data()+offset, write_size)) == -1 &&
+ (errno == EAGAIN || errno == EINTR))
{
- fd_set write_set[1];
- fd_set except_set[1];
- FD_ZERO(write_set);
- FD_ZERO(except_set);
- FD_SET(sock, write_set);
- FD_SET(sock, except_set);
- // let's wait for the socket to become writable again...
- //TODO: use a timeout here?
- int n= ::select(sock+1, NULL, write_set, except_set, NULL);
- if (n==1 && ! FD_ISSET(sock,write_set) && FD_ISSET(sock, except_set))
- {
- // if we are selected but cannot write and have an exception
- // we have serious trouble...
- LOGSTREAM(error,"exception on socket; cannot write any more.");
- //TODO: exception?
- return;
- }
+ wait_ready_to_write(sock,write_timeout);
LOGSTREAM(debug,"resuming write() call after EAGAIN or EINTR");
}
if (rtn == -1)
- {
- LOGSTREAM(error,"write() returned " << strerror(errno));
- // TODO: exception?
- return;
- }
+ EXCEPTIONSTREAM(error,t2n_transfer_error,"write() returned " << strerror(errno));
else if (rtn != write_size)
{
LOGSTREAM(debug,"write() wrote " << rtn << " bytes, should have been "
return;
} // eo socket_handler::socket_write(const std::string&)
+/// wait until the socket is ready to write again
+void socket_handler::wait_ready_to_write(int socket, long long write_block_timeout)
+{
+ // prepare socket sets
+ fd_set write_set[1];
+ fd_set except_set[1];
+ FD_ZERO(write_set);
+ FD_ZERO(except_set);
+ FD_SET(socket, write_set);
+ FD_SET(socket, except_set);
+
+ // prepare timeout struct
+ struct timeval tval;
+ struct timeval *timeout_ptr;
+
+ if (write_block_timeout == -1)
+ timeout_ptr = NULL;
+ else
+ {
+ timeout_ptr = &tval;
+
+ // convert timeout from long long usec to int sec + int usec
+ tval.tv_sec = write_block_timeout / 1000000;
+ tval.tv_usec = write_block_timeout % 1000000;
+ }
+
+ // let's wait for the socket to become writable again...
+ int rtn;
+ while ((rtn=::select(socket+1, NULL, write_set, except_set, timeout_ptr)) ==-1 && errno == EINTR);
+
+ if (rtn > 0 && (!FD_ISSET(socket,write_set)) && FD_ISSET(socket, except_set))
+ {
+ // if we are selected but cannot write and have an exception
+ // we have serious trouble...
+ EXCEPTIONSTREAM(error,t2n_transfer_error,"exception on socket; cannot write any more.");
+ }
+
+ if (rtn==0)
+ EXCEPTIONSTREAM(error,t2n_transfer_error,"timeout on select() for write");
+
+ if (rtn==-1)
+ EXCEPTIONSTREAM(error,t2n_transfer_error,"cannot select() for write: " << strerror(errno));
+}
+
+
}
private:
static const unsigned int default_recv_buffer_size=2048;
static const unsigned int default_write_block_size=4096;
+ static const long long default_write_timeout=30000000;
socket_type_value socket_type;
bool data_waiting(long long usec_timeout,long long *timeout_remaining=NULL);
+ void wait_ready_to_write(int socket, long long write_block_timeout);
protected:
int sock;
unsigned int recv_buffer_size;
unsigned int write_block_size;
+ long long write_timeout;
socket_handler(int _sock, socket_type_value _socket_type);
void set_recv_buffer_size(unsigned int new_recv_buffer_size);
void set_write_block_size(unsigned int new_write_block_size);
+ void set_write_timeout(long long new_write_timeout);
unsigned int get_recv_buffer_size() const { return recv_buffer_size; }
unsigned int get_write_block_size() const { return write_block_size; }
+ long long get_write_timeout() const { return write_timeout; }
};
}
int newsock = accept (sock,(struct sockaddr *) &clientname,&size);
if (newsock < 0)
{
- if (errno == EAGAIN)
+ // return on non-fatal errors (list taken from man-page)
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ECONNABORTED || errno == EINTR ||
+ errno == EMFILE || errno == ENFILE || errno == ENOBUFS || errno == ENOMEM ||
+ errno == EPROTO || errno == EPERM || errno == ETIMEDOUT)
{
- LOGSTREAM(error,"accept error (EAGAIN): no connection waiting");
+ LOGSTREAM(error,"non-fatal accept error: " << strerror(errno));
return;
}
- /* default: break */
- EXCEPTIONSTREAM(error,t2n_server_error,"error accepting connection: " << strerror(errno));
+ /* fatal error: will usually kill or restart the server */
+ EXCEPTIONSTREAM(error,t2n_server_error,"fatal error accepting connection: " << strerror(errno));
}
FD_SET (newsock, &connection_set);
std::map<unsigned int, server_connection*>::iterator ie=connections.end();
for(std::map<unsigned int, server_connection*>::iterator i=connections.begin(); i != ie; i++)
if (!i->second->server_connection::is_closed())
- if (i->second->fill_buffer(0))
- data_found=true;
+ {
+ // shutdown all connections which throw exceptions to protect the server
+ try
+ {
+ if (i->second->fill_buffer(0))
+ data_found=true;
+ }
+ catch (t2n_transfer_error &e)
+ { i->second->close(); }
+ catch(...)
+ { throw; }
+ }
return data_found;
}
{ socket_write(data); }
};
+// this is an evil hack to get access to real_write, don't ever do this in an app!!!
+class real_write_client_connection: public socket_client_connection
+{
+ public:
+ void real_write(const std::string& data)
+ { socket_write(data); }
+};
+
class test_timeout : public TestFixture
{
CPPUNIT_TEST_SUITE(test_timeout);
+ CPPUNIT_TEST(ConnectTimeout);
CPPUNIT_TEST(HelloTimeoutNothing);
CPPUNIT_TEST(HelloTimeoutSlowData);
CPPUNIT_TEST(CommandTimeout);
CPPUNIT_TEST(CommandSlowResponse);
+ CPPUNIT_TEST(DisconnectOnWrite);
+ CPPUNIT_TEST(DisconnectOnRead);
+ CPPUNIT_TEST(BreakAccept);
CPPUNIT_TEST_SUITE_END();
}
}
+ void ConnectTimeout()
+ {
+ pid_t pid;
+
+ switch(pid=fork())
+ {
+ case -1:
+ {
+ CPPUNIT_FAIL("fork error");
+ break;
+ }
+ case 0:
+ // child
+ {
+ socket_server ss("./socket");
+
+ // don't call atexit and stuff
+ _exit(0);
+ }
+
+ default:
+ // parent
+ {
+ string data;
+
+ // wait till server is up
+ sleep(1);
+
+ string errormsg;
+
+ try
+ {
+ socket_client_connection sc("./socket");
+ command_client cc(sc,1000000,1000000);
+ }
+ catch(t2n_transfer_error &e)
+ { errormsg=e.what(); }
+ catch(...)
+ { throw; }
+
+ CPPUNIT_ASSERT_EQUAL(string("error reading from socket : Invalid argument"),errormsg);
+ }
+ }
+ }
+
void HelloTimeoutNothing()
{
pid_t pid;
}
}
+ void DisconnectOnWrite()
+ {
+ pid_t pid;
+
+ switch(pid=fork())
+ {
+ case -1:
+ {
+ CPPUNIT_FAIL("fork error");
+ break;
+ }
+ case 0:
+ // child
+ {
+ socket_server ss("./socket");
+
+ // bail out as soon as we get something
+ ss.fill_buffer(-1);
+ // don't call atexit and stuff
+ _exit(0);
+ }
+
+ default:
+ // parent
+ {
+ string data;
+
+ // don't kill us on broken pipe
+ signal(SIGPIPE, SIG_IGN);
+
+ // wait till server is up
+ sleep(1);
+ socket_client_connection sc("./socket");
+
+ string errormsg;
+
+ string huge(1000000,'x');
+
+ try
+ {
+ sc.write(huge);
+ }
+ catch(t2n_transfer_error &e)
+ { errormsg=e.what(); }
+ catch(...)
+ { throw; }
+
+ CPPUNIT_ASSERT_EQUAL(string("write() returned Broken pipe"),errormsg);
+ }
+ }
+ }
+
+ void DisconnectOnRead()
+ {
+ pid_t pid;
+
+ switch(pid=fork())
+ {
+ case -1:
+ {
+ CPPUNIT_FAIL("fork error");
+ break;
+ }
+ case 0:
+ // child
+ {
+ // wait till server is up
+ sleep(1);
+
+ socket_client_connection sc("./socket");
+
+ // this is an evil hack to get access to real_write, don't ever do this in an app!!!
+ real_write_client_connection *rwc=(real_write_client_connection*)≻
+ rwc->real_write(string(10000,'x'));
+
+ // don't call atexit and stuff
+ _exit(0);
+ }
+
+ default:
+ // parent
+ {
+ // don't kill us on broken pipe
+ signal(SIGPIPE, SIG_IGN);
+
+ socket_server ss("./socket");
+
+ time_t t0 = time(NULL);
+
+ // max 5 sec
+ while (time(NULL) < t0 + 5 )
+ {
+ ss.fill_buffer(1000000);
+
+ string data;
+ ss.get_packet(data);
+ }
+
+ // are we still alive and able to process data?
+
+ switch(pid=fork())
+ {
+ case -1:
+ {
+ CPPUNIT_FAIL("fork error");
+ break;
+ }
+ case 0:
+ // child
+ {
+ socket_client_connection *sc=new socket_client_connection("./socket");
+ sc->write(string(10000,'x'));
+ delete sc;
+ // socket is closed regularly
+
+ // don't run regular cleanup, otherwise cppunit stuff gets called
+ _exit(0);
+ }
+
+ default:
+ // parent
+ {
+ string received;
+
+ t0 = time(NULL);
+
+ // max 10 sec
+ while (time(NULL) < t0 + 10 )
+ {
+ ss.fill_buffer(1000000);
+
+ if (ss.get_packet(received))
+ break;
+ }
+
+ CPPUNIT_ASSERT_EQUAL(string(10000,'x'),received);
+ }
+ }
+ }
+ }
+ }
+
+ void BreakAccept()
+ {
+ pid_t pid;
+
+ switch(pid=fork())
+ {
+ case -1:
+ {
+ CPPUNIT_FAIL("fork error");
+ break;
+ }
+ case 0:
+ // child
+ {
+ // wait till server is really up and waiting
+ sleep(2);
+
+ // connect with very tight timeout and only 1 retry
+ socket_client_connection sc("./socket",50,1);
+
+ // don't call atexit and stuff
+ _exit(0);
+ }
+
+ default:
+ // parent
+ {
+ // don't kill us on broken pipe
+ signal(SIGPIPE, SIG_IGN);
+
+ socket_server ss("./socket");
+
+ // server is "working" while client wants to connect
+ sleep(5);
+
+ time_t t0 = time(NULL);
+
+ // max 5 sec
+ while (time(NULL) < t0 + 5 )
+ {
+ ss.fill_buffer(1000000);
+
+ string data;
+ ss.get_packet(data);
+ }
+
+ // are we still alive and able to process data?
+
+ switch(pid=fork())
+ {
+ case -1:
+ {
+ CPPUNIT_FAIL("fork error");
+ break;
+ }
+ case 0:
+ // child
+ {
+ socket_client_connection *sc=new socket_client_connection("./socket");
+ sc->write(string(10000,'x'));
+ delete sc;
+ // socket is closed regularly
+
+ // don't run regular cleanup, otherwise cppunit stuff gets called
+ _exit(0);
+ }
+
+ default:
+ // parent
+ {
+ string received;
+
+ t0 = time(NULL);
+
+ // max 10 sec
+ while (time(NULL) < t0 + 10 )
+ {
+ ss.fill_buffer(1000000);
+
+ if (ss.get_packet(received))
+ break;
+ }
+
+ CPPUNIT_ASSERT_EQUAL(string(10000,'x'),received);
+ }
+ }
+ }
+ }
+ }
};
CPPUNIT_TEST_SUITE_REGISTRATION(test_timeout);