const char* T2nSingletonWrapperMessages::NotInitializedMessage = "T2nSingletonWrapper used before setting initializing connection";
+/// get pointer to logging stream, returns NULL if no logging needed
+std::ostream* ConnectionWrapper::get_logstream(log_level_values level)
+{
+ if (logstream && log_level >= level)
+ return logstream;
+ else
+ return NULL;
+}
+
+/// activate logging to the given stream. everything above the given level is logged.
+void ConnectionWrapper::set_logging(std::ostream *_logstream, log_level_values _log_level)
+{
+ log_level=_log_level;
+ logstream=_logstream;
+}
+
+/// always call this when you got a new connection to transfer logging settings
+void ConnectionWrapper::set_logging_on_connection(client_connection& c)
+{
+ if (logstream != NULL && log_level > none)
+ c.set_logging(logstream,log_level);
+}
+
}
class ConnectionWrapper
{
- public:
+ private:
+ long long command_timeout_usec;
+ long long hello_timeout_usec;
- virtual client_connection* get_connection()=0;
+ protected:
+ log_level_values log_level;
+ std::ostream *logstream;
+ void set_logging_on_connection(client_connection& c);
- virtual long long get_command_timeout_usec(void)
- {
- return command_client::command_timeout_usec_default;
- }
+ public:
+ ConnectionWrapper()
+ : log_level(none), logstream(NULL),
+ command_timeout_usec(command_client::command_timeout_usec_default),
+ hello_timeout_usec(command_client::hello_timeout_usec_default)
+ { }
- virtual long long get_hello_timeout_usec(void)
- {
- return command_client::hello_timeout_usec_default;
- }
+ virtual ~ConnectionWrapper()
+ { }
+
+ virtual client_connection* get_connection()=0;
virtual void handle(command_client* stubBase, boost::function< void() > f)
{
f();
}
- virtual ~ConnectionWrapper()
- { }
+ long long get_command_timeout_usec(void)
+ { return command_timeout_usec; }
+
+ void set_command_timeout_usec(long long _command_timeout_usec)
+ { command_timeout_usec=_command_timeout_usec; }
+
+ long long get_hello_timeout_usec(void)
+ { return hello_timeout_usec; }
+
+ void set_hello_timeout_usec(long long _hello_timeout_usec)
+ { hello_timeout_usec=_hello_timeout_usec; }
+
+ void set_logging(std::ostream *_logstream, log_level_values _log_level);
+
+ std::ostream* get_logstream(log_level_values level);
};
// contains the internal stuff needed for T2nWrapper
if (SingletonObject.get() != NULL)
SingletonObject.reset();
}
- static ConnectionWrapper* get_connection(void)
+ static ConnectionWrapper* get_connection_wrapper(void)
{ return WrappedConnection.get(); }
static void ensure_singleton_there(void)
#include "socket_client.hxx"
#include "t2n_exception.hxx"
+#include "log.hxx"
using namespace std;
{
socket_client_connection::socket_client_connection(int _port, const std::string& _server,
- long long _connect_timeout_usec, int _max_retries)
+ long long _connect_timeout_usec, int _max_retries,
+ std::ostream *_logstream, log_level_values _log_level)
: client_connection(), socket_handler(0,tcp_s)
{
max_retries=_max_retries;
server=_server;
port=_port;
+ set_logging(_logstream,_log_level);
+
tcp_connect(max_retries);
+
+ do_callbacks(new_connection);
}
socket_client_connection::socket_client_connection(const std::string& _path,
- long long _connect_timeout_usec, int _max_retries)
+ long long _connect_timeout_usec, int _max_retries,
+ std::ostream *_logstream, log_level_values _log_level)
: client_connection(), socket_handler(0,unix_s)
{
max_retries=_max_retries;
path=_path;
+ set_logging(_logstream,_log_level);
+
unix_connect(max_retries);
+
+ do_callbacks(new_connection);
}
void socket_client_connection::tcp_connect(int max_retries)
{
// recurse if retries left
if (max_retries > 0)
+ {
+ LOGSTREAM(debug,"retrying connect after connect error");
tcp_connect(max_retries-1);
+ }
+ else
+ LOGSTREAM(debug,"no more retries left after connect error");
}
-
- do_callbacks(new_connection);
}
void socket_client_connection::unix_connect(int max_retries)
{
// recurse if retries left
if (max_retries > 0)
+ {
+ LOGSTREAM(debug,"retrying connect after connect error");
unix_connect(max_retries-1);
+ }
+ else
+ LOGSTREAM(debug,"no more retries left after connect error");
}
-
- do_callbacks(new_connection);
}
void socket_client_connection::connect_with_timeout(struct sockaddr *sock_addr,unsigned int sockaddr_size)
{
set_socket_options(sock);
+ LOGSTREAM(debug,"connect_with_timeout()");
int ret=::connect(sock,sock_addr, sockaddr_size);
if (ret < 0)
{
if (errno==EINPROGRESS)
{
+ LOGSTREAM(debug,"connect_with_timeout(): EINPROGRESS");
+
/* set timeout */
struct timeval tval;
struct timeval *timeout_ptr;
ret < 0 && errno==EINTR);
if (ret < 0)
+ {
+ LOGSTREAM(debug,"connect_with_timeout(): select error: " << strerror(errno));
throw t2n_connect_error(string("connect() error (select): ")+strerror(errno));
+ }
socklen_t sopt=sizeof(int);
int valopt;
ret=getsockopt(sock, SOL_SOCKET, SO_ERROR, (void*)(&valopt), &sopt);
if (ret < 0 || valopt)
+ {
+ LOGSTREAM(debug,"connect_with_timeout(): getsockopt error: " << strerror(errno));
throw t2n_connect_error(string("connect() error (getsockopt): ")+strerror(errno));
+ }
}
else
+ {
+ LOGSTREAM(debug,"connect_with_timeout(): error: " << strerror(errno));
throw t2n_connect_error(string("connect() error: ")+strerror(errno));
+ }
}
+
+ LOGSTREAM(debug,"connect_with_timeout(): success");
}
void socket_client_connection::close()
*/
void socket_client_connection::reconnect()
{
+ LOGSTREAM(debug,"reconnect()");
+
// close the current connection if still open
close();
else if (type == unix_s)
unix_connect(max_retries);
+ LOGSTREAM(debug,"reconnect(): basic connection established");
+
reopen();
+
+ LOGSTREAM(debug,"reconnect() done, client_connection::is_closed() now " << client_connection::is_closed());
+
}
}
std::string server;
int port;
+ protected:
+
std::ostream* get_logstream(log_level_values level)
{ return client_connection::get_logstream(level); }
public:
socket_client_connection(int _port, const std::string& _server="127.0.0.1",
long long _connect_timeout_usec=connect_timeout_usec_default,
- int _max_retries=max_retries_default);
+ int _max_retries=max_retries_default,
+ std::ostream *_logstream=NULL, log_level_values _log_level=none);
socket_client_connection(const std::string& _path,
long long _connect_timeout_usec=connect_timeout_usec_default,
- int _max_retries=max_retries_default);
+ int _max_retries=max_retries_default,
+ std::ostream *_logstream=NULL, log_level_values _log_level=none);
/** @brief read data from the socket and copy it into buffer
@param usec_timeout wait until new data is found, max timeout usecs.
/// by the connection class you are using.
void socket_handler::close()
{
+ LOGSTREAM(debug,"close connection");
// graceful shutdown
shutdown(sock,SHUT_RDWR);
::close(sock);
{
if (socket_type == tcp_s)
c=std::auto_ptr<socket_client_connection>
- (new socket_client_connection(port,server,connect_timeout_usec,max_retries));
+ (new socket_client_connection(port,server,connect_timeout_usec,max_retries,logstream,log_level));
else if (socket_type == unix_s)
c=std::auto_ptr<socket_client_connection>
- (new socket_client_connection(path,connect_timeout_usec,max_retries));
+ (new socket_client_connection(path,connect_timeout_usec,max_retries,logstream,log_level));
}
return c.get();
long long _connect_timeout_usec=socket_client_connection::connect_timeout_usec_default,
int _max_retries=socket_client_connection::max_retries_default)
: port(_port), server(_server), connect_timeout_usec(_connect_timeout_usec),
- max_retries(_max_retries), socket_type(tcp_s)
+ max_retries(_max_retries), socket_type(tcp_s), ConnectionWrapper()
{ }
BasicSocketWrapper(const std::string& _path,
long long _connect_timeout_usec=socket_client_connection::connect_timeout_usec_default,
int _max_retries=socket_client_connection::max_retries_default)
: path(_path), connect_timeout_usec(_connect_timeout_usec),
- max_retries(_max_retries), socket_type(unix_s)
+ max_retries(_max_retries), socket_type(unix_s), ConnectionWrapper()
{ }
client_connection* get_connection(void);
#include <cppunit/ui/text/TestRunner.h>
#include <cppunit/extensions/HelperMacros.h>
+#include <boost/archive/binary_oarchive.hpp>
+#include <boost/archive/binary_iarchive.hpp>
+#include <boost/archive/xml_oarchive.hpp>
+#include <boost/archive/xml_iarchive.hpp>
+#include <boost/serialization/serialization.hpp>
+#include <boost/serialization/export.hpp>
+
+#include <container.hxx>
+#include <socket_client.hxx>
+#include <socket_server.hxx>
#include <command_client.hxx>
+#include <command_server.hxx>
#include <client_wrapper.hxx>
#include <socket_wrapper.hxx>
using namespace libt2n;
using namespace CppUnit;
-class testme : public command_client
+// the server part
+
+stringstream logstream;
+bool close_server=false;
+bool kill_server=false;
+
+int serverfunc(int i)
+{
+ // magic commands
+ if (i==42)
+ close_server=true;
+ if (i==666)
+ kill_server=true;
+
+ return i+1;
+}
+
+std::string getserverlog(void)
+{
+ return logstream.str();
+}
+
+class serverfunc_res : public libt2n::result
+{
+ private:
+ int res;
+
+ friend class boost::serialization::access;
+ template<class Archive>
+ void serialize(Archive & ar, const unsigned int version)
+ {
+ ar & BOOST_SERIALIZATION_BASE_OBJECT_NVP(libt2n::result);
+ ar & BOOST_SERIALIZATION_NVP(res);
+ }
+
+ public:
+ serverfunc_res()
+ { }
+
+ serverfunc_res(int i)
+ {
+ res=i;
+ }
+
+ int get_data()
+ {
+ return res;
+ }
+};
+
+class getserverlog_res : public libt2n::result
+{
+ private:
+ std::string res;
+
+ friend class boost::serialization::access;
+ template<class Archive>
+ void serialize(Archive & ar, const unsigned int version)
+ {
+ ar & BOOST_SERIALIZATION_BASE_OBJECT_NVP(libt2n::result);
+ ar & BOOST_SERIALIZATION_NVP(res);
+ }
+
+ public:
+ getserverlog_res()
+ { }
+
+ getserverlog_res(std::string s)
+ {
+ res=s;
+ }
+
+ std::string get_data()
+ {
+ return res;
+ }
+};
+
+class cmd_group_x : public command
+{
+ private:
+ friend class boost::serialization::access;
+ template<class Archive>
+ void serialize(Archive & ar, const unsigned int version)
+ {
+ ar & BOOST_SERIALIZATION_BASE_OBJECT_NVP(libt2n::command);
+ }
+};
+
+class serverfunc_cmd : public cmd_group_x
+{
+ private:
+ int param;
+
+ friend class boost::serialization::access;
+ template<class Archive>
+ void serialize(Archive & ar, const unsigned int version)
+ {
+ ar & BOOST_SERIALIZATION_BASE_OBJECT_NVP(cmd_group_x);
+ ar & BOOST_SERIALIZATION_NVP(param);
+ }
+
+ public:
+ serverfunc_cmd()
+ { }
+
+ serverfunc_cmd(int i)
+ {
+ param=i;
+ }
+
+ libt2n::result* operator()()
+ {
+ return new serverfunc_res(serverfunc(param));
+ }
+};
+
+class getserverlog_cmd : public cmd_group_x
{
+ private:
+ friend class boost::serialization::access;
+ template<class Archive>
+ void serialize(Archive & ar, const unsigned int version)
+ {
+ ar & BOOST_SERIALIZATION_BASE_OBJECT_NVP(cmd_group_x);
+ }
+
public:
+ getserverlog_cmd()
+ { }
- testme(client_connection &x, long long a, long long b)
- : command_client(x,100000,10000)
- { }
+ libt2n::result* operator()()
+ {
+ return new getserverlog_res(getserverlog());
+ }
+};
+
+BOOST_CLASS_EXPORT(serverfunc_res)
+BOOST_CLASS_EXPORT(getserverlog_res)
+BOOST_CLASS_EXPORT(cmd_group_x)
+BOOST_CLASS_EXPORT(serverfunc_cmd)
+BOOST_CLASS_EXPORT(getserverlog_cmd)
- void helloworld(const std::string& text)
+class cmd_group_x_client : public command_client
+{
+ public:
+ cmd_group_x_client(libt2n::client_connection &_c,
+ long long _command_timeout_usec=command_timeout_usec_default,
+ long long _hello_timeout_usec=hello_timeout_usec_default)
+ : libt2n::command_client(_c,_command_timeout_usec,_hello_timeout_usec)
+ {}
+
+ int serverfunc(int i)
{
- std::cout << "Hello world, " << text << std::endl;
+ libt2n::result_container rc;
+
+ send_command(new serverfunc_cmd(i), rc);
+ serverfunc_res* res=dynamic_cast<serverfunc_res*>(rc.get_result());
+ if (!res) throw libt2n::t2n_communication_error("result object of wrong type");
+ return res->get_data();
+ }
+
+ std::string getserverlog(void)
+ {
+ libt2n::result_container rc;
+
+ send_command(new getserverlog_cmd(), rc);
+ getserverlog_res* res=dynamic_cast<getserverlog_res*>(rc.get_result());
+ if (!res) throw libt2n::t2n_communication_error("result object of wrong type");
+ return res->get_data();
}
};
-typedef T2nSingletonWrapper<testme> wraptype;
+typedef T2nSingletonWrapper<cmd_group_x_client> wraptype;
template<>
std::auto_ptr<wraptype> wraptype::SingletonObject = std::auto_ptr<wraptype>();
{
CPPUNIT_TEST_SUITE(test_wrapper);
+ CPPUNIT_TEST(no_init_exception); // must be called first!!!
CPPUNIT_TEST(simple_wrap);
+ CPPUNIT_TEST(double_use);
+ CPPUNIT_TEST(double_use_with_close);
+ CPPUNIT_TEST(reconnect_after_close);
+ CPPUNIT_TEST(reconnect_not_possible);
+
+// TODO: missing tests:
+// ignore: init, use, server die, ignore
+// ignore: init, no server, ignore
+// ignore: init, no server, ignore, server ok, connect?
CPPUNIT_TEST_SUITE_END();
public:
void setUp()
- { }
+ {
+ pid_t pid;
+
+ switch(pid=fork())
+ {
+ case -1:
+ {
+ CPPUNIT_FAIL("fork error");
+ break;
+ }
+ case 0:
+ // child
+ {
+ int i=0;
+ while(i < 15 && !kill_server)
+ {
+ close_server=false;
+
+ socket_server ss("./socket");
+ group_command_server<cmd_group_x> cs(ss);
+ ss.set_logging(&logstream,debug);
+
+ // max 10 sec
+ for (; !close_server && !kill_server && i < 15; i++)
+ cs.handle(1000000);
+ }
+
+ // don't call atexit and stuff
+ _exit(0);
+ }
+
+ default:
+ // parent
+ {
+ // wait till server is up
+ sleep(1);
+
+ }
+ }
+ }
void tearDown()
{ }
void simple_wrap()
{
-// t2n_exec(&testme::helloworld)("gurke");
+ wraptype::set_connection(auto_ptr<ConnectionWrapper>
+ (new BasicSocketWrapper("./socket")));
+
+ int i=t2n_exec(&cmd_group_x_client::serverfunc)(1);
+
+ CPPUNIT_ASSERT_EQUAL(2,i);
+ }
+
+ void no_init_exception()
+ {
+ CPPUNIT_ASSERT_THROW(t2n_exec(&cmd_group_x_client::serverfunc)(1),std::logic_error);
+ }
+
+ void double_use()
+ {
+ // only one connection used?
+ wraptype::set_connection(auto_ptr<ConnectionWrapper>
+ (new BasicSocketWrapper("./socket")));
+
+ t2n_exec(&cmd_group_x_client::serverfunc)(17);
+ string out=t2n_exec(&cmd_group_x_client::getserverlog)();
+
+ // count the number of times that "new connection accepted" appears in the server log
+ string::size_type p=0;
+ int cnt=0;
+ while ((p=out.find("new connection accepted",p))++ != string::npos)
+ cnt++;
+
+ CPPUNIT_ASSERT_EQUAL(1,cnt);
+ }
+
+ void double_use_with_close()
+ {
+ wraptype::set_connection(auto_ptr<ConnectionWrapper>
+ (new BasicSocketWrapper("./socket")));
+
+ t2n_exec(&cmd_group_x_client::serverfunc)(17);
+
+ // closes the connection from the client side
+ wraptype::set_connection(auto_ptr<ConnectionWrapper>
+ (new BasicSocketWrapper("./socket")));
+
+ string out=t2n_exec(&cmd_group_x_client::getserverlog)();
+
+ // count the number of times that "new connection accepted" appears in the server log
+ string::size_type p=0;
+ int cnt=0;
+ while ((p=out.find("new connection accepted",p))++ != string::npos)
+ cnt++;
+
+ CPPUNIT_ASSERT_EQUAL(2,cnt);
+ }
+
+ void reconnect_after_close()
+ {
+ wraptype::set_connection(auto_ptr<ConnectionWrapper>
+ (new ReconnectSocketWrapper("./socket")));
+
+ wraptype::get_connection_wrapper()->set_command_timeout_usec(3000000);
+ wraptype::get_connection_wrapper()->set_hello_timeout_usec(3000000);
+
+ // 42 closes connection on the server side
+ t2n_exec(&cmd_group_x_client::serverfunc)(42);
+
+ string out=t2n_exec(&cmd_group_x_client::getserverlog)();
+
+ // count the number of times that "new connection accepted" appears in the server log
+ string::size_type p=0;
+ int cnt=0;
+ while ((p=out.find("new connection accepted",p))++ != string::npos)
+ cnt++;
+
+ CPPUNIT_ASSERT_EQUAL(2,cnt);
+ }
+
+ void reconnect_not_possible()
+ {
+ wraptype::set_connection(auto_ptr<ConnectionWrapper>
+ (new ReconnectSocketWrapper("./socket")));
+
+ // the server doens't like the beast
+ t2n_exec(&cmd_group_x_client::serverfunc)(666);
- CPPUNIT_ASSERT_EQUAL(true,true);
+ CPPUNIT_ASSERT_THROW(t2n_exec(&cmd_group_x_client::serverfunc)(1),t2n_communication_error);
}