Infra: On a port modify, all clients are sent notifications of the same - they can request for port config again to get latest config

Updates issue 144
This commit is contained in:
Srivats P. 2015-04-24 20:53:00 +05:30
parent fd22d49ead
commit ad6baea4af
16 changed files with 196 additions and 32 deletions

View File

@ -71,6 +71,11 @@ PortGroup::PortGroup(QHostAddress ip, quint16 port)
connect(rpcChannel, SIGNAL(error(QAbstractSocket::SocketError)),
this, SLOT(on_rpcChannel_error(QAbstractSocket::SocketError)));
connect(rpcChannel,
SIGNAL(notification(int, ::google::protobuf::Message*)),
this,
SLOT(on_rpcChannel_notification(int, ::google::protobuf::Message*)));
connect(this, SIGNAL(portListChanged(quint32)),
this, SLOT(when_portListChanged(quint32)), Qt::QueuedConnection);
}
@ -209,6 +214,50 @@ void PortGroup::on_rpcChannel_error(QAbstractSocket::SocketError socketError)
}
}
void PortGroup::on_rpcChannel_notification(int notifType,
::google::protobuf::Message *notification)
{
OstProto::Notification *notif =
dynamic_cast<OstProto::Notification*>(notification);
if (!notif) {
qWarning("unable to dynamic cast notif");
return;
}
if (notifType != notif->notif_type()) {
qWarning("notif type mismatch %d/%d msg = %s",
notifType, notif->notif_type(),
notification->DebugString().c_str());
return;
}
switch (notifType)
{
case OstProto::portConfigChanged: {
if (!notif->port_id_list().port_id_size()) {
qWarning("notif(portConfigChanged) has an empty port_id_list");
return;
}
OstProto::PortIdList *portIdList = new OstProto::PortIdList;
OstProto::PortConfigList *portConfigList =
new OstProto::PortConfigList;
PbRpcController *controller = new PbRpcController(portIdList,
portConfigList);
portIdList->CopyFrom(notif->port_id_list());
serviceStub->getPortConfig(controller, portIdList, portConfigList,
NewCallback(this, &PortGroup::processUpdatedPortConfig,
controller));
break;
}
default:
break;
}
}
void PortGroup::when_portListChanged(quint32 /*portGroupId*/)
{
if (state() == QAbstractSocket::ConnectedState && numPorts() <= 0)
@ -422,24 +471,10 @@ void PortGroup::processModifyPortAck(PbRpcController *controller)
{
qDebug("%s: rpc failed(%s)", __FUNCTION__,
qPrintable(controller->ErrorString()));
goto _exit;
}
{
OstProto::PortIdList *portIdList = new OstProto::PortIdList;
OstProto::PortConfigList *portConfigList = new OstProto::PortConfigList;
PbRpcController *controller2 = new PbRpcController(portIdList,
portConfigList);
OstProto::PortId *portId = portIdList->add_port_id();
portId->CopyFrom(static_cast<OstProto::PortConfigList*>
(controller->request())->mutable_port(0)->port_id());
serviceStub->getPortConfig(controller, portIdList, portConfigList,
NewCallback(this, &PortGroup::processUpdatedPortConfig,
controller2));
}
_exit:
mainWindow->setEnabled(true);
QApplication::restoreOverrideCursor();
delete controller;
}
@ -457,9 +492,6 @@ void PortGroup::processUpdatedPortConfig(PbRpcController *controller)
goto _exit;
}
if (portConfigList->port_size() != 1)
qDebug("port size = %d (expected = 1)", portConfigList->port_size());
for(int i = 0; i < portConfigList->port_size(); i++)
{
uint id;
@ -473,8 +505,6 @@ void PortGroup::processUpdatedPortConfig(PbRpcController *controller)
_exit:
mainWindow->setEnabled(true);
QApplication::restoreOverrideCursor();
delete controller;
}

View File

@ -147,6 +147,9 @@ private slots:
void on_rpcChannel_disconnected();
void on_rpcChannel_error(QAbstractSocket::SocketError socketError);
void on_rpcChannel_notification(int notifType,
::google::protobuf::Message *notification);
void when_portListChanged(quint32 portGroupId);
public slots:

View File

@ -1,5 +1,5 @@
/*
Copyright (C) 2010 Srivats P.
Copyright (C) 2010-2015 Srivats P.
This file is part of "Ostinato"
@ -240,6 +240,15 @@ message PortStatsList {
repeated PortStats port_stats = 1;
}
enum NotifType {
portConfigChanged = 1;
}
message Notification {
required NotifType notif_type = 1;
optional PortIdList port_id_list = 6;
}
service OstService {
rpc getPortIdList(Void) returns (PortIdList);
rpc getPortConfig(PortIdList) returns (PortConfigList);

View File

@ -20,6 +20,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "pbrpcchannel.h"
#include "pbqtio.h"
#include "../common/protocol.pb.h" // FIXME: temp
#include <qendian.h>
static uchar msgBuf[4096];
@ -169,6 +171,17 @@ void PbRpcChannel::on_mpSocket_readyRead()
static quint16 type, method;
static quint32 len;
/*
* FIXME(HI): This function has some serious bugs!
* # It should not read both directly from the socket and via instream;
* reading via instream will consume more than the msg being parsed,
* so next time we read from socket - we are in trouble!
* # It should ensure that it has read all the available data before
* it returns, otherwise the readyRead singal will not be raised
* again - so we'll never read another message even if they are all
* sitting in the buffer waiting for us!
*/
//qDebug("%s: bytesAvail = %d", __FUNCTION__, mpSocket->bytesAvailable());
if (!parsing)
@ -317,6 +330,47 @@ void PbRpcChannel::on_mpSocket_readyRead()
break;
}
case PB_MSG_TYPE_NOTIFY:
{
//qDebug("client(%s) rcvd %d bytes", __FUNCTION__, msgLen);
//BUFDUMP(msg, msgLen);
#if 0
notif = serviceStub->GetNotificationPrototype(method).New();
#else
notif = new OstProto::Notification;
#endif
if (!notif)
{
qWarning("invalid notif type %d", method);
goto _error_exit;
}
if (len)
notif->ParseFromBoundedZeroCopyStream(inStream, len);
qDebug("client(%s): Received Notif Msg <---- ", __FUNCTION__);
qDebug("type = %d\nnotif = \n%s\n---->",
method, notif->DebugString().c_str());
if (!notif->IsInitialized())
{
qWarning("RpcChannel: missing required fields in notify <----");
qDebug("notify = \n%s", notif->DebugString().c_str());
qDebug("error = \n%s\n--->",
notif->InitializationErrorString().c_str());
}
else
emit notification(method, notif);
delete notif;
notif = NULL;
parsing = false;
goto _exit;
break;
}
default:
qFatal("%s: unexpected type %d", __PRETTY_FUNCTION__, type);
goto _error_exit;
@ -342,6 +396,7 @@ void PbRpcChannel::on_mpSocket_readyRead()
call.done);
}
_exit:
return;
_error_exit:
@ -349,8 +404,7 @@ _error_exit:
_error_exit2:
parsing = false;
qDebug("client(%s) discarding received msg <----", __FUNCTION__);
qDebug("method = %d\nreq = \n%s\n---->",
method, response->DebugString().c_str());
qDebug("method = %d\n---->", method);
return;
}

View File

@ -61,6 +61,8 @@ class PbRpcChannel : public QObject, public ::google::protobuf::RpcChannel
} RpcCall;
QList<RpcCall> pendingCallList;
::google::protobuf::Message *notif;
QHostAddress mServerAddress;
quint16 mServerPort;
QTcpSocket *mpSocket;
@ -94,6 +96,8 @@ signals:
void error(QAbstractSocket::SocketError socketError);
void stateChanged(QAbstractSocket::SocketState socketState);
void notification(int notifType, ::google::protobuf::Message *notifData);
private slots:
void on_mpSocket_connected();
void on_mpSocket_disconnected();

View File

@ -27,14 +27,15 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
/*
** RPC Header (8)
** - MSG_TYPE (2)
** - METHOD_ID (2)
** - METHOD_ID/NOTIF_TYPE (2)
** - LEN (4) [not including this header]
*/
#define PB_HDR_SIZE 8
#define PB_MSG_TYPE_REQUEST 1
#define PB_MSG_TYPE_RESPONSE 2
#define PB_MSG_TYPE_RESPONSE 2
#define PB_MSG_TYPE_BINBLOB 3
#define PB_MSG_TYPE_ERROR 4
#define PB_MSG_TYPE_NOTIFY 5
#endif

View File

@ -194,6 +194,38 @@ _exit:
isPending = false;
}
void RpcConnection::sendNotification(int notifType,
::google::protobuf::Message *notifData)
{
char msgBuf[PB_HDR_SIZE];
char* const msg = &msgBuf[0];
int len;
if (!notifData->IsInitialized())
{
qWarning("notification missing required fields!! <----");
qDebug("notif = \n%s"
"missing = \n%s---->",
notifData->DebugString().c_str(),
notifData->InitializationErrorString().c_str());
qFatal("exiting");
return;
}
len = notifData->ByteSize();
writeHeader(msg, PB_MSG_TYPE_NOTIFY, notifType, len);
qDebug("Server(%s): sending %d bytes to client <----",
__FUNCTION__, len + PB_HDR_SIZE);
BUFDUMP(msg, 8);
qDebug("notif = %d\ndata = \n%s---->",
notifType, notifData->DebugString().c_str());
clientSock->write(msg, PB_HDR_SIZE);
notifData->SerializeToZeroCopyStream(outStream);
outStream->Flush();
}
void RpcConnection::on_clientSock_disconnected()
{
qDebug("connection closed from %s: %d",

View File

@ -32,6 +32,7 @@ namespace google {
class CopyingInputStreamAdaptor;
class CopyingOutputStreamAdaptor;
}
class Message;
}
}
@ -42,6 +43,7 @@ class RpcConnection : public QObject
public:
RpcConnection(int socketDescriptor, ::google::protobuf::Service *service);
virtual ~RpcConnection();
static void connIdMsgHandler(QtMsgType type, const char* msg);
private:
@ -52,6 +54,9 @@ private:
signals:
void closed();
public slots:
void sendNotification(int notifType, ::google::protobuf::Message *notifData);
private slots:
void start();
void on_clientSock_dataAvail();

View File

@ -76,5 +76,8 @@ void RpcServer::incomingConnection(int socketDescriptor)
// setup thread to "self-destruct" when it is done
connect(thread, SIGNAL(finished()), thread, SLOT(deleteLater()));
connect(this, SIGNAL(notifyClients(int, ::google::protobuf::Message*)),
conn, SLOT(sendNotification(int, ::google::protobuf::Message*)));
thread->start();
}

View File

@ -26,6 +26,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
namespace google {
namespace protobuf {
class Service;
class Message;
}
}
@ -40,6 +41,9 @@ public:
bool registerService(::google::protobuf::Service *service,
quint16 tcpPortNum);
signals:
void notifyClients(int notifType, ::google::protobuf::Message *notifData);
protected:
void incomingConnection(int socketDescriptor);

View File

@ -61,7 +61,7 @@ void AbstractPort::init()
bool AbstractPort::modify(const OstProto::Port &port)
{
bool ret = false;
bool ret = true;
//! \todo Use reflection to find out which fields are set
if (port.has_is_exclusive_control())
@ -78,7 +78,6 @@ bool AbstractPort::modify(const OstProto::Port &port)
if (port.has_user_name()) {
data_.set_user_name(port.user_name());
// TODO: notify other users
}
return ret;

View File

@ -49,5 +49,8 @@ bool Drone::init()
return false;
}
connect(service, SIGNAL(notification(int, ::google::protobuf::Message*)),
rpcServer, SIGNAL(notifyClients(int, ::google::protobuf::Message*)));
return true;
}

View File

@ -23,7 +23,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include <QObject>
class RpcServer;
namespace OstProto { class OstService; }
class MyService;
class Drone : public QObject
{
@ -35,6 +35,6 @@ public:
private:
RpcServer *rpcServer;
OstProto::OstService *service;
MyService *service;
};
#endif

View File

@ -30,7 +30,8 @@ win32 {
}
LIBS += -lm
LIBS += -lprotobuf
HEADERS += drone.h
HEADERS += drone.h \
myservice.h
SOURCES += \
drone_main.cpp \
drone.cpp \

View File

@ -110,6 +110,9 @@ void MyService::modifyPort(::google::protobuf::RpcController* /*controller*/,
::OstProto::Ack* /*response*/,
::google::protobuf::Closure* done)
{
// notification needs to be on heap because signal/slot is across threads!
OstProto::Notification *notif = new OstProto::Notification;
qDebug("In %s", __PRETTY_FUNCTION__);
for (int i = 0; i < request->port_size(); i++)
@ -124,11 +127,19 @@ void MyService::modifyPort(::google::protobuf::RpcController* /*controller*/,
portLock[id]->lockForWrite();
portInfo[id]->modify(port);
portLock[id]->unlock();
notif->mutable_port_id_list()->add_port_id()->set_id(id);
}
}
//! \todo (LOW): fill-in response "Ack"????
done->Run();
if (notif->port_id_list().port_id_size()) {
notif->set_notif_type(OstProto::portConfigChanged);
emit notification(notif->notif_type(), notif);
// FIXME: who will free notif!
}
}
void MyService::getStreamIdList(::google::protobuf::RpcController* controller,

View File

@ -23,6 +23,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "../common/protocol.pb.h"
#include <QList>
#include <QObject>
#include <QReadWriteLock>
#define MAX_PKT_HDR_SIZE 1536
@ -30,8 +31,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
class AbstractPort;
class MyService: public OstProto::OstService
class MyService: public QObject, public OstProto::OstService
{
Q_OBJECT
public:
MyService();
virtual ~MyService();
@ -102,6 +104,9 @@ public:
::OstProto::VersionCompatibility* response,
::google::protobuf::Closure* done);
signals:
void notification(int notifType, ::google::protobuf::Message *notifData);
private:
/*
* NOTES: