From b470ed84bbbec489d6593d7d6f408db2e956743a Mon Sep 17 00:00:00 2001 From: "Srivats P." Date: Sat, 17 May 2014 18:45:02 +0530 Subject: [PATCH] Reworked the multi-threaded RPC server/connection architecture into a worker-thread arch as reccomended by Qt --- rpc/pbrpc.pro | 4 +- rpc/pbrpccontroller.h | 3 +- rpc/{rpcthread.cpp => rpcconn.cpp} | 74 ++++++++++++++---------------- rpc/{rpcthread.h => rpcconn.h} | 28 ++++++----- rpc/rpcserver.cpp | 27 +++++++++-- rpc/rpcserver.h | 33 ------------- 6 files changed, 75 insertions(+), 94 deletions(-) rename rpc/{rpcthread.cpp => rpcconn.cpp} (82%) rename rpc/{rpcthread.h => rpcconn.h} (72%) diff --git a/rpc/pbrpc.pro b/rpc/pbrpc.pro index f1ab422..ff28d8a 100644 --- a/rpc/pbrpc.pro +++ b/rpc/pbrpc.pro @@ -3,5 +3,5 @@ CONFIG += qt staticlib QT += network DEFINES += HAVE_REMOTE LIBS += -lprotobuf -HEADERS += rpcserver.h rpcthread.h pbrpccontroller.h pbrpcchannel.h pbqtio.h -SOURCES += rpcserver.cpp rpcthread.cpp pbrpcchannel.cpp +HEADERS += rpcserver.h rpcconn.h pbrpccontroller.h pbrpcchannel.h pbqtio.h +SOURCES += rpcserver.cpp rpcconn.cpp pbrpcchannel.cpp diff --git a/rpc/pbrpccontroller.h b/rpc/pbrpccontroller.h index fa11cdd..88782e7 100644 --- a/rpc/pbrpccontroller.h +++ b/rpc/pbrpccontroller.h @@ -1,5 +1,5 @@ /* -Copyright (C) 2010 Srivats P. +Copyright (C) 2010, 2014 Srivats P. This file is part of "Ostinato" @@ -20,6 +20,7 @@ along with this program. If not, see #ifndef _PB_RPC_CONTROLLER_H #define _PB_RPC_CONTROLLER_H +#include #include class QIODevice; diff --git a/rpc/rpcthread.cpp b/rpc/rpcconn.cpp similarity index 82% rename from rpc/rpcthread.cpp rename to rpc/rpcconn.cpp index 81ead4f..123c25d 100644 --- a/rpc/rpcthread.cpp +++ b/rpc/rpcconn.cpp @@ -17,28 +17,24 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -#include "rpcthread.h" +#include "rpcconn.h" #include "pbqtio.h" #include "pbrpccommon.h" +#include "pbrpccontroller.h" #include #include #include #include -#include "pbrpccontroller.h" // FIXME: move up the order and fix warnings - #include #include #include -RpcThread::RpcThread( - int socketDescriptor, - ::google::protobuf::Service *service, - QObject *parent) - : QThread(parent), - socketDescriptor(socketDescriptor), +RpcConnection::RpcConnection(int socketDescriptor, + ::google::protobuf::Service *service) + : socketDescriptor(socketDescriptor), service(service) { inStream = NULL; @@ -46,18 +42,29 @@ RpcThread::RpcThread( isPending = false; pendingMethodId = -1; // don't care as long as isPending is false - - moveToThread(this); } -RpcThread::~RpcThread() +RpcConnection::~RpcConnection() { + qDebug("destroying connection to %s: %d", + clientSock->peerAddress().toString().toAscii().constData(), + clientSock->peerPort()); + + // If still connected, disconnect + if (clientSock->state() != QAbstractSocket::UnconnectedState) { + clientSock->disconnectFromHost(); + clientSock->waitForDisconnected(); + } + + delete inStream; + delete outStream; + + delete clientSock; } -void RpcThread::run() +void RpcConnection::start() { clientSock = new QTcpSocket; - //qDebug("clientSock = %p, %p", clientSock, this->clientSock); if (!clientSock->setSocketDescriptor(socketDescriptor)) { qWarning("Unable to initialize TCP socket for incoming connection"); return; @@ -67,28 +74,21 @@ void RpcThread::run() clientSock->peerAddress().toString().toAscii().constData(), clientSock->peerPort()); inStream = new google::protobuf::io::CopyingInputStreamAdaptor( - new PbQtInputStream(clientSock)); + new PbQtInputStream(clientSock)); inStream->SetOwnsCopyingStream(true); outStream = new google::protobuf::io::CopyingOutputStreamAdaptor( - new PbQtOutputStream(clientSock)); + new PbQtOutputStream(clientSock)); outStream->SetOwnsCopyingStream(true); connect(clientSock, SIGNAL(readyRead()), - this, SLOT(when_dataAvail()), Qt::DirectConnection); + this, SLOT(on_clientSock_dataAvail())); connect(clientSock, SIGNAL(disconnected()), - this, SLOT(when_disconnected())); + this, SLOT(on_clientSock_disconnected())); connect(clientSock, SIGNAL(error(QAbstractSocket::SocketError)), - this, SLOT(when_error(QAbstractSocket::SocketError))); - - exec(); + this, SLOT(on_clientSock_error(QAbstractSocket::SocketError))); } -QString RpcThread::errorString() -{ - return errorString_; -} - -void RpcThread::done(PbRpcController *controller) +void RpcConnection::sendRpcReply(PbRpcController *controller) { google::protobuf::Message *response = controller->response(); QIODevice *blob; @@ -96,8 +96,6 @@ void RpcThread::done(PbRpcController *controller) char* const msg = &msgBuf[0]; int len; - //qDebug("In RpcThread::done"); - if (controller->Failed()) { qDebug("rpc failed"); @@ -160,28 +158,23 @@ _exit: isPending = false; } -void RpcThread::when_disconnected() +void RpcConnection::on_clientSock_disconnected() { qDebug("connection closed from %s: %d", clientSock->peerAddress().toString().toAscii().constData(), clientSock->peerPort()); - delete inStream; - delete outStream; - - clientSock->deleteLater(); - clientSock = NULL; - - quit(); + deleteLater(); + emit closed(); } -void RpcThread::when_error(QAbstractSocket::SocketError socketError) +void RpcConnection::on_clientSock_error(QAbstractSocket::SocketError socketError) { qDebug("%s (%d)", clientSock->errorString().toAscii().constData(), socketError); } -void RpcThread::when_dataAvail() +void RpcConnection::on_clientSock_dataAvail() { uchar msg[PB_HDR_SIZE]; int msgLen; @@ -256,7 +249,8 @@ void RpcThread::when_dataAvail() //qDebug("before service->callmethod()"); service->CallMethod(methodDesc, controller, req, resp, - google::protobuf::NewCallback(this, &RpcThread::done, controller)); + google::protobuf::NewCallback(this, &RpcConnection::sendRpcReply, + controller)); parsing = false; diff --git a/rpc/rpcthread.h b/rpc/rpcconn.h similarity index 72% rename from rpc/rpcthread.h rename to rpc/rpcconn.h index 5a9381c..ce5e5bd 100644 --- a/rpc/rpcthread.h +++ b/rpc/rpcconn.h @@ -17,10 +17,9 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -#ifndef _RPC_THREAD_H -#define _RPC_THREAD_H +#ifndef _RPC_CONNECTION_H +#define _RPC_CONNECTION_H -#include #include // forward declarations @@ -36,25 +35,25 @@ namespace google { } } -class RpcThread : public QThread +class RpcConnection : public QObject { Q_OBJECT public: - RpcThread(int socketDescriptor, - ::google::protobuf::Service *service, - QObject *parent); - virtual ~RpcThread(); - void run(); + RpcConnection(int socketDescriptor, ::google::protobuf::Service *service); + virtual ~RpcConnection(); private: - QString errorString(); // FIXME: needed? why? - void done(PbRpcController *controller); + void sendRpcReply(PbRpcController *controller); + +signals: + void closed(); private slots: - void when_disconnected(); - void when_dataAvail(); - void when_error(QAbstractSocket::SocketError socketError); + void start(); + void on_clientSock_dataAvail(); + void on_clientSock_error(QAbstractSocket::SocketError socketError); + void on_clientSock_disconnected(); private: int socketDescriptor; @@ -66,7 +65,6 @@ private: bool isPending; int pendingMethodId; - QString errorString_; }; #endif diff --git a/rpc/rpcserver.cpp b/rpc/rpcserver.cpp index e7affaf..6aa8f88 100644 --- a/rpc/rpcserver.cpp +++ b/rpc/rpcserver.cpp @@ -1,5 +1,5 @@ /* -Copyright (C) 2010 Srivats P. +Copyright (C) 2010, 2014 Srivats P. This file is part of "Ostinato" @@ -19,7 +19,17 @@ along with this program. If not, see #include "rpcserver.h" -#include "rpcthread.h" +#include "rpcconn.h" + +#include + +// FIXME: QThreadX till we change minimum version of Qt from Qt4.3+ to Qt4.4+ +class QThreadX: public QThread +{ +protected: + virtual ~QThreadX() { qDebug("QThreadX going down!"); } + void run() { exec(); } +}; RpcServer::RpcServer() { @@ -50,8 +60,19 @@ bool RpcServer::registerService(::google::protobuf::Service *service, void RpcServer::incomingConnection(int socketDescriptor) { - RpcThread *thread = new RpcThread(socketDescriptor, service, this); + QThread *thread = new QThreadX; // FIXME:QThreadX pending Qt4.4+ + RpcConnection *conn = new RpcConnection(socketDescriptor, service); + conn->moveToThread(thread); + + connect(thread, SIGNAL(started()), conn, SLOT(start())); + + // NOTE: conn "self-destructs" after emitting closed + // use 'closed' to stop execution of the thread + connect(conn, SIGNAL(closed()), thread, SLOT(quit())); + + // setup thread to "self-destruct" when it is done connect(thread, SIGNAL(finished()), thread, SLOT(deleteLater())); + thread->start(); } diff --git a/rpc/rpcserver.h b/rpc/rpcserver.h index 15f7910..0a4acdd 100644 --- a/rpc/rpcserver.h +++ b/rpc/rpcserver.h @@ -20,17 +20,6 @@ along with this program. If not, see #ifndef _RPC_SERVER_H #define _RPC_SERVER_H -#if 0 -#include -#include -#include - -#include - -#include "pbrpccommon.h" -#include "pbrpccontroller.h" -#endif - #include // forward declaration @@ -44,18 +33,6 @@ class RpcServer : public QTcpServer { Q_OBJECT -#if 0 - QTcpServer *server; - QTcpSocket *clientSock; - - ::google::protobuf::io::CopyingInputStreamAdaptor *inStream; - ::google::protobuf::io::CopyingOutputStreamAdaptor *outStream; - - bool isPending; - int pendingMethodId; - QString errorString_; -#endif - public: RpcServer(); //! \todo (LOW) use 'parent' param virtual ~RpcServer(); @@ -66,16 +43,6 @@ public: protected: void incomingConnection(int socketDescriptor); -#if 0 - QString errorString(); - void done(PbRpcController *controller); - -private slots: - void when_newConnection(); - void when_disconnected(); - void when_dataAvail(); - void when_error(QAbstractSocket::SocketError socketError); -#endif private: ::google::protobuf::Service *service; };