RPC now uses stream instead of an array for protobuf parsing and serialization. This change removes the limit on the byte size of a protobuf.

Because we are now using a new protobuf API - ParseFromBoundedZeroCopyStream(), the protobuf version has to be >= 2.2.0

Fixes issue 14
This commit is contained in:
Srivats P. 2010-10-18 17:25:27 +05:30
parent c5fbceebb4
commit 90fda499dd
8 changed files with 98 additions and 38 deletions

View File

@ -5,7 +5,6 @@ win32:RC_FILE = ostinato.rc
macx:ICON = icons/logo.icns
QT += network script
INCLUDEPATH += "../rpc/" "../common/"
LIBS += -lprotobuf
win32 {
CONFIG(debug, debug|release) {
LIBS += -L"../common/debug" -lostproto
@ -25,6 +24,7 @@ win32 {
LIBS += -L"../rpc" -lpbrpc
POST_TARGETDEPS += "../common/libostproto.a" "../rpc/libpbrpc.a"
}
LIBS += -lprotobuf
RESOURCES += ostinato.qrc
HEADERS += \
dumpview.h \

42
rpc/pbqtio.h Normal file
View File

@ -0,0 +1,42 @@
#ifndef _PBQTIO_H
#define _PBQTIO_H
#include <google/protobuf/io/zero_copy_stream_impl.h>
class PbQtInputStream : public google::protobuf::io::CopyingInputStream
{
public:
PbQtInputStream(QIODevice *dev)
: dev_(dev) {};
int Read(void *buffer, int size) {
_top:
if (dev_->bytesAvailable())
return dev_->read(static_cast<char*>(buffer), size);
else
if (dev_->waitForReadyRead(-1))
goto _top;
else
return -1; //return dev_->atEnd() ? 0 : -1;
}
private:
QIODevice *dev_;
};
class PbQtOutputStream : public google::protobuf::io::CopyingOutputStream
{
public:
PbQtOutputStream(QIODevice *dev)
: dev_(dev) {};
bool Write(const void *buffer, int size) {
if (dev_->write(static_cast<const char*>(buffer), size) == size)
return true;
else
return false;
}
private:
QIODevice *dev_;
};
#endif

View File

@ -3,5 +3,5 @@ CONFIG += qt staticlib
QT += network
DEFINES += HAVE_REMOTE
LIBS += -lprotobuf
HEADERS += rpcserver.h pbrpccontroller.h pbrpcchannel.h
HEADERS += rpcserver.h pbrpccontroller.h pbrpcchannel.h pbqtio.h
SOURCES += rpcserver.cpp pbrpcchannel.cpp

View File

@ -18,6 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
*/
#include "pbrpcchannel.h"
#include "pbqtio.h"
#include <qendian.h>
@ -34,6 +35,13 @@ PbRpcChannel::PbRpcChannel(QHostAddress ip, quint16 port)
mServerPort = port;
mpSocket = new QTcpSocket(this);
inStream = new google::protobuf::io::CopyingInputStreamAdaptor(
new PbQtInputStream(mpSocket));
inStream->SetOwnsCopyingStream(true);
outStream = new google::protobuf::io::CopyingOutputStreamAdaptor(
new PbQtOutputStream(mpSocket));
outStream->SetOwnsCopyingStream(true);
// FIXME: Not quite sure why this ain't working!
// QMetaObject::connectSlotsByName(this);
@ -53,6 +61,8 @@ PbRpcChannel::PbRpcChannel(QHostAddress ip, quint16 port)
PbRpcChannel::~PbRpcChannel()
{
delete inStream;
delete outStream;
delete mpSocket;
}
@ -84,7 +94,7 @@ void PbRpcChannel::CallMethod(
::google::protobuf::Message *response,
::google::protobuf::Closure* done)
{
char msgBuf[MSGBUF_SIZE];
char msgBuf[PB_HDR_SIZE];
char* const msg = &msgBuf[0];
int len;
bool ret;
@ -128,9 +138,6 @@ void PbRpcChannel::CallMethod(
this->response=response;
isPending = true;
ret = req->SerializeToArray((void*)(msg+PB_HDR_SIZE), sizeof(msgBuf)-PB_HDR_SIZE);
Q_ASSERT(ret == true);
len = req->ByteSize();
*((quint16*)(msg+0)) = qToBigEndian(quint16(PB_MSG_TYPE_REQUEST)); // type
*((quint16*)(msg+2)) = qToBigEndian(quint16(method->index())); // method id
@ -141,15 +148,18 @@ void PbRpcChannel::CallMethod(
{
qDebug("client(%s) sending %d bytes encoding <%s>", __FUNCTION__,
PB_HDR_SIZE + len, req->DebugString().c_str());
BUFDUMP(msg, PB_HDR_SIZE + len);
BUFDUMP(msg, PB_HDR_SIZE);
}
mpSocket->write(msg, PB_HDR_SIZE + len);
mpSocket->write(msg, PB_HDR_SIZE);
ret = req->SerializeToZeroCopyStream(outStream);
Q_ASSERT(ret == true);
outStream->Flush();
}
void PbRpcChannel::on_mpSocket_readyRead()
{
uchar msg[MSGBUF_SIZE];
uchar msg[PB_HDR_SIZE];
uchar *p = (uchar*) &msg;
int msgLen;
static bool parsing = false;
@ -210,31 +220,20 @@ void PbRpcChannel::on_mpSocket_readyRead()
if (!isPending)
{
qDebug("not waiting for response");
goto _error_exit;
goto _error_exit2;
}
if (pendingMethodId != method)
{
qDebug("invalid method id %d (expected = %d)", method,
pendingMethodId);
goto _error_exit;
goto _error_exit2;
}
break;
}
case PB_MSG_TYPE_RESPONSE:
// Wait till we have the entire message
if (mpSocket->bytesAvailable() < len)
{
qDebug("client: not enough data available for a complete msg");
return;
}
msgLen = mpSocket->read((char*)msg, sizeof(msg));
Q_ASSERT((unsigned) msgLen == len);
//qDebug("client(%s) rcvd %d bytes", __FUNCTION__, msgLen);
//BUFDUMP(msg, msgLen);
@ -251,7 +250,8 @@ void PbRpcChannel::on_mpSocket_readyRead()
goto _error_exit;
}
response->ParseFromArray((void*) msg, len);
if (len)
response->ParseFromBoundedZeroCopyStream(inStream, len);
// Avoid printing stats
if (method != 13)
@ -295,6 +295,8 @@ void PbRpcChannel::on_mpSocket_readyRead()
return;
_error_exit:
inStream->Skip(len);
_error_exit2:
parsing = false;
qDebug("client(%s) discarding received msg", __FUNCTION__);
return;

View File

@ -23,6 +23,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include <QTcpServer>
#include <QTcpSocket>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/message.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/service.h>
@ -64,6 +65,9 @@ class PbRpcChannel : public QObject, public ::google::protobuf::RpcChannel
quint16 mServerPort;
QTcpSocket *mpSocket;
::google::protobuf::io::CopyingInputStreamAdaptor *inStream;
::google::protobuf::io::CopyingOutputStreamAdaptor *outStream;
public:
PbRpcChannel(QHostAddress ip, quint16 port);
~PbRpcChannel();

View File

@ -36,6 +36,4 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#define PB_MSG_TYPE_RESPONSE 2
#define PB_MSG_TYPE_BINBLOB 3
#define MSGBUF_SIZE 4096
#endif

View File

@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
//#include "pbhelper.h"
#include "rpcserver.h"
#include "pbqtio.h"
#include <qendian.h>
@ -29,6 +30,9 @@ RpcServer::RpcServer()
service = NULL;
inStream = NULL;
outStream = NULL;
isPending = false;
pendingMethodId = -1; // don't care as long as isPending is false
}
@ -71,7 +75,7 @@ void RpcServer::done(PbRpcController *controller)
{
google::protobuf::Message *response = controller->response();
QIODevice *blob;
char msgBuf[MSGBUF_SIZE];
char msgBuf[PB_HDR_SIZE];
char* const msg = &msgBuf[0];
int len;
@ -116,8 +120,6 @@ void RpcServer::done(PbRpcController *controller)
goto _exit;
}
response->SerializeToArray((void*)(msg+PB_HDR_SIZE), sizeof(msgBuf)-PB_HDR_SIZE);
len = response->ByteSize();
*((quint16*)(msg+0)) = qToBigEndian(quint16(PB_MSG_TYPE_RESPONSE)); // type
@ -132,7 +134,9 @@ void RpcServer::done(PbRpcController *controller)
//BUFDUMP(msg, len + 8);
}
clientSock->write(msg, PB_HDR_SIZE + len);
clientSock->write(msg, PB_HDR_SIZE);
response->SerializeToZeroCopyStream(outStream);
outStream->Flush();
_exit:
delete controller;
@ -159,6 +163,12 @@ void RpcServer::when_newConnection()
qDebug("accepting new connection from %s: %d",
clientSock->peerAddress().toString().toAscii().constData(),
clientSock->peerPort());
inStream = new google::protobuf::io::CopyingInputStreamAdaptor(
new PbQtInputStream(clientSock));
inStream->SetOwnsCopyingStream(true);
outStream = new google::protobuf::io::CopyingOutputStreamAdaptor(
new PbQtOutputStream(clientSock));
outStream->SetOwnsCopyingStream(true);
connect(clientSock, SIGNAL(readyRead()),
this, SLOT(when_dataAvail()));
@ -177,6 +187,9 @@ void RpcServer::when_disconnected()
clientSock->peerAddress().toString().toAscii().constData(),
clientSock->peerPort());
delete inStream;
delete outStream;
clientSock->deleteLater();
clientSock = NULL;
}
@ -189,7 +202,7 @@ void RpcServer::when_error(QAbstractSocket::SocketError socketError)
void RpcServer::when_dataAvail()
{
uchar msg[MSGBUF_SIZE];
uchar msg[PB_HDR_SIZE];
int msgLen;
static bool parsing = false;
static quint16 type, method;
@ -215,12 +228,6 @@ void RpcServer::when_dataAvail()
parsing = true;
}
if (clientSock->bytesAvailable() < len)
return;
msgLen = clientSock->read((char*)msg, sizeof(msg));
Q_ASSERT((unsigned) msgLen == len);
if (type != PB_MSG_TYPE_REQUEST)
{
qDebug("server(%s): unexpected msg type %d (expected %d)", __FUNCTION__,
@ -247,7 +254,9 @@ void RpcServer::when_dataAvail()
req = service->GetRequestPrototype(methodDesc).New();
resp = service->GetResponsePrototype(methodDesc).New();
req->ParseFromArray((void*)msg, len);
if (len)
req->ParseFromBoundedZeroCopyStream(inStream, len);
if (!req->IsInitialized())
{
qWarning("Missing required fields in request");
@ -256,7 +265,7 @@ void RpcServer::when_dataAvail()
delete req;
delete resp;
goto _error_exit;
goto _error_exit2;
}
//qDebug("Server(%s): successfully parsed as <%s>", __FUNCTION__,
//resp->DebugString().c_str());
@ -273,6 +282,8 @@ void RpcServer::when_dataAvail()
return;
_error_exit:
inStream->Skip(len);
_error_exit2:
parsing = false;
qDebug("server(%s): discarding msg from client", __FUNCTION__);
return;

View File

@ -23,6 +23,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include <google/protobuf/message.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/service.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <QTcpServer>
#include <QTcpSocket>
@ -39,6 +40,8 @@ class RpcServer : public QObject
QTcpSocket *clientSock;
::google::protobuf::Service *service;
::google::protobuf::io::CopyingInputStreamAdaptor *inStream;
::google::protobuf::io::CopyingOutputStreamAdaptor *outStream;
bool isPending;
int pendingMethodId;