diff --git a/client/portgroup.cpp b/client/portgroup.cpp index 8e20735..aaba1e4 100644 --- a/client/portgroup.cpp +++ b/client/portgroup.cpp @@ -71,6 +71,11 @@ PortGroup::PortGroup(QHostAddress ip, quint16 port) connect(rpcChannel, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(on_rpcChannel_error(QAbstractSocket::SocketError))); + connect(rpcChannel, + SIGNAL(notification(int, ::google::protobuf::Message*)), + this, + SLOT(on_rpcChannel_notification(int, ::google::protobuf::Message*))); + connect(this, SIGNAL(portListChanged(quint32)), this, SLOT(when_portListChanged(quint32)), Qt::QueuedConnection); } @@ -209,6 +214,50 @@ void PortGroup::on_rpcChannel_error(QAbstractSocket::SocketError socketError) } } +void PortGroup::on_rpcChannel_notification(int notifType, + ::google::protobuf::Message *notification) +{ + OstProto::Notification *notif = + dynamic_cast(notification); + + if (!notif) { + qWarning("unable to dynamic cast notif"); + return; + } + + if (notifType != notif->notif_type()) { + qWarning("notif type mismatch %d/%d msg = %s", + notifType, notif->notif_type(), + notification->DebugString().c_str()); + return; + } + + switch (notifType) + { + case OstProto::portConfigChanged: { + + if (!notif->port_id_list().port_id_size()) { + qWarning("notif(portConfigChanged) has an empty port_id_list"); + return; + } + + OstProto::PortIdList *portIdList = new OstProto::PortIdList; + OstProto::PortConfigList *portConfigList = + new OstProto::PortConfigList; + PbRpcController *controller = new PbRpcController(portIdList, + portConfigList); + + portIdList->CopyFrom(notif->port_id_list()); + serviceStub->getPortConfig(controller, portIdList, portConfigList, + NewCallback(this, &PortGroup::processUpdatedPortConfig, + controller)); + break; + } + default: + break; + } +} + void PortGroup::when_portListChanged(quint32 /*portGroupId*/) { if (state() == QAbstractSocket::ConnectedState && numPorts() <= 0) @@ -422,24 +471,10 @@ void PortGroup::processModifyPortAck(PbRpcController *controller) { qDebug("%s: rpc failed(%s)", __FUNCTION__, qPrintable(controller->ErrorString())); - goto _exit; } - { - OstProto::PortIdList *portIdList = new OstProto::PortIdList; - OstProto::PortConfigList *portConfigList = new OstProto::PortConfigList; - PbRpcController *controller2 = new PbRpcController(portIdList, - portConfigList); - - OstProto::PortId *portId = portIdList->add_port_id(); - portId->CopyFrom(static_cast - (controller->request())->mutable_port(0)->port_id()); - - serviceStub->getPortConfig(controller, portIdList, portConfigList, - NewCallback(this, &PortGroup::processUpdatedPortConfig, - controller2)); - } -_exit: + mainWindow->setEnabled(true); + QApplication::restoreOverrideCursor(); delete controller; } @@ -457,9 +492,6 @@ void PortGroup::processUpdatedPortConfig(PbRpcController *controller) goto _exit; } - if (portConfigList->port_size() != 1) - qDebug("port size = %d (expected = 1)", portConfigList->port_size()); - for(int i = 0; i < portConfigList->port_size(); i++) { uint id; @@ -473,8 +505,6 @@ void PortGroup::processUpdatedPortConfig(PbRpcController *controller) _exit: - mainWindow->setEnabled(true); - QApplication::restoreOverrideCursor(); delete controller; } diff --git a/client/portgroup.h b/client/portgroup.h index ccb8db8..820be79 100644 --- a/client/portgroup.h +++ b/client/portgroup.h @@ -147,6 +147,9 @@ private slots: void on_rpcChannel_disconnected(); void on_rpcChannel_error(QAbstractSocket::SocketError socketError); + void on_rpcChannel_notification(int notifType, + ::google::protobuf::Message *notification); + void when_portListChanged(quint32 portGroupId); public slots: diff --git a/common/protocol.proto b/common/protocol.proto index bce2b6a..09400b6 100644 --- a/common/protocol.proto +++ b/common/protocol.proto @@ -1,5 +1,5 @@ /* -Copyright (C) 2010 Srivats P. +Copyright (C) 2010-2015 Srivats P. This file is part of "Ostinato" @@ -240,6 +240,15 @@ message PortStatsList { repeated PortStats port_stats = 1; } +enum NotifType { + portConfigChanged = 1; +} + +message Notification { + required NotifType notif_type = 1; + optional PortIdList port_id_list = 6; +} + service OstService { rpc getPortIdList(Void) returns (PortIdList); rpc getPortConfig(PortIdList) returns (PortConfigList); diff --git a/rpc/pbrpcchannel.cpp b/rpc/pbrpcchannel.cpp index 516a023..8180a13 100644 --- a/rpc/pbrpcchannel.cpp +++ b/rpc/pbrpcchannel.cpp @@ -20,6 +20,8 @@ along with this program. If not, see #include "pbrpcchannel.h" #include "pbqtio.h" +#include "../common/protocol.pb.h" // FIXME: temp + #include static uchar msgBuf[4096]; @@ -169,6 +171,17 @@ void PbRpcChannel::on_mpSocket_readyRead() static quint16 type, method; static quint32 len; + /* + * FIXME(HI): This function has some serious bugs! + * # It should not read both directly from the socket and via instream; + * reading via instream will consume more than the msg being parsed, + * so next time we read from socket - we are in trouble! + * # It should ensure that it has read all the available data before + * it returns, otherwise the readyRead singal will not be raised + * again - so we'll never read another message even if they are all + * sitting in the buffer waiting for us! + */ + //qDebug("%s: bytesAvail = %d", __FUNCTION__, mpSocket->bytesAvailable()); if (!parsing) @@ -317,6 +330,47 @@ void PbRpcChannel::on_mpSocket_readyRead() break; } + case PB_MSG_TYPE_NOTIFY: + { + //qDebug("client(%s) rcvd %d bytes", __FUNCTION__, msgLen); + //BUFDUMP(msg, msgLen); +#if 0 + notif = serviceStub->GetNotificationPrototype(method).New(); +#else + notif = new OstProto::Notification; +#endif + if (!notif) + { + qWarning("invalid notif type %d", method); + goto _error_exit; + } + + if (len) + notif->ParseFromBoundedZeroCopyStream(inStream, len); + + qDebug("client(%s): Received Notif Msg <---- ", __FUNCTION__); + qDebug("type = %d\nnotif = \n%s\n---->", + method, notif->DebugString().c_str()); + + if (!notif->IsInitialized()) + { + qWarning("RpcChannel: missing required fields in notify <----"); + qDebug("notify = \n%s", notif->DebugString().c_str()); + qDebug("error = \n%s\n--->", + notif->InitializationErrorString().c_str()); + } + else + emit notification(method, notif); + + delete notif; + notif = NULL; + + parsing = false; + goto _exit; + + break; + } + default: qFatal("%s: unexpected type %d", __PRETTY_FUNCTION__, type); goto _error_exit; @@ -342,6 +396,7 @@ void PbRpcChannel::on_mpSocket_readyRead() call.done); } +_exit: return; _error_exit: @@ -349,8 +404,7 @@ _error_exit: _error_exit2: parsing = false; qDebug("client(%s) discarding received msg <----", __FUNCTION__); - qDebug("method = %d\nreq = \n%s\n---->", - method, response->DebugString().c_str()); + qDebug("method = %d\n---->", method); return; } diff --git a/rpc/pbrpcchannel.h b/rpc/pbrpcchannel.h index e3f9096..a4186df 100644 --- a/rpc/pbrpcchannel.h +++ b/rpc/pbrpcchannel.h @@ -61,6 +61,8 @@ class PbRpcChannel : public QObject, public ::google::protobuf::RpcChannel } RpcCall; QList pendingCallList; + ::google::protobuf::Message *notif; + QHostAddress mServerAddress; quint16 mServerPort; QTcpSocket *mpSocket; @@ -94,6 +96,8 @@ signals: void error(QAbstractSocket::SocketError socketError); void stateChanged(QAbstractSocket::SocketState socketState); + void notification(int notifType, ::google::protobuf::Message *notifData); + private slots: void on_mpSocket_connected(); void on_mpSocket_disconnected(); diff --git a/rpc/pbrpccommon.h b/rpc/pbrpccommon.h index 07c8013..fce0ba5 100644 --- a/rpc/pbrpccommon.h +++ b/rpc/pbrpccommon.h @@ -27,14 +27,15 @@ along with this program. If not, see /* ** RPC Header (8) ** - MSG_TYPE (2) -** - METHOD_ID (2) +** - METHOD_ID/NOTIF_TYPE (2) ** - LEN (4) [not including this header] */ #define PB_HDR_SIZE 8 #define PB_MSG_TYPE_REQUEST 1 -#define PB_MSG_TYPE_RESPONSE 2 +#define PB_MSG_TYPE_RESPONSE 2 #define PB_MSG_TYPE_BINBLOB 3 #define PB_MSG_TYPE_ERROR 4 +#define PB_MSG_TYPE_NOTIFY 5 #endif diff --git a/rpc/rpcconn.cpp b/rpc/rpcconn.cpp index f55a224..d1d193d 100644 --- a/rpc/rpcconn.cpp +++ b/rpc/rpcconn.cpp @@ -194,6 +194,38 @@ _exit: isPending = false; } +void RpcConnection::sendNotification(int notifType, + ::google::protobuf::Message *notifData) +{ + char msgBuf[PB_HDR_SIZE]; + char* const msg = &msgBuf[0]; + int len; + + if (!notifData->IsInitialized()) + { + qWarning("notification missing required fields!! <----"); + qDebug("notif = \n%s" + "missing = \n%s---->", + notifData->DebugString().c_str(), + notifData->InitializationErrorString().c_str()); + qFatal("exiting"); + return; + } + + len = notifData->ByteSize(); + writeHeader(msg, PB_MSG_TYPE_NOTIFY, notifType, len); + + qDebug("Server(%s): sending %d bytes to client <----", + __FUNCTION__, len + PB_HDR_SIZE); + BUFDUMP(msg, 8); + qDebug("notif = %d\ndata = \n%s---->", + notifType, notifData->DebugString().c_str()); + + clientSock->write(msg, PB_HDR_SIZE); + notifData->SerializeToZeroCopyStream(outStream); + outStream->Flush(); +} + void RpcConnection::on_clientSock_disconnected() { qDebug("connection closed from %s: %d", diff --git a/rpc/rpcconn.h b/rpc/rpcconn.h index e41d8e2..aebc79f 100644 --- a/rpc/rpcconn.h +++ b/rpc/rpcconn.h @@ -32,6 +32,7 @@ namespace google { class CopyingInputStreamAdaptor; class CopyingOutputStreamAdaptor; } + class Message; } } @@ -42,6 +43,7 @@ class RpcConnection : public QObject public: RpcConnection(int socketDescriptor, ::google::protobuf::Service *service); virtual ~RpcConnection(); + static void connIdMsgHandler(QtMsgType type, const char* msg); private: @@ -52,6 +54,9 @@ private: signals: void closed(); +public slots: + void sendNotification(int notifType, ::google::protobuf::Message *notifData); + private slots: void start(); void on_clientSock_dataAvail(); diff --git a/rpc/rpcserver.cpp b/rpc/rpcserver.cpp index dad4b76..232ea29 100644 --- a/rpc/rpcserver.cpp +++ b/rpc/rpcserver.cpp @@ -76,5 +76,8 @@ void RpcServer::incomingConnection(int socketDescriptor) // setup thread to "self-destruct" when it is done connect(thread, SIGNAL(finished()), thread, SLOT(deleteLater())); + connect(this, SIGNAL(notifyClients(int, ::google::protobuf::Message*)), + conn, SLOT(sendNotification(int, ::google::protobuf::Message*))); + thread->start(); } diff --git a/rpc/rpcserver.h b/rpc/rpcserver.h index 0a4acdd..0fb4fb3 100644 --- a/rpc/rpcserver.h +++ b/rpc/rpcserver.h @@ -26,6 +26,7 @@ along with this program. If not, see namespace google { namespace protobuf { class Service; + class Message; } } @@ -40,6 +41,9 @@ public: bool registerService(::google::protobuf::Service *service, quint16 tcpPortNum); +signals: + void notifyClients(int notifType, ::google::protobuf::Message *notifData); + protected: void incomingConnection(int socketDescriptor); diff --git a/server/abstractport.cpp b/server/abstractport.cpp index cda4a35..21f443e 100644 --- a/server/abstractport.cpp +++ b/server/abstractport.cpp @@ -61,7 +61,7 @@ void AbstractPort::init() bool AbstractPort::modify(const OstProto::Port &port) { - bool ret = false; + bool ret = true; //! \todo Use reflection to find out which fields are set if (port.has_is_exclusive_control()) @@ -78,7 +78,6 @@ bool AbstractPort::modify(const OstProto::Port &port) if (port.has_user_name()) { data_.set_user_name(port.user_name()); - // TODO: notify other users } return ret; diff --git a/server/drone.cpp b/server/drone.cpp index c46f1df..894490f 100644 --- a/server/drone.cpp +++ b/server/drone.cpp @@ -49,5 +49,8 @@ bool Drone::init() return false; } + connect(service, SIGNAL(notification(int, ::google::protobuf::Message*)), + rpcServer, SIGNAL(notifyClients(int, ::google::protobuf::Message*))); + return true; } diff --git a/server/drone.h b/server/drone.h index 9474207..ab05fe1 100644 --- a/server/drone.h +++ b/server/drone.h @@ -23,7 +23,7 @@ along with this program. If not, see #include class RpcServer; -namespace OstProto { class OstService; } +class MyService; class Drone : public QObject { @@ -35,6 +35,6 @@ public: private: RpcServer *rpcServer; - OstProto::OstService *service; + MyService *service; }; #endif diff --git a/server/drone.pro b/server/drone.pro index d22f138..5022a5b 100644 --- a/server/drone.pro +++ b/server/drone.pro @@ -30,7 +30,8 @@ win32 { } LIBS += -lm LIBS += -lprotobuf -HEADERS += drone.h +HEADERS += drone.h \ + myservice.h SOURCES += \ drone_main.cpp \ drone.cpp \ diff --git a/server/myservice.cpp b/server/myservice.cpp index 55f3bf3..3be71b3 100644 --- a/server/myservice.cpp +++ b/server/myservice.cpp @@ -110,6 +110,9 @@ void MyService::modifyPort(::google::protobuf::RpcController* /*controller*/, ::OstProto::Ack* /*response*/, ::google::protobuf::Closure* done) { + // notification needs to be on heap because signal/slot is across threads! + OstProto::Notification *notif = new OstProto::Notification; + qDebug("In %s", __PRETTY_FUNCTION__); for (int i = 0; i < request->port_size(); i++) @@ -124,11 +127,19 @@ void MyService::modifyPort(::google::protobuf::RpcController* /*controller*/, portLock[id]->lockForWrite(); portInfo[id]->modify(port); portLock[id]->unlock(); + + notif->mutable_port_id_list()->add_port_id()->set_id(id); } } //! \todo (LOW): fill-in response "Ack"???? done->Run(); + + if (notif->port_id_list().port_id_size()) { + notif->set_notif_type(OstProto::portConfigChanged); + emit notification(notif->notif_type(), notif); + // FIXME: who will free notif! + } } void MyService::getStreamIdList(::google::protobuf::RpcController* controller, diff --git a/server/myservice.h b/server/myservice.h index 15c2f5f..ca950e4 100644 --- a/server/myservice.h +++ b/server/myservice.h @@ -23,6 +23,7 @@ along with this program. If not, see #include "../common/protocol.pb.h" #include +#include #include #define MAX_PKT_HDR_SIZE 1536 @@ -30,8 +31,9 @@ along with this program. If not, see class AbstractPort; -class MyService: public OstProto::OstService +class MyService: public QObject, public OstProto::OstService { + Q_OBJECT public: MyService(); virtual ~MyService(); @@ -102,6 +104,9 @@ public: ::OstProto::VersionCompatibility* response, ::google::protobuf::Closure* done); +signals: + void notification(int notifType, ::google::protobuf::Message *notifData); + private: /* * NOTES: