ostinato/rpc/pbrpcchannel.cpp
Srivats P. 2581562ec5 Fixes
- Queued RPC calls would cause crashes due to invalid pointers to request/response and/or controller; this has been fixed
    - PbRpcController now takes ownership of request and response messages and
      will delete them when it itself is being deleted
    - This design mandates that request and response messages for each RPC call
      have to be allocated on the heap.
    - The convention for the Closure 'done' call now is to allocate and pass a
      pointer to the controller object to it which will delete it after use;
      this requires that controller itself be also allocated on the heap
      (NOTE: this is just a convention - not mandatory)
    - All existing RPC calls (in portgroup.cpp) have been changed to follow the
      above convention
- Reordering of queued RPC calls has been fixed
- PortManager is now destroyed at exit; because of this fix the per port temporary capture files are auto-removed at exit
- WinPcapPort destructor no longer deletes the monitor threads because the parent class PcapPort already does it
- Capture does not automatically (and incorrectly) stop after one packet if started immediately after a View Capture operation
- User is prompted to stop transmit on a port first if he tries to apply configuration changes on a port in 'transmit' state
2010-02-17 15:26:42 +00:00

315 lines
8.1 KiB
C++

#include "pbrpcchannel.h"
PbRpcChannel::PbRpcChannel(QHostAddress ip, quint16 port)
{
isPending = false;
pendingMethodId = -1; // don't care as long as isPending is false
controller = NULL;
done = NULL;
response = NULL;
mServerAddress = ip;
mServerPort = port;
mpSocket = new QTcpSocket(this);
// FIXME: Not quite sure why this ain't working!
// QMetaObject::connectSlotsByName(this);
connect(mpSocket, SIGNAL(connected()),
this, SLOT(on_mpSocket_connected()));
connect(mpSocket, SIGNAL(disconnected()),
this, SLOT(on_mpSocket_disconnected()));
connect(mpSocket, SIGNAL(stateChanged(QAbstractSocket::SocketState)),
this, SLOT(on_mpSocket_stateChanged(QAbstractSocket::SocketState)));
connect(mpSocket, SIGNAL(error(QAbstractSocket::SocketError)),
this, SLOT(on_mpSocket_error(QAbstractSocket::SocketError)));
connect(mpSocket, SIGNAL(readyRead()),
this, SLOT(on_mpSocket_readyRead()));
}
PbRpcChannel::~PbRpcChannel()
{
delete mpSocket;
}
void PbRpcChannel::establish()
{
qDebug("In %s", __FUNCTION__);
mpSocket->connectToHost(mServerAddress, mServerPort);
}
void PbRpcChannel::establish(QHostAddress ip, quint16 port)
{
mServerAddress = ip;
mServerPort = port;
establish();
}
void PbRpcChannel::tearDown()
{
qDebug("In %s", __FUNCTION__);
mpSocket->disconnectFromHost();
}
void PbRpcChannel::CallMethod(
const ::google::protobuf::MethodDescriptor *method,
::google::protobuf::RpcController *controller,
const ::google::protobuf::Message *req,
::google::protobuf::Message *response,
::google::protobuf::Closure* done)
{
char msg[MSGBUF_SIZE];
int len;
bool ret;
if (isPending)
{
RpcCall call;
qDebug("RpcChannel: queueing method %d since %d is pending; "
"queued message = <%s>",
method->index(), pendingMethodId, req->DebugString().c_str());
call.method = method;
call.controller = controller;
call.request = req;
call.response = response;
call.done = done;
pendingCallList.append(call);
qDebug("pendingCallList size = %d", pendingCallList.size());
Q_ASSERT(pendingCallList.size() < 100);
return;
}
if (!req->IsInitialized())
{
qWarning("RpcChannel: missing required fields in request");
qDebug(req->InitializationErrorString().c_str());
qFatal("exiting");
controller->SetFailed("Required fields missing");
done->Run();
return;
}
pendingMethodId = method->index();
this->controller=controller;
this->done=done;
this->response=response;
isPending = true;
ret = req->SerializeToArray((void*) (&msg[PB_HDR_SIZE]), sizeof(msg));
Q_ASSERT(ret == true);
len = req->ByteSize();
*((quint16*)(&msg[0])) = HTONS(PB_MSG_TYPE_REQUEST); // type
*((quint16*)(&msg[2])) = HTONS(method->index()); // method id
*((quint32*)(&msg[4])) = HTONL(len); // len
// Avoid printing stats since it happens every couple of seconds
if (pendingMethodId != 13)
{
qDebug("client(%s) sending %d bytes encoding <%s>", __FUNCTION__,
PB_HDR_SIZE + len, req->DebugString().c_str());
BUFDUMP(msg, PB_HDR_SIZE + len);
}
mpSocket->write(msg, PB_HDR_SIZE + len);
}
void PbRpcChannel::on_mpSocket_readyRead()
{
char msg[MSGBUF_SIZE];
char *p = (char*)&msg;
int msgLen;
static bool parsing = false;
static quint16 type, method;
static quint32 len;
//qDebug("%s: bytesAvail = %d", __FUNCTION__, mpSocket->bytesAvailable());
if (!parsing)
{
// Do we have an entire header? If not, we'll wait ...
if (mpSocket->bytesAvailable() < PB_HDR_SIZE)
{
qDebug("client: not enough data available for a complete header");
return;
}
msgLen = mpSocket->read(msg, PB_HDR_SIZE);
Q_ASSERT(msgLen == PB_HDR_SIZE);
type = NTOHS(GET16(p+0));
method = NTOHS(GET16(p+2));
len = NTOHL(GET32(p+4));
//BUFDUMP(msg, PB_HDR_SIZE);
//qDebug("type = %hu, method = %hu, len = %u", type, method, len);
parsing = true;
}
switch (type)
{
case PB_MSG_TYPE_BINBLOB:
{
static quint32 cumLen = 0;
QIODevice *blob;
blob = static_cast<PbRpcController*>(controller)->binaryBlob();
Q_ASSERT(blob != NULL);
while ((cumLen < len) && mpSocket->bytesAvailable())
{
int l;
l = mpSocket->read(msg, sizeof(msg));
blob->write(msg, l);
cumLen += l;
}
qDebug("%s: bin blob rcvd %d/%d", __PRETTY_FUNCTION__, cumLen, len);
if (cumLen < len)
return;
cumLen = 0;
if (!isPending)
{
qDebug("not waiting for response");
goto _error_exit;
}
if (pendingMethodId != method)
{
qDebug("invalid method id %d (expected = %d)", method,
pendingMethodId);
goto _error_exit;
}
break;
}
case PB_MSG_TYPE_RESPONSE:
// Wait till we have the entire message
if (mpSocket->bytesAvailable() < len)
{
qDebug("client: not enough data available for a complete msg");
return;
}
msgLen = mpSocket->read(msg, sizeof(msg));
Q_ASSERT((unsigned) msgLen == len);
//qDebug("client(%s) rcvd %d bytes", __FUNCTION__, msgLen);
//BUFDUMP(msg, msgLen);
if (!isPending)
{
qDebug("not waiting for response");
goto _error_exit;
}
if (pendingMethodId != method)
{
qDebug("invalid method id %d (expected = %d)", method,
pendingMethodId);
goto _error_exit;
}
response->ParseFromArray((void*) msg, len);
// Avoid printing stats
if (method != 13)
{
qDebug("client(%s): Parsed as %s", __FUNCTION__,
response->DebugString().c_str());
}
if (!response->IsInitialized())
{
qWarning("RpcChannel: missing required fields in response");
qDebug(response->InitializationErrorString().c_str());
controller->SetFailed("Required fields missing");
}
break;
default:
qFatal("%s: unexpected type %d", __PRETTY_FUNCTION__, type);
goto _error_exit;
}
done->Run();
pendingMethodId = -1;
controller = NULL;
response = NULL;
isPending = false;
parsing = false;
if (pendingCallList.size())
{
RpcCall call = pendingCallList.takeFirst();
qDebug("RpcChannel: executing queued method %d <%s>",
call.method->index(), call.request->DebugString().c_str());
CallMethod(call.method, call.controller, call.request, call.response,
call.done);
}
return;
_error_exit:
parsing = false;
qDebug("client(%s) discarding received msg", __FUNCTION__);
return;
}
void PbRpcChannel::on_mpSocket_stateChanged(
QAbstractSocket::SocketState socketState)
{
qDebug("In %s", __FUNCTION__);
emit stateChanged(socketState);
}
void PbRpcChannel::on_mpSocket_connected()
{
qDebug("In %s", __FUNCTION__);
emit connected();
}
void PbRpcChannel::on_mpSocket_disconnected()
{
qDebug("In %s", __FUNCTION__);
pendingMethodId = -1;
controller = NULL;
response = NULL;
isPending = false;
// \todo convert parsing from static to data member
//parsing = false
pendingCallList.clear();
emit disconnected();
}
void PbRpcChannel::on_mpSocket_error(QAbstractSocket::SocketError socketError)
{
qDebug("In %s", __FUNCTION__);
emit error(socketError);
}