diff --git a/client/ostinato.pro b/client/ostinato.pro index 299ecc5..e61730e 100644 --- a/client/ostinato.pro +++ b/client/ostinato.pro @@ -5,7 +5,6 @@ win32:RC_FILE = ostinato.rc macx:ICON = icons/logo.icns QT += network script INCLUDEPATH += "../rpc/" "../common/" -LIBS += -lprotobuf win32 { CONFIG(debug, debug|release) { LIBS += -L"../common/debug" -lostproto @@ -25,6 +24,7 @@ win32 { LIBS += -L"../rpc" -lpbrpc POST_TARGETDEPS += "../common/libostproto.a" "../rpc/libpbrpc.a" } +LIBS += -lprotobuf RESOURCES += ostinato.qrc HEADERS += \ dumpview.h \ diff --git a/rpc/pbqtio.h b/rpc/pbqtio.h new file mode 100644 index 0000000..33d36a4 --- /dev/null +++ b/rpc/pbqtio.h @@ -0,0 +1,42 @@ +#ifndef _PBQTIO_H +#define _PBQTIO_H + +#include + +class PbQtInputStream : public google::protobuf::io::CopyingInputStream +{ +public: + PbQtInputStream(QIODevice *dev) + : dev_(dev) {}; + int Read(void *buffer, int size) { + _top: + if (dev_->bytesAvailable()) + return dev_->read(static_cast(buffer), size); + else + if (dev_->waitForReadyRead(-1)) + goto _top; + else + return -1; //return dev_->atEnd() ? 0 : -1; + } + +private: + QIODevice *dev_; +}; + +class PbQtOutputStream : public google::protobuf::io::CopyingOutputStream +{ +public: + PbQtOutputStream(QIODevice *dev) + : dev_(dev) {}; + bool Write(const void *buffer, int size) { + if (dev_->write(static_cast(buffer), size) == size) + return true; + else + return false; + } + +private: + QIODevice *dev_; +}; + +#endif diff --git a/rpc/pbrpc.pro b/rpc/pbrpc.pro index 27ec61b..f53fa81 100644 --- a/rpc/pbrpc.pro +++ b/rpc/pbrpc.pro @@ -3,5 +3,5 @@ CONFIG += qt staticlib QT += network DEFINES += HAVE_REMOTE LIBS += -lprotobuf -HEADERS += rpcserver.h pbrpccontroller.h pbrpcchannel.h +HEADERS += rpcserver.h pbrpccontroller.h pbrpcchannel.h pbqtio.h SOURCES += rpcserver.cpp pbrpcchannel.cpp diff --git a/rpc/pbrpcchannel.cpp b/rpc/pbrpcchannel.cpp index 47f1611..7c7789e 100644 --- a/rpc/pbrpcchannel.cpp +++ b/rpc/pbrpcchannel.cpp @@ -18,6 +18,7 @@ along with this program. If not, see */ #include "pbrpcchannel.h" +#include "pbqtio.h" #include @@ -34,6 +35,13 @@ PbRpcChannel::PbRpcChannel(QHostAddress ip, quint16 port) mServerPort = port; mpSocket = new QTcpSocket(this); + inStream = new google::protobuf::io::CopyingInputStreamAdaptor( + new PbQtInputStream(mpSocket)); + inStream->SetOwnsCopyingStream(true); + outStream = new google::protobuf::io::CopyingOutputStreamAdaptor( + new PbQtOutputStream(mpSocket)); + outStream->SetOwnsCopyingStream(true); + // FIXME: Not quite sure why this ain't working! // QMetaObject::connectSlotsByName(this); @@ -53,6 +61,8 @@ PbRpcChannel::PbRpcChannel(QHostAddress ip, quint16 port) PbRpcChannel::~PbRpcChannel() { + delete inStream; + delete outStream; delete mpSocket; } @@ -84,7 +94,7 @@ void PbRpcChannel::CallMethod( ::google::protobuf::Message *response, ::google::protobuf::Closure* done) { - char msgBuf[MSGBUF_SIZE]; + char msgBuf[PB_HDR_SIZE]; char* const msg = &msgBuf[0]; int len; bool ret; @@ -128,9 +138,6 @@ void PbRpcChannel::CallMethod( this->response=response; isPending = true; - ret = req->SerializeToArray((void*)(msg+PB_HDR_SIZE), sizeof(msgBuf)-PB_HDR_SIZE); - Q_ASSERT(ret == true); - len = req->ByteSize(); *((quint16*)(msg+0)) = qToBigEndian(quint16(PB_MSG_TYPE_REQUEST)); // type *((quint16*)(msg+2)) = qToBigEndian(quint16(method->index())); // method id @@ -141,15 +148,18 @@ void PbRpcChannel::CallMethod( { qDebug("client(%s) sending %d bytes encoding <%s>", __FUNCTION__, PB_HDR_SIZE + len, req->DebugString().c_str()); - BUFDUMP(msg, PB_HDR_SIZE + len); + BUFDUMP(msg, PB_HDR_SIZE); } - mpSocket->write(msg, PB_HDR_SIZE + len); + mpSocket->write(msg, PB_HDR_SIZE); + ret = req->SerializeToZeroCopyStream(outStream); + Q_ASSERT(ret == true); + outStream->Flush(); } void PbRpcChannel::on_mpSocket_readyRead() { - uchar msg[MSGBUF_SIZE]; + uchar msg[PB_HDR_SIZE]; uchar *p = (uchar*) &msg; int msgLen; static bool parsing = false; @@ -210,31 +220,20 @@ void PbRpcChannel::on_mpSocket_readyRead() if (!isPending) { qDebug("not waiting for response"); - goto _error_exit; + goto _error_exit2; } if (pendingMethodId != method) { qDebug("invalid method id %d (expected = %d)", method, pendingMethodId); - goto _error_exit; + goto _error_exit2; } break; } case PB_MSG_TYPE_RESPONSE: - // Wait till we have the entire message - if (mpSocket->bytesAvailable() < len) - { - qDebug("client: not enough data available for a complete msg"); - return; - } - - msgLen = mpSocket->read((char*)msg, sizeof(msg)); - - Q_ASSERT((unsigned) msgLen == len); - //qDebug("client(%s) rcvd %d bytes", __FUNCTION__, msgLen); //BUFDUMP(msg, msgLen); @@ -251,7 +250,8 @@ void PbRpcChannel::on_mpSocket_readyRead() goto _error_exit; } - response->ParseFromArray((void*) msg, len); + if (len) + response->ParseFromBoundedZeroCopyStream(inStream, len); // Avoid printing stats if (method != 13) @@ -295,6 +295,8 @@ void PbRpcChannel::on_mpSocket_readyRead() return; _error_exit: + inStream->Skip(len); +_error_exit2: parsing = false; qDebug("client(%s) discarding received msg", __FUNCTION__); return; diff --git a/rpc/pbrpcchannel.h b/rpc/pbrpcchannel.h index 682c553..e3f9096 100644 --- a/rpc/pbrpcchannel.h +++ b/rpc/pbrpcchannel.h @@ -23,6 +23,7 @@ along with this program. If not, see #include #include +#include #include #include #include @@ -64,6 +65,9 @@ class PbRpcChannel : public QObject, public ::google::protobuf::RpcChannel quint16 mServerPort; QTcpSocket *mpSocket; + ::google::protobuf::io::CopyingInputStreamAdaptor *inStream; + ::google::protobuf::io::CopyingOutputStreamAdaptor *outStream; + public: PbRpcChannel(QHostAddress ip, quint16 port); ~PbRpcChannel(); diff --git a/rpc/pbrpccommon.h b/rpc/pbrpccommon.h index 0f4acf9..e1fbdf9 100644 --- a/rpc/pbrpccommon.h +++ b/rpc/pbrpccommon.h @@ -36,6 +36,4 @@ along with this program. If not, see #define PB_MSG_TYPE_RESPONSE 2 #define PB_MSG_TYPE_BINBLOB 3 -#define MSGBUF_SIZE 4096 - #endif diff --git a/rpc/rpcserver.cpp b/rpc/rpcserver.cpp index d48de21..2d1bd63 100644 --- a/rpc/rpcserver.cpp +++ b/rpc/rpcserver.cpp @@ -19,6 +19,7 @@ along with this program. If not, see //#include "pbhelper.h" #include "rpcserver.h" +#include "pbqtio.h" #include @@ -29,6 +30,9 @@ RpcServer::RpcServer() service = NULL; + inStream = NULL; + outStream = NULL; + isPending = false; pendingMethodId = -1; // don't care as long as isPending is false } @@ -71,7 +75,7 @@ void RpcServer::done(PbRpcController *controller) { google::protobuf::Message *response = controller->response(); QIODevice *blob; - char msgBuf[MSGBUF_SIZE]; + char msgBuf[PB_HDR_SIZE]; char* const msg = &msgBuf[0]; int len; @@ -116,8 +120,6 @@ void RpcServer::done(PbRpcController *controller) goto _exit; } - response->SerializeToArray((void*)(msg+PB_HDR_SIZE), sizeof(msgBuf)-PB_HDR_SIZE); - len = response->ByteSize(); *((quint16*)(msg+0)) = qToBigEndian(quint16(PB_MSG_TYPE_RESPONSE)); // type @@ -132,7 +134,9 @@ void RpcServer::done(PbRpcController *controller) //BUFDUMP(msg, len + 8); } - clientSock->write(msg, PB_HDR_SIZE + len); + clientSock->write(msg, PB_HDR_SIZE); + response->SerializeToZeroCopyStream(outStream); + outStream->Flush(); _exit: delete controller; @@ -159,6 +163,12 @@ void RpcServer::when_newConnection() qDebug("accepting new connection from %s: %d", clientSock->peerAddress().toString().toAscii().constData(), clientSock->peerPort()); + inStream = new google::protobuf::io::CopyingInputStreamAdaptor( + new PbQtInputStream(clientSock)); + inStream->SetOwnsCopyingStream(true); + outStream = new google::protobuf::io::CopyingOutputStreamAdaptor( + new PbQtOutputStream(clientSock)); + outStream->SetOwnsCopyingStream(true); connect(clientSock, SIGNAL(readyRead()), this, SLOT(when_dataAvail())); @@ -177,6 +187,9 @@ void RpcServer::when_disconnected() clientSock->peerAddress().toString().toAscii().constData(), clientSock->peerPort()); + delete inStream; + delete outStream; + clientSock->deleteLater(); clientSock = NULL; } @@ -189,7 +202,7 @@ void RpcServer::when_error(QAbstractSocket::SocketError socketError) void RpcServer::when_dataAvail() { - uchar msg[MSGBUF_SIZE]; + uchar msg[PB_HDR_SIZE]; int msgLen; static bool parsing = false; static quint16 type, method; @@ -215,12 +228,6 @@ void RpcServer::when_dataAvail() parsing = true; } - if (clientSock->bytesAvailable() < len) - return; - - msgLen = clientSock->read((char*)msg, sizeof(msg)); - Q_ASSERT((unsigned) msgLen == len); - if (type != PB_MSG_TYPE_REQUEST) { qDebug("server(%s): unexpected msg type %d (expected %d)", __FUNCTION__, @@ -247,7 +254,9 @@ void RpcServer::when_dataAvail() req = service->GetRequestPrototype(methodDesc).New(); resp = service->GetResponsePrototype(methodDesc).New(); - req->ParseFromArray((void*)msg, len); + if (len) + req->ParseFromBoundedZeroCopyStream(inStream, len); + if (!req->IsInitialized()) { qWarning("Missing required fields in request"); @@ -256,7 +265,7 @@ void RpcServer::when_dataAvail() delete req; delete resp; - goto _error_exit; + goto _error_exit2; } //qDebug("Server(%s): successfully parsed as <%s>", __FUNCTION__, //resp->DebugString().c_str()); @@ -273,6 +282,8 @@ void RpcServer::when_dataAvail() return; _error_exit: + inStream->Skip(len); +_error_exit2: parsing = false; qDebug("server(%s): discarding msg from client", __FUNCTION__); return; diff --git a/rpc/rpcserver.h b/rpc/rpcserver.h index 1e9c93e..76f179a 100644 --- a/rpc/rpcserver.h +++ b/rpc/rpcserver.h @@ -23,6 +23,7 @@ along with this program. If not, see #include #include #include +#include #include #include @@ -39,6 +40,8 @@ class RpcServer : public QObject QTcpSocket *clientSock; ::google::protobuf::Service *service; + ::google::protobuf::io::CopyingInputStreamAdaptor *inStream; + ::google::protobuf::io::CopyingOutputStreamAdaptor *outStream; bool isPending; int pendingMethodId;