From 84c7fe1e060964cdbd5cecc9675f723c3ed1e5c5 Mon Sep 17 00:00:00 2001 From: "Srivats P." Date: Tue, 3 Nov 2009 14:02:09 +0000 Subject: [PATCH] Features - Added support for retrieving the packet capture buffer from server to client (does not work consistently however - needs investigation) - getCaptureBuffer() Rpc signature changed - RPC: Added support in Rpc Channel (client) to queue calls - RPC: Added support for transferring arbitrary binary data from server to client (used to get packet capture files) - Rpc header changed - length is now 4 bytes instead of 2; there is no rsvd field any longer Fixes - RPC: Fix for the case when a msg is not received all at once over the socket - StreamConfigDialog: fixed display issue in packet view for combo protocols containing meta fields - Fixed issue with Stacked Vlan not retaining data for both CVlan and SVlan - Fixed incorrect payload size issue with increment/decrement frame length modes Refactoring, Cleanup etc. - RPC: Minor code and TODOs cleanup - Server: Minor code and TODOs cleanup - Server: Removed unused file(s): rxtx.cpp, rxtx.h - Server: Replaced direct use of ProtocolList with the ProtocolListIterator - Common: Minor code and TODOs cleanup - StreamBase::frameLen() now returns the length based on the mode/min/max and the passed in streamIndex - AbstractProtocol interface changed for methods - protocolFrameSize(), protocolFrameOffset(), protocolFramePayloadSize() : all of them now take streamIndex as an optional param with 0 as the default value - Protocols implementing the above methods changed accordingly --- client/packetmodel.cpp | 26 +- client/portgroup.cpp | 129 +++++---- client/portgroup.h | 7 +- common/abstractprotocol.cpp | 33 ++- common/abstractprotocol.h | 6 +- common/dot3.cpp | 7 +- common/ip4.cpp | 7 +- common/ip4.proto | 3 +- common/llc.cpp | 1 - common/mac.cpp | 1 - common/ostproto.pro | 3 + common/payload.cpp | 17 +- common/payload.h | 2 +- common/protocol.proto | 14 +- common/protocolmanager.cpp | 12 +- common/sample.cpp | 22 +- common/snap.cpp | 1 - common/streambase.cpp | 35 ++- common/streambase.h | 6 +- common/svlan.cpp | 49 ++++ common/svlan.h | 23 ++ common/svlan.proto | 8 + common/tcp.cpp | 1 - common/udp.cpp | 8 +- common/vlan.cpp | 1 - common/vlan.h | 4 +- common/vlanstack.h | 3 +- common/vlanstack.proto | 2 +- rpc/pbhelper.h | 3 + rpc/pbrpcchannel.cpp | 216 ++++++++++----- rpc/pbrpcchannel.h | 15 +- rpc/pbrpccommon.h | 17 +- rpc/pbrpccontroller.h | 13 +- rpc/rpcserver.cpp | 110 +++++--- rpc/rpcserver.h | 2 +- server/drone.cpp | 79 +----- server/drone.h | 23 +- server/drone_main.cpp | 4 - server/myservice.cpp | 185 +++++++------ server/myservice.h | 9 +- server/pcapextra.cpp | 4 +- server/rxtx.cpp | 514 ------------------------------------ server/rxtx.h | 84 ------ 43 files changed, 664 insertions(+), 1045 deletions(-) create mode 100644 common/svlan.cpp create mode 100644 common/svlan.h create mode 100644 common/svlan.proto delete mode 100644 server/rxtx.cpp delete mode 100644 server/rxtx.h diff --git a/client/packetmodel.cpp b/client/packetmodel.cpp index 908de63..6b7b834 100644 --- a/client/packetmodel.cpp +++ b/client/packetmodel.cpp @@ -132,13 +132,29 @@ _exit: QVariant PacketModel::data(const QModelIndex &index, int role) const { - IndexId id; + IndexId id; + int fieldIdx = 0; if (!index.isValid()) return QVariant(); id.w = index.internalId(); + if (id.ws.type == ITYP_FIELD) + { + const AbstractProtocol *p = mSelectedProtocols.at(id.ws.protocol); + int n = index.row() + 1; + + while (n) + { + if (!(p->fieldFlags(fieldIdx).testFlag( + AbstractProtocol::FieldIsMeta))) + n--; + fieldIdx++; + } + fieldIdx--; + } + // FIXME(HI): Relook at this completely if (role == Qt::UserRole) { @@ -151,7 +167,7 @@ QVariant PacketModel::data(const QModelIndex &index, int role) const case ITYP_FIELD: return mSelectedProtocols.at(id.ws.protocol)->fieldData( - index.row(), AbstractProtocol::FieldFrameValue); + fieldIdx, AbstractProtocol::FieldFrameValue); default: qWarning("%s: Unhandled ItemType", __FUNCTION__); @@ -170,7 +186,7 @@ QVariant PacketModel::data(const QModelIndex &index, int role) const case ITYP_FIELD: return mSelectedProtocols.at(id.ws.protocol)->fieldData( - index.row(), AbstractProtocol::FieldBitSize); + fieldIdx, AbstractProtocol::FieldBitSize); default: qWarning("%s: Unhandled ItemType", __FUNCTION__); @@ -189,9 +205,9 @@ QVariant PacketModel::data(const QModelIndex &index, int role) const .arg(mSelectedProtocols.at(id.ws.protocol)->name()); case ITYP_FIELD: - return mSelectedProtocols.at(id.ws.protocol)->fieldData(index.row(), + return mSelectedProtocols.at(id.ws.protocol)->fieldData(fieldIdx, AbstractProtocol::FieldName).toString() + QString(" : ") + - mSelectedProtocols.at(id.ws.protocol)->fieldData(index.row(), + mSelectedProtocols.at(id.ws.protocol)->fieldData(fieldIdx, AbstractProtocol::FieldTextValue).toString(); default: diff --git a/client/portgroup.cpp b/client/portgroup.cpp index 4614528..76888f3 100644 --- a/client/portgroup.cpp +++ b/client/portgroup.cpp @@ -1,6 +1,8 @@ +#include +#include + #include "portgroup.h" -#include quint32 PortGroup::mPortGroupAllocId = 0; @@ -10,7 +12,15 @@ PortGroup::PortGroup(QHostAddress ip, quint16 port) mPortGroupId = PortGroup::mPortGroupAllocId++; rpcChannel = new PbRpcChannel(ip, port); - rpcController = new PbRpcController(); + + /*! + \todo (HIGH) RPC Controller should be allocated and deleted for each RPC invocation + as implemented currently, if a RPC is invoked before the previous completes, + rpc controller is overwritten due to the Reset() call - maybe we need to pass the + pointer to the controller to the callback function also? + */ + rpcController = new PbRpcController; + rpcControllerStats = new PbRpcController; serviceStub = new OstProto::OstService::Stub(rpcChannel, OstProto::OstService::STUB_OWNS_CHANNEL); @@ -458,21 +468,21 @@ void PortGroup::stopTx(QList *portList) qDebug("In %s", __FUNCTION__); if (state() != QAbstractSocket::ConnectedState) - return; + goto _exit; + + if ((portList == NULL) || (portList->size() == 0)) + goto _exit; ack = new OstProto::Ack; - if (portList == NULL) - goto _exit; - else + + for (int i = 0; i < portList->size(); i++) { - for (int i = 0; i < portList->size(); i++) - { - OstProto::PortId *portId; - portId = portIdList.add_port_id(); - portId->set_id(portList->at(i)); - } + OstProto::PortId *portId; + portId = portIdList.add_port_id(); + portId->set_id(portList->at(i)); } + rpcController->Reset(); serviceStub->stopTx(rpcController, &portIdList, ack, NewCallback(this, &PortGroup::processStopTxAck, ack)); _exit: @@ -489,19 +499,19 @@ void PortGroup::startCapture(QList *portList) if (state() != QAbstractSocket::ConnectedState) return; - ack = new OstProto::Ack; - if (portList == NULL) + if ((portList == NULL) || (portList->size() == 0)) goto _exit; - else + + ack = new OstProto::Ack; + + for (int i = 0; i < portList->size(); i++) { - for (int i = 0; i < portList->size(); i++) - { - OstProto::PortId *portId; - portId = portIdList.add_port_id(); - portId->set_id(portList->at(i)); - } + OstProto::PortId *portId; + portId = portIdList.add_port_id(); + portId->set_id(portList->at(i)); } + rpcController->Reset(); serviceStub->startCapture(rpcController, &portIdList, ack, NewCallback(this, &PortGroup::processStartCaptureAck, ack)); _exit: @@ -518,19 +528,18 @@ void PortGroup::stopCapture(QList *portList) if (state() != QAbstractSocket::ConnectedState) return; - ack = new OstProto::Ack; - if (portList == NULL) + if ((portList == NULL) || (portList->size() == 0)) goto _exit; - else + + ack = new OstProto::Ack; + for (int i = 0; i < portList->size(); i++) { - for (int i = 0; i < portList->size(); i++) - { - OstProto::PortId *portId; - portId = portIdList.add_port_id(); - portId->set_id(portList->at(i)); - } + OstProto::PortId *portId; + portId = portIdList.add_port_id(); + portId->set_id(portList->at(i)); } + rpcController->Reset(); serviceStub->stopCapture(rpcController, &portIdList, ack, NewCallback(this, &PortGroup::processStopCaptureAck, ack)); _exit: @@ -539,29 +548,38 @@ _exit: void PortGroup::viewCapture(QList *portList) { - OstProto::PortIdList portIdList; - OstProto::CaptureBufferList *bufList; + static QTemporaryFile *capFile = NULL; qDebug("In %s", __FUNCTION__); if (state() != QAbstractSocket::ConnectedState) - return; - - bufList = new OstProto::CaptureBufferList; - if (portList == NULL) goto _exit; - else - { - for (int i = 0; i < portList->size(); i++) - { - OstProto::PortId *portId; - portId = portIdList.add_port_id(); - portId->set_id(portList->at(i)); - } - } - serviceStub->getCaptureBuffer(rpcController, &portIdList, bufList, - NewCallback(this, &PortGroup::processViewCaptureAck, bufList)); + if ((portList == NULL) || (portList->size() != 1)) + goto _exit; + + if (capFile) + delete capFile; + + /*! \todo (MED) unable to reuse the same file 'coz capFile->resize(0) is + not working - it fails everytime */ + capFile = new QTemporaryFile(); + capFile->open(); + qDebug("Temp CapFile = %s", capFile->fileName().toAscii().constData()); + + for (int i = 0; i < portList->size(); i++) + { + OstProto::PortId portId; + OstProto::CaptureBuffer *buf; + + portId.set_id(portList->at(i)); + + buf = new OstProto::CaptureBuffer; + rpcController->Reset(); + rpcController->setBinaryBlob(capFile); + serviceStub->getCaptureBuffer(rpcController, &portId, buf, + NewCallback(this, &PortGroup::processViewCaptureAck, buf, (QFile*) capFile)); + } _exit: return; } @@ -594,11 +612,18 @@ void PortGroup::processStopCaptureAck(OstProto::Ack *ack) delete ack; } -void PortGroup::processViewCaptureAck(OstProto::CaptureBufferList *bufList) +void PortGroup::processViewCaptureAck(OstProto::CaptureBuffer *buf, QFile *capFile) { qDebug("In %s", __FUNCTION__); - delete bufList; + capFile->flush(); + capFile->close(); + + if (!QProcess::startDetached("C:/Program Files/Wireshark/wireshark.exe", + QStringList() << capFile->fileName())) + qDebug("Failed starting Wireshark"); + + delete buf; } void PortGroup::getPortStats() @@ -611,7 +636,8 @@ void PortGroup::getPortStats() return; portStatsList = new OstProto::PortStatsList; - serviceStub->getStats(rpcController, &portIdList, portStatsList, + rpcControllerStats->Reset(); + serviceStub->getStats(rpcControllerStats, &portIdList, portStatsList, NewCallback(this, &PortGroup::processPortStatsList, portStatsList)); } @@ -619,7 +645,7 @@ void PortGroup::processPortStatsList(OstProto::PortStatsList *portStatsList) { //qDebug("In %s", __FUNCTION__); - if (rpcController->Failed()) + if (rpcControllerStats->Failed()) { qDebug("%s: rpc failed", __FUNCTION__); goto _error_exit; @@ -664,6 +690,7 @@ void PortGroup::clearPortStats(QList *portList) } } + rpcController->Reset(); serviceStub->clearStats(rpcController, &portIdList, ack, NewCallback(this, &PortGroup::processClearStatsAck, ack)); } diff --git a/client/portgroup.h b/client/portgroup.h index c297674..c74640a 100644 --- a/client/portgroup.h +++ b/client/portgroup.h @@ -17,6 +17,8 @@ LOW #define DEFAULT_SERVER_PORT 7878 +class QFile; + class PortGroup : public QObject { Q_OBJECT @@ -30,7 +32,8 @@ private: quint16 mServerPort; #endif PbRpcChannel *rpcChannel; - ::google::protobuf::RpcController *rpcController; + PbRpcController *rpcController; + PbRpcController *rpcControllerStats; ::OstProto::OstService::Stub *serviceStub; ::OstProto::PortIdList portIdList; @@ -80,7 +83,7 @@ public: void stopCapture(QList *portList = NULL); void processStopCaptureAck(OstProto::Ack *ack); void viewCapture(QList *portList = NULL); - void processViewCaptureAck(OstProto::CaptureBufferList *bufList); + void processViewCaptureAck(OstProto::CaptureBuffer *buf, QFile *capFile); void getPortStats(); void processPortStatsList(OstProto::PortStatsList *portStatsList); diff --git a/common/abstractprotocol.cpp b/common/abstractprotocol.cpp index 24355f2..10678fb 100644 --- a/common/abstractprotocol.cpp +++ b/common/abstractprotocol.cpp @@ -7,7 +7,7 @@ /*! \class AbstractProtocol - // FIXME - update this text + \todo (MED) update AbstractProtocol documentation Bare Minimum set of methods that a subclass needs to reimplement - protoDataCopyInto() [pure virtual] - protoDataCopyFrom() [pure virtual] @@ -58,7 +58,7 @@ quint32 AbstractProtocol::protocolNumber() const /* \fn virtual void protoDataCopyFrom(const OstProto::OstProto::StreamCore &stream) = 0; - FIXME */ + */ /*! Returns the full name of the protocol \n The default implementation returns a null string */ @@ -137,7 +137,18 @@ AbstractProtocol::FieldFlags AbstractProtocol::fieldFlags(int index) const The FieldTextValue attribute may include additional information about the field's value e.g. a checksum field may include "(correct)" or "(incorrect)" alongwith the actual checksum value. \n - The default implementation returns FIXME + + The default implementation returns a empty string for FieldName and + FieldTextValue; empty byte array of size 0 for FieldFrameValue; 0 for + FieldValue; subclasses are expected to return meaning values for all + these attributes. The only exception is the 'FieldBitSize' attribute - + the default implementation takes the (byte) size of FieldFrameValue, + multiplies it with 8 and returns the result - this can be used by + subclasses for fields which are an integral multiple of bytes; for + fields whose size are a non-integral multiple of bytes or smaller than + a byte, subclasses should return the correct value. Also for fields + which represent checksums, subclasses should return a value for + FieldBitSize - even if it is an integral multiple of bytes \note If a subclass uses any of the below functions to derive FieldFrameValue, the subclass should handle and return a value for @@ -202,7 +213,7 @@ quint32 AbstractProtocol::payloadProtocolId(ProtocolIdType type) const return id; } -int AbstractProtocol::protocolFrameSize() const +int AbstractProtocol::protocolFrameSize(int streamIndex) const { if (protoSize < 0) { @@ -211,7 +222,7 @@ int AbstractProtocol::protocolFrameSize() const for (int i = 0; i < fieldCount(); i++) { if (!fieldFlags(i).testFlag(FieldIsMeta)) - bitsize += fieldData(i, FieldBitSize).toUInt(); + bitsize += fieldData(i, FieldBitSize, streamIndex).toUInt(); } protoSize = (bitsize+7)/8; } @@ -220,34 +231,34 @@ int AbstractProtocol::protocolFrameSize() const return protoSize; } -int AbstractProtocol::protocolFrameOffset() const +int AbstractProtocol::protocolFrameOffset(int streamIndex) const { int size = 0; AbstractProtocol *p = prev; while (p) { - size += p->protocolFrameSize(); + size += p->protocolFrameSize(streamIndex); p = p->prev; } if (parent) - size += parent->protocolFrameOffset(); + size += parent->protocolFrameOffset(streamIndex); qDebug("%s: ofs = %d", __FUNCTION__, size); return size; } -int AbstractProtocol::protocolFramePayloadSize() const +int AbstractProtocol::protocolFramePayloadSize(int streamIndex) const { int size = 0; AbstractProtocol *p = next; while (p) { - size += p->protocolFrameSize(); + size += p->protocolFrameSize(streamIndex); p = p->next; } if (parent) - size += parent->protocolFramePayloadSize(); + size += parent->protocolFramePayloadSize(streamIndex); qDebug("%s: payloadSize = %d", __FUNCTION__, size); return size; diff --git a/common/abstractprotocol.h b/common/abstractprotocol.h index 13db703..3e1dc12 100644 --- a/common/abstractprotocol.h +++ b/common/abstractprotocol.h @@ -97,9 +97,9 @@ public: QByteArray protocolFrameValue(int streamIndex = 0, bool forCksum = false) const; - virtual int protocolFrameSize() const; - int protocolFrameOffset() const; - int protocolFramePayloadSize() const; + virtual int protocolFrameSize(int streamIndex = 0) const; + int protocolFrameOffset(int streamIndex = 0) const; + int protocolFramePayloadSize(int streamIndex = 0) const; virtual quint32 protocolFrameCksum(int streamIndex = 0, CksumType cksumType = CksumIp) const; diff --git a/common/dot3.cpp b/common/dot3.cpp index cae0012..0a88d50 100644 --- a/common/dot3.cpp +++ b/common/dot3.cpp @@ -77,7 +77,7 @@ QVariant Dot3Protocol::fieldData(int index, FieldAttrib attrib, quint16 len; //len = mpStream->frameLen() - SZ_FCS; - len = protocolFramePayloadSize(); + len = protocolFramePayloadSize(streamIndex); return len; } case FieldTextValue: @@ -85,7 +85,7 @@ QVariant Dot3Protocol::fieldData(int index, FieldAttrib attrib, quint16 len; //len = mpStream->frameLen() - SZ_FCS; - len = protocolFramePayloadSize(); + len = protocolFramePayloadSize(streamIndex); return QString("%1").arg(len); } case FieldFrameValue: @@ -94,7 +94,7 @@ QVariant Dot3Protocol::fieldData(int index, FieldAttrib attrib, QByteArray fv; //len = mpStream->frameLen() - SZ_FCS; - len = protocolFramePayloadSize(); + len = protocolFramePayloadSize(streamIndex); fv.resize(2); qToBigEndian(len, (uchar*) fv.data()); return fv; @@ -116,7 +116,6 @@ QVariant Dot3Protocol::fieldData(int index, FieldAttrib attrib, bool Dot3Protocol::setFieldData(int index, const QVariant &value, FieldAttrib attrib) { - // FIXME return false; } diff --git a/common/ip4.cpp b/common/ip4.cpp index 4fd594b..c929872 100644 --- a/common/ip4.cpp +++ b/common/ip4.cpp @@ -236,7 +236,7 @@ QVariant Ip4Protocol::fieldData(int index, FieldAttrib attrib, { int totlen; totlen = data.is_override_totlen() ? data.totlen() : - (protocolFramePayloadSize() + 20); + (protocolFramePayloadSize(streamIndex) + 20); return totlen; } case FieldFrameValue: @@ -244,7 +244,7 @@ QVariant Ip4Protocol::fieldData(int index, FieldAttrib attrib, QByteArray fv; int totlen; totlen = data.is_override_totlen() ? data.totlen() : - (protocolFramePayloadSize() + 20); + (protocolFramePayloadSize(streamIndex) + 20); fv.resize(2); qToBigEndian((quint16) totlen, (uchar*) fv.data()); return fv; @@ -253,7 +253,7 @@ QVariant Ip4Protocol::fieldData(int index, FieldAttrib attrib, { int totlen; totlen = data.is_override_totlen() ? data.totlen() : - (protocolFramePayloadSize() + 20); + (protocolFramePayloadSize(streamIndex) + 20); return QString("%1").arg(totlen); } case FieldBitSize: @@ -320,7 +320,6 @@ QVariant Ip4Protocol::fieldData(int index, FieldAttrib attrib, case FieldFrameValue: { QByteArray fv; - // FIXME need to shift for 13 bits fv.resize(2); qToBigEndian((quint16) (data.frag_ofs()), (uchar*) fv.data()); diff --git a/common/ip4.proto b/common/ip4.proto index c17ec75..84fcbf4 100644 --- a/common/ip4.proto +++ b/common/ip4.proto @@ -20,7 +20,6 @@ message Ip4 { optional uint32 tos = 6; optional uint32 totlen = 7; optional uint32 id = 8 [default = 1234]; - // TODO: rename flags to frag_flags optional uint32 flags = 9; optional uint32 frag_ofs = 10; optional uint32 ttl = 11 [default = 127]; @@ -39,7 +38,7 @@ message Ip4 { optional uint32 dst_ip_count = 20 [default = 16]; optional fixed32 dst_ip_mask = 21 [default = 0xFFFFFF00]; - // TODO: Options + //! \todo (LOW) IPv4 Options } extend Protocol { diff --git a/common/llc.cpp b/common/llc.cpp index b352cab..e949e08 100644 --- a/common/llc.cpp +++ b/common/llc.cpp @@ -128,7 +128,6 @@ QVariant LlcProtocol::fieldData(int index, FieldAttrib attrib, bool LlcProtocol::setFieldData(int index, const QVariant &value, FieldAttrib attrib) { - // FIXME return false; } diff --git a/common/mac.cpp b/common/mac.cpp index ae80905..8e468a3 100644 --- a/common/mac.cpp +++ b/common/mac.cpp @@ -236,7 +236,6 @@ QVariant MacProtocol::fieldData(int index, FieldAttrib attrib, bool MacProtocol::setFieldData(int index, const QVariant &value, FieldAttrib attrib) { - // FIXME return false; } diff --git a/common/ostproto.pro b/common/ostproto.pro index 9945212..8ae7f99 100644 --- a/common/ostproto.pro +++ b/common/ostproto.pro @@ -25,6 +25,7 @@ PROTOS += \ dot2llc.proto \ dot2snap.proto \ vlan.proto \ + svlan.proto \ vlanstack.proto \ ip4.proto \ tcp.proto \ @@ -45,6 +46,7 @@ HEADERS += \ dot2llc.h \ dot2snap.h \ vlan.h \ + svlan.h \ vlanstack.h \ ip4.h \ tcp.h \ @@ -62,6 +64,7 @@ SOURCES += \ llc.cpp \ snap.cpp \ vlan.cpp \ + svlan.cpp \ ip4.cpp \ tcp.cpp \ udp.cpp diff --git a/common/payload.cpp b/common/payload.cpp index a83817c..1125b73 100644 --- a/common/payload.cpp +++ b/common/payload.cpp @@ -75,9 +75,16 @@ QString PayloadProtocol::shortName() const return QString("DATA"); } -int PayloadProtocol::protocolFrameSize() const +int PayloadProtocol::protocolFrameSize(int streamIndex) const { - return (mpStream->frameLen() - protocolFrameOffset() - SZ_FCS); + int len; + + len = mpStream->frameLen(streamIndex) - protocolFrameOffset(streamIndex) + - SZ_FCS; + + qDebug("%s: this = %p, streamIndex = %d, len = %d", __FUNCTION__, this, + streamIndex, len); + return len; } int PayloadProtocol::fieldCount() const @@ -125,8 +132,7 @@ QVariant PayloadProtocol::fieldData(int index, FieldAttrib attrib, QByteArray fv; int dataLen; - dataLen = mpStream->frameLen() - protocolFrameOffset(); - dataLen -= SZ_FCS; + dataLen = protocolFrameSize(streamIndex); // FIXME: Hack! Bad! Bad! Very Bad!!! if (dataLen <= 0) @@ -149,7 +155,7 @@ QVariant PayloadProtocol::fieldData(int index, FieldAttrib attrib, fv[i] = 0xFF - (i % (0xFF + 1)); break; case OstProto::Payload::e_dp_random: - //! \todo cksum will be incorrect for random pattern + //! \todo (HIGH) cksum is incorrect for random pattern for (int i = 0; i < dataLen; i++) fv[i] = qrand() % (0xFF + 1); break; @@ -178,7 +184,6 @@ QVariant PayloadProtocol::fieldData(int index, FieldAttrib attrib, bool PayloadProtocol::setFieldData(int index, const QVariant &value, FieldAttrib attrib) { - // FIXME return false; } diff --git a/common/payload.h b/common/payload.h index cb93006..c2d5f4d 100644 --- a/common/payload.h +++ b/common/payload.h @@ -44,7 +44,7 @@ public: virtual QString name() const; virtual QString shortName() const; - virtual int protocolFrameSize() const; + virtual int protocolFrameSize(int streamIndex = 0) const; virtual int fieldCount() const; diff --git a/common/protocol.proto b/common/protocol.proto index 372ad54..1f3c0a5 100644 --- a/common/protocol.proto +++ b/common/protocol.proto @@ -1,7 +1,5 @@ // stream.proto -// FIXME: Re-evaluate Tag Values - package OstProto; message StreamId { @@ -56,9 +54,6 @@ message StreamControl { optional NextWhat next = 6 [default = e_nw_goto_next]; optional uint32 packets_per_sec = 7 [default = 1]; optional uint32 bursts_per_sec = 8 [default = 1]; - - // TODO: Gaps? - } message ProtocolId { @@ -81,11 +76,12 @@ message Protocol { kLlcFieldNumber = 123; kSnapFieldNumber = 124; - kVlanStackFieldNumber = 125; + kSvlanFieldNumber = 125; kVlanFieldNumber = 126; kDot2LlcFieldNumber = 127; kDot2SnapFieldNumber = 128; + kVlanStackFieldNumber = 129; kIp4FieldNumber = 130; kArpFieldNumber = 131; @@ -111,7 +107,7 @@ message Void { } message Ack { - // TODO + //! \todo (LOW) do we need any fields in 'Ack' } message PortId { @@ -146,7 +142,7 @@ message StreamConfigList { } message CaptureBuffer { - // TODO + //! \todo (HIGH) define CaptureBuffer } message CaptureBufferList { @@ -190,7 +186,7 @@ service OstService { rpc startCapture(PortIdList) returns (Ack); rpc stopCapture(PortIdList) returns (Ack); - rpc getCaptureBuffer(PortIdList) returns (CaptureBufferList); + rpc getCaptureBuffer(PortId) returns (CaptureBuffer); rpc getStats(PortIdList) returns (PortStatsList); rpc clearStats(PortIdList) returns (Ack); diff --git a/common/protocolmanager.cpp b/common/protocolmanager.cpp index 9010354..56c1912 100644 --- a/common/protocolmanager.cpp +++ b/common/protocolmanager.cpp @@ -1,12 +1,9 @@ #include "protocolmanager.h" - -// FIXME(HI): remove -#include "protocol.pb.h" #include "abstractprotocol.h" +#include "protocol.pb.h" #include "mac.h" #include "payload.h" - #include "eth2.h" #include "dot3.h" #include "llc.h" @@ -23,6 +20,9 @@ ProtocolManager OstProtocolManager; ProtocolManager::ProtocolManager() { + /*! \todo (LOW) calls to registerProtocol() should be done by the protocols + themselves (once this is done remove the #includes for all the protocols) + */ registerProtocol(OstProto::Protocol::kMacFieldNumber, QString("mac"), (void*) MacProtocol::createInstance); registerProtocol(OstProto::Protocol::kPayloadFieldNumber, @@ -39,6 +39,8 @@ ProtocolManager::ProtocolManager() QString("dot2Llc"), (void*) Dot2LlcProtocol::createInstance); registerProtocol(OstProto::Protocol::kDot2SnapFieldNumber, QString("dot2Snap"), (void*) Dot2SnapProtocol::createInstance); + registerProtocol(OstProto::Protocol::kSvlanFieldNumber, + QString("svlan"), (void*) VlanProtocol::createInstance); registerProtocol(OstProto::Protocol::kVlanFieldNumber, QString("vlan"), (void*) VlanProtocol::createInstance); registerProtocol(OstProto::Protocol::kVlanStackFieldNumber, @@ -54,7 +56,7 @@ ProtocolManager::ProtocolManager() void ProtocolManager::registerProtocol(int protoNumber, QString protoName, void *protoInstanceCreator) { - // TODO: validate incoming params for duplicates with existing + //! \todo (MED) validate incoming params for duplicates with existing nameToNumberMap.insert(protoName, protoNumber); numberToNameMap.insert(protoNumber, protoName); factory.insert(protoNumber, protoInstanceCreator); diff --git a/common/sample.cpp b/common/sample.cpp index 3076e6c..63aaa7e 100644 --- a/common/sample.cpp +++ b/common/sample.cpp @@ -3,6 +3,10 @@ #include "sample.h" +/*! \todo (MED) Complete the "sample" protocol and make it compilable so that + it can be used as an example for new protocols + */ + SampleConfigForm::SampleConfigForm(QWidget *parent) : QWidget(parent) { @@ -90,18 +94,18 @@ QVariant SampleProtocol::fieldData(int index, FieldAttrib attrib, { switch (index) { - case sample_FIXME: + case sample_one: { switch(attrib) { case FieldName: - return QString("FIXME"); + return QString("ONE"); case FieldValue: - return data.FIXME(); + return data.one(); case FieldTextValue: - return QString("%1").arg(data.FIXME()); + return QString("%1").arg(data.one()); case FieldFrameValue: - return QByteArray(1, (char)(data.FIXME() & 0xF0)); + return QByteArray(1, (char)(data.one() & 0xF0)); case FieldBitSize: return 4; default: @@ -110,16 +114,16 @@ QVariant SampleProtocol::fieldData(int index, FieldAttrib attrib, break; } - case sample_FIXME: + case sample_two: { switch(attrib) { case FieldName: - return QString("FIXME"); + return QString("TWO"); case FieldValue: - return FIXME; + return data.two(); case FieldTextValue: - return QString("%1").arg(FIXME); + return QString("%1").arg(data.two()); case FieldFrameValue: { QByteArray fv; diff --git a/common/snap.cpp b/common/snap.cpp index c651bb1..3dfd7e4 100644 --- a/common/snap.cpp +++ b/common/snap.cpp @@ -133,7 +133,6 @@ QVariant SnapProtocol::fieldData(int index, FieldAttrib attrib, bool SnapProtocol::setFieldData(int index, const QVariant &value, FieldAttrib attrib) { - // FIXME return false; } diff --git a/common/streambase.cpp b/common/streambase.cpp index ecfc07c..1ed8d7f 100644 --- a/common/streambase.cpp +++ b/common/streambase.cpp @@ -169,9 +169,40 @@ bool StreamBase::setLenMode(FrameLengthMode lenMode) return true; } -quint16 StreamBase::frameLen() +quint16 StreamBase::frameLen(int streamIndex) { - return mCore->frame_len(); + int pktLen; + + // Decide a frame length based on length mode + switch(lenMode()) + { + case OstProto::StreamCore::e_fl_fixed: + pktLen = mCore->frame_len(); + break; + case OstProto::StreamCore::e_fl_inc: + pktLen = frameLenMin() + (streamIndex % + (frameLenMax() - frameLenMin() + 1)); + break; + case OstProto::StreamCore::e_fl_dec: + pktLen = frameLenMax() - (streamIndex % + (frameLenMax() - frameLenMin() + 1)); + break; + case OstProto::StreamCore::e_fl_random: + //! \todo (MED) This 'random' sequence is same across iterations + qsrand(((uint) this)); + for (int i = 0; i <= streamIndex; i++) + pktLen = qrand(); + pktLen = frameLenMin() + (pktLen % + (frameLenMax() - frameLenMin() + 1)); + break; + default: + qWarning("Unhandled len mode %d. Using default 64", + lenMode()); + pktLen = 64; + break; + } + + return pktLen; } bool StreamBase::setFrameLen(quint16 frameLen) diff --git a/common/streambase.h b/common/streambase.h index 99838d9..878b2d7 100644 --- a/common/streambase.h +++ b/common/streambase.h @@ -17,8 +17,6 @@ private: OstProto::StreamCore *mCore; OstProto::StreamControl *mControl; -protected: - //! \todo TODO: Make ProtocolList a private member of StreamBase? ProtocolList *currentFrameProtocols; public: @@ -30,7 +28,7 @@ public: ProtocolListIterator* createProtocolListIterator(); - // TODO: make a copy constructor + //! \todo (LOW) should we have a copy constructor?? public: enum FrameLengthMode { @@ -81,7 +79,7 @@ public: FrameLengthMode lenMode(); bool setLenMode(FrameLengthMode lenMode); - quint16 frameLen(); + quint16 frameLen(int streamIndex = 0); bool setFrameLen(quint16 frameLen); quint16 frameLenMin(); diff --git a/common/svlan.cpp b/common/svlan.cpp new file mode 100644 index 0000000..55ef836 --- /dev/null +++ b/common/svlan.cpp @@ -0,0 +1,49 @@ +#include + +#include "svlan.h" +#include "svlan.pb.h" + +SVlanProtocol::SVlanProtocol(StreamBase *stream, AbstractProtocol *parent) + : VlanProtocol(stream, parent) +{ + data.set_tpid(0x88a8); + data.set_is_override_tpid(true); +} + +SVlanProtocol::~SVlanProtocol() +{ +} + +AbstractProtocol* SVlanProtocol::createInstance(StreamBase *stream, + AbstractProtocol *parent) +{ + return new SVlanProtocol(stream, parent); +} + +quint32 SVlanProtocol::protocolNumber() const +{ + return OstProto::Protocol::kSvlanFieldNumber; +} + +void SVlanProtocol::protoDataCopyInto(OstProto::Protocol &protocol) const +{ + protocol.MutableExtension(OstProto::svlan)->CopyFrom(data); + protocol.mutable_protocol_id()->set_id(protocolNumber()); +} + +void SVlanProtocol::protoDataCopyFrom(const OstProto::Protocol &protocol) +{ + if (protocol.protocol_id().id() == protocolNumber() && + protocol.HasExtension(OstProto::svlan)) + data.MergeFrom(protocol.GetExtension(OstProto::svlan)); +} + +QString SVlanProtocol::name() const +{ + return QString("SVlan"); +} + +QString SVlanProtocol::shortName() const +{ + return QString("SVlan"); +} diff --git a/common/svlan.h b/common/svlan.h new file mode 100644 index 0000000..0fb4b97 --- /dev/null +++ b/common/svlan.h @@ -0,0 +1,23 @@ +#ifndef _SVLAN_H +#define _SVLAN_H + +#include "vlan.h" + +class SVlanProtocol : public VlanProtocol +{ +public: + SVlanProtocol(StreamBase *stream, AbstractProtocol *parent = 0); + virtual ~SVlanProtocol(); + + static AbstractProtocol* createInstance(StreamBase *stream, + AbstractProtocol *parent = 0); + virtual quint32 protocolNumber() const; + + virtual void protoDataCopyInto(OstProto::Protocol &protocol) const; + virtual void protoDataCopyFrom(const OstProto::Protocol &protocol); + + virtual QString name() const; + virtual QString shortName() const; +}; + +#endif diff --git a/common/svlan.proto b/common/svlan.proto new file mode 100644 index 0000000..72e4557 --- /dev/null +++ b/common/svlan.proto @@ -0,0 +1,8 @@ +import "protocol.proto"; +import "vlan.proto"; + +package OstProto; + +extend Protocol { + optional Vlan svlan = 125; +} diff --git a/common/tcp.cpp b/common/tcp.cpp index dc461b6..be16120 100644 --- a/common/tcp.cpp +++ b/common/tcp.cpp @@ -377,7 +377,6 @@ QVariant TcpProtocol::fieldData(int index, FieldAttrib attrib, bool TcpProtocol::setFieldData(int index, const QVariant &value, FieldAttrib attrib) { - // FIXME return false; } diff --git a/common/udp.cpp b/common/udp.cpp index c722b54..0d5b772 100644 --- a/common/udp.cpp +++ b/common/udp.cpp @@ -159,7 +159,7 @@ QVariant UdpProtocol::fieldData(int index, FieldAttrib attrib, totlen = data.is_override_totlen() ? data.totlen() : - (protocolFramePayloadSize() + 8); + (protocolFramePayloadSize(streamIndex) + 8); return totlen; } case FieldFrameValue: @@ -168,7 +168,7 @@ QVariant UdpProtocol::fieldData(int index, FieldAttrib attrib, int totlen; totlen = data.is_override_totlen() ? data.totlen() : - (protocolFramePayloadSize() + 8); + (protocolFramePayloadSize(streamIndex) + 8); fv.resize(2); qToBigEndian((quint16) totlen, (uchar*) fv.data()); return fv; @@ -178,7 +178,7 @@ QVariant UdpProtocol::fieldData(int index, FieldAttrib attrib, int totlen; totlen = data.is_override_totlen() ? data.totlen() : - (protocolFramePayloadSize() + 8); + (protocolFramePayloadSize(streamIndex) + 8); return QString("%1").arg(totlen); } case FieldBitSize: @@ -254,7 +254,7 @@ QVariant UdpProtocol::fieldData(int index, FieldAttrib attrib, bool UdpProtocol::setFieldData(int index, const QVariant &value, FieldAttrib attrib) { - // FIXME + //! implement UdpProtocol::setFieldData() return false; } diff --git a/common/vlan.cpp b/common/vlan.cpp index 49b9a27..244a473 100644 --- a/common/vlan.cpp +++ b/common/vlan.cpp @@ -196,7 +196,6 @@ QVariant VlanProtocol::fieldData(int index, FieldAttrib attrib, bool VlanProtocol::setFieldData(int index, const QVariant &value, FieldAttrib attrib) { - // FIXME return false; } diff --git a/common/vlan.h b/common/vlan.h index c5655da..5e5d7de 100644 --- a/common/vlan.h +++ b/common/vlan.h @@ -16,7 +16,6 @@ public: class VlanProtocol : public AbstractProtocol { private: - OstProto::Vlan data; VlanConfigForm *configForm; enum Vlanfield { @@ -31,6 +30,9 @@ private: vlan_fieldCount }; +protected: + OstProto::Vlan data; + public: VlanProtocol(StreamBase *stream, AbstractProtocol *parent = 0); virtual ~VlanProtocol(); diff --git a/common/vlanstack.h b/common/vlanstack.h index 5202729..f74aaed 100644 --- a/common/vlanstack.h +++ b/common/vlanstack.h @@ -2,9 +2,10 @@ #define _VLAN_STACK_H #include "comboprotocol.h" +#include "svlan.h" #include "vlan.h" typedef ComboProtocol VlanStackProtocol; + SVlanProtocol, VlanProtocol> VlanStackProtocol; #endif diff --git a/common/vlanstack.proto b/common/vlanstack.proto index 4c3d06d..a54f11f 100644 --- a/common/vlanstack.proto +++ b/common/vlanstack.proto @@ -8,5 +8,5 @@ message VlanStack { } extend Protocol { - optional VlanStack vlanStack = 125; + optional VlanStack vlanStack = 129; } diff --git a/rpc/pbhelper.h b/rpc/pbhelper.h index e3d85b0..7a70c75 100644 --- a/rpc/pbhelper.h +++ b/rpc/pbhelper.h @@ -6,9 +6,11 @@ #include +#if 0 // not reqd. any longer? class PbHelper { public: + // FIXME: Change msg from * to & void ForceSetSingularDefault(::google::protobuf::Message *msg) { @@ -146,3 +148,4 @@ public: } }; #endif +#endif diff --git a/rpc/pbrpcchannel.cpp b/rpc/pbrpcchannel.cpp index ed60d2f..9d5be05 100644 --- a/rpc/pbrpcchannel.cpp +++ b/rpc/pbrpcchannel.cpp @@ -1,7 +1,5 @@ #include "pbrpcchannel.h" -//#include "../common/protocol.pb.h" - PbRpcChannel::PbRpcChannel(QHostAddress ip, quint16 port) { isPending = false; @@ -65,17 +63,36 @@ void PbRpcChannel::CallMethod( ::google::protobuf::Message *response, ::google::protobuf::Closure* done) { - char msg[4096]; // FIXME: hardcoding - char *p = (char *)&msg; + char msg[MSGBUF_SIZE]; int len; + bool ret; - //qDebug("In %s", __FUNCTION__); + if (isPending) + { + RpcCall call; + qDebug("RpcChannel: queueing method %d since %d is pending", + method->index(), pendingMethodId); + + call.method = method; + call.controller = controller; + call.request = req; + call.response = response; + call.done = done; + + pendingCallList.append(call); + + Q_ASSERT(pendingCallList.size() < 100); + + return; + } if (!req->IsInitialized()) { - qDebug("RpcChannel: missing required fields in request"); + qWarning("RpcChannel: missing required fields in request"); qDebug(req->InitializationErrorString().c_str()); + qFatal("exiting"); + controller->SetFailed("Required fields missing"); done->Run(); return; @@ -87,103 +104,172 @@ void PbRpcChannel::CallMethod( this->response=response; isPending = true; - *((quint16*)(p+0)) = HTONS(PB_MSG_TYPE_REQUEST); // type - //qDebug("CLi:GET16 = %d/%d, type = %d", GET16(p+0), NTOHS(GET16(p+0)), - //PB_MSG_TYPE_REQUEST); - *((quint16*)(p+2)) = HTONS(method->index()); // method id - // (p+4) len later after serialization - *((quint16*)(p+6)) = HTONS(0); // rsvd - - // SerialData is at offset 8 - req->SerializeToArray((void*) (p+8), sizeof(msg)); + ret = req->SerializeToArray((void*) (&msg[PB_HDR_SIZE]), sizeof(msg)); + Q_ASSERT(ret == true); len = req->ByteSize(); - *((quint16*)(p+4)) = HTONS(len); // len + *((quint16*)(&msg[0])) = HTONS(PB_MSG_TYPE_REQUEST); // type + *((quint16*)(&msg[2])) = HTONS(method->index()); // method id + *((quint32*)(&msg[4])) = HTONL(len); // len // Avoid printing stats since it happens every couple of seconds if (pendingMethodId != 12) { - qDebug("client(%s) sending %d bytes encoding <%s>", __FUNCTION__, len+8, - req->DebugString().c_str()); - BUFDUMP(msg, len+8); + qDebug("client(%s) sending %d bytes encoding <%s>", __FUNCTION__, + PB_HDR_SIZE + len, req->DebugString().c_str()); + BUFDUMP(msg, PB_HDR_SIZE + len); } - mpSocket->write(msg, len + 8); + mpSocket->write(msg, PB_HDR_SIZE + len); } void PbRpcChannel::on_mpSocket_readyRead() { - char msg[4096]; // FIXME: hardcoding; + char msg[MSGBUF_SIZE]; char *p = (char*)&msg; int msgLen; - quint16 type, method, len, rsvd; - PbRpcController *controller; + static bool parsing = false; + static quint16 type, method; + static quint32 len; - //qDebug("In %s", __FUNCTION__); - - msgLen = mpSocket->read(msg, sizeof(msg)); + //qDebug("%s: bytesAvail = %d", __FUNCTION__, mpSocket->bytesAvailable()); - //qDebug("client(%s) rcvd %d bytes", __FUNCTION__, msgLen); - //BUFDUMP(msg, msgLen); - - type = NTOHS(GET16(p+0)); - method = NTOHS(GET16(p+2)); - len = NTOHS(GET16(p+4)); - rsvd = NTOHS(GET16(p+6)); - - if (!isPending) + if (!parsing) { - qDebug("not waiting for response"); - - goto _error_exit; + // Do we have an entire header? If not, we'll wait ... + if (mpSocket->bytesAvailable() < PB_HDR_SIZE) + { + qDebug("client: not enough data available for a complete header"); + return; + } + + msgLen = mpSocket->read(msg, PB_HDR_SIZE); + + Q_ASSERT(msgLen == PB_HDR_SIZE); + + type = NTOHS(GET16(p+0)); + method = NTOHS(GET16(p+2)); + len = NTOHL(GET32(p+4)); + + //BUFDUMP(msg, PB_HDR_SIZE); + //qDebug("type = %hu, method = %hu, len = %u", type, method, len); + + parsing = true; } - if (type != PB_MSG_TYPE_RESPONSE) + switch (type) { - qDebug("invalid msgType %d (expected = %d)", type, - PB_MSG_TYPE_RESPONSE); - - goto _error_exit; - } + case PB_MSG_TYPE_BINBLOB: + { + static quint32 cumLen = 0; + QIODevice *blob; - if (pendingMethodId != method) - { - qDebug("invalid method id %d (expected = %d)", method, - pendingMethodId); - - goto _error_exit; - } + blob = static_cast(controller)->binaryBlob(); + Q_ASSERT(blob != NULL); + while ((cumLen < len) && mpSocket->bytesAvailable()) + { + int l; - // Serialized data starts from offset 8 - response->ParseFromArray((void*) &msg[8], len); + l = mpSocket->read(msg, sizeof(msg)); + blob->write(msg, l); + cumLen += l; + } - // Avoid printing stats - if (method != 12) - { - qDebug("client(%s): Parsed as %s", __FUNCTION__, - response->DebugString().c_str()); - } + qDebug("%s: bin blob rcvd %d/%d", __PRETTY_FUNCTION__, cumLen, len); - if (!response->IsInitialized()) - { - qDebug("RpcChannel: missing required fields in response"); - qDebug(response->InitializationErrorString().c_str()); + if (cumLen < len) + return; - controller->SetFailed("Required fields missing"); + cumLen = 0; + + if (!isPending) + { + qDebug("not waiting for response"); + goto _error_exit; + } + + if (pendingMethodId != method) + { + qDebug("invalid method id %d (expected = %d)", method, + pendingMethodId); + goto _error_exit; + } + + break; + } + + case PB_MSG_TYPE_RESPONSE: + // Wait till we have the entire message + if (mpSocket->bytesAvailable() < len) + { + qDebug("client: not enough data available for a complete msg"); + return; + } + + msgLen = mpSocket->read(msg, sizeof(msg)); + + Q_ASSERT((unsigned) msgLen == len); + + //qDebug("client(%s) rcvd %d bytes", __FUNCTION__, msgLen); + //BUFDUMP(msg, msgLen); + + if (!isPending) + { + qDebug("not waiting for response"); + goto _error_exit; + } + + if (pendingMethodId != method) + { + qDebug("invalid method id %d (expected = %d)", method, + pendingMethodId); + goto _error_exit; + } + + response->ParseFromArray((void*) msg, len); + + // Avoid printing stats + if (method != 12) + { + qDebug("client(%s): Parsed as %s", __FUNCTION__, + response->DebugString().c_str()); + } + + if (!response->IsInitialized()) + { + qWarning("RpcChannel: missing required fields in response"); + qDebug(response->InitializationErrorString().c_str()); + + controller->SetFailed("Required fields missing"); + } + break; + + default: + qFatal("%s: unexpected type %d", __PRETTY_FUNCTION__, type); + goto _error_exit; + } pendingMethodId = -1; controller = NULL; - //done = NULL; response = NULL; isPending = false; + parsing = false; done->Run(); + if (pendingCallList.size()) + { + RpcCall call = pendingCallList.takeFirst(); + CallMethod(call.method, call.controller, call.request, call.response, + call.done); + } + return; _error_exit: + parsing = false; qDebug("client(%s) discarding received msg", __FUNCTION__); return; } diff --git a/rpc/pbrpcchannel.h b/rpc/pbrpcchannel.h index 657b095..17f287a 100644 --- a/rpc/pbrpcchannel.h +++ b/rpc/pbrpcchannel.h @@ -25,13 +25,22 @@ class PbRpcChannel : public QObject, public ::google::protobuf::RpcChannel // passed by the stub to CallMethod(). They are reset to NULL when // we get a response back from the server in on_mpSocket_readyRead() // after calling done->Run(). - // - // TODO(?): change controller, done and response to references - // instead of pointers? + + /*! \todo (MED) : change controller, done and response to references + instead of pointers? */ ::google::protobuf::RpcController *controller; ::google::protobuf::Closure *done; ::google::protobuf::Message *response; + typedef struct _RpcCall { + const ::google::protobuf::MethodDescriptor *method; + ::google::protobuf::RpcController *controller; + const ::google::protobuf::Message *request; + ::google::protobuf::Message *response; + ::google::protobuf::Closure *done; + } RpcCall; + QList pendingCallList; + QHostAddress mServerAddress; quint16 mServerPort; QTcpSocket *mpSocket; diff --git a/rpc/pbrpccommon.h b/rpc/pbrpccommon.h index 5407602..ef4b1ee 100644 --- a/rpc/pbrpccommon.h +++ b/rpc/pbrpccommon.h @@ -1,7 +1,7 @@ #ifndef _PB_RPC_COMMON_H #define _PB_RPC_COMMON_H -// FIXME: check which one is right - wrong one seems to be working!!!!! +//! \todo (LOW) check which one is right - wrong one seems to be working!!!!! #if 0 #define GET16(p) (quint16)( \ (*((quint8*)(p)+0) << 8 ) \ @@ -10,6 +10,11 @@ #define GET16(p) (quint16)( \ (*((quint8*)(p)+1) << 8 ) \ | (*((quint8*)(p)+0))) +#define GET32(p) (quint32)( \ + (*((quint8*)(p)+3) << 24) \ + | (*((quint8*)(p)+2) << 16) \ + | (*((quint8*)(p)+1) << 8 ) \ + | (*((quint8*)(p)+0))) #endif #define BYTESWAP4(x) \ @@ -22,7 +27,7 @@ (((x & 0xFF00) >> 8) | \ ((x & 0x00FF) << 8)) -// TODO: portability +//! \todo (LOW) : portability #if 1 #define HTONL(x) BYTESWAP4(x) #define NTOHL(x) BYTESWAP4(x) @@ -43,10 +48,14 @@ ** RPC Header (8) ** - MSG_TYPE (2) ** - METHOD_ID (2) -** - LEN (2) [not including this header] -** - RSVD (2) +** - LEN (4) [not including this header] */ +#define PB_HDR_SIZE 8 + #define PB_MSG_TYPE_REQUEST 1 #define PB_MSG_TYPE_RESPONSE 2 +#define PB_MSG_TYPE_BINBLOB 3 + +#define MSGBUF_SIZE 4096 #endif diff --git a/rpc/pbrpccontroller.h b/rpc/pbrpccontroller.h index 0b67ded..acc9520 100644 --- a/rpc/pbrpccontroller.h +++ b/rpc/pbrpccontroller.h @@ -6,22 +6,27 @@ class PbRpcController : public ::google::protobuf::RpcController { bool failed; + QIODevice *blob; std::string errStr; public: - PbRpcController() { failed = false; } + PbRpcController() { Reset(); } // Client Side Methods - void Reset() { failed=false;} + void Reset() { failed=false; blob = NULL; } bool Failed() const { return failed; } - void StartCancel() { /* TODO */} + void StartCancel() { /*! \todo (MED) */} std::string ErrorText() const { return errStr; } // Server Side Methods void SetFailed(const std::string &reason) { failed = true; errStr = reason; } bool IsCanceled() const { return false; }; - void NotifyOnCancel(::google::protobuf::Closure *callback) { /*TODO*/ } + void NotifyOnCancel(::google::protobuf::Closure *callback) { /*! \todo (MED) */ } + + // srivatsp added + QIODevice* binaryBlob() { return blob; }; + void setBinaryBlob(QIODevice *binaryBlob) { blob = binaryBlob; }; }; #endif diff --git a/rpc/rpcserver.cpp b/rpc/rpcserver.cpp index a3978cb..039fc83 100644 --- a/rpc/rpcserver.cpp +++ b/rpc/rpcserver.cpp @@ -1,4 +1,4 @@ -#include "pbhelper.h" +//#include "pbhelper.h" #include "rpcserver.h" RpcServer::RpcServer() @@ -37,45 +37,68 @@ bool RpcServer::registerService(::google::protobuf::Service *service, void RpcServer::done(::google::protobuf::Message *resp, PbRpcController *PbRpcController) { - char msg[4096]; // FIXME: hardcoding - char *p = (char *)&msg; + QIODevice *blob; + char msg[MSGBUF_SIZE]; int len; //qDebug("In RpcServer::done"); - // TODO: check PbRpcController to see if method failed if (PbRpcController->Failed()) { qDebug("rpc failed"); goto _exit; } - if (!resp->IsInitialized()) + blob = PbRpcController->binaryBlob(); + if (blob) { - qDebug("response missing required fields!!"); - qDebug(resp->InitializationErrorString().c_str()); + len = blob->size(); + qDebug("is binary blob of len %d", len); + + *((quint16*)(&msg[0])) = HTONS(PB_MSG_TYPE_BINBLOB); // type + *((quint16*)(&msg[2])) = HTONS(pendingMethodId); // method + (*(quint32*)(&msg[4])) = HTONL(len); // len + + clientSock->write(msg, PB_HDR_SIZE); + + blob->seek(0); + while (!blob->atEnd()) + { + int l; + + len = blob->read(msg, sizeof(msg)); + l = clientSock->write(msg, len); + Q_ASSERT(l == len); + } + goto _exit; } - *((quint16*)(p+0)) = HTONS(PB_MSG_TYPE_RESPONSE); // type TODO:RESPONSE - *((quint16*)(p+2)) = HTONS(pendingMethodId); // method - *((quint16*)(p+6)) = HTONS(0); // rsvd + if (!resp->IsInitialized()) + { + qWarning("response missing required fields!!"); + qDebug(resp->InitializationErrorString().c_str()); + qFatal("exiting"); + goto _exit; + } - // SerialData is at offset 8 - resp->SerializeToArray((void*) (p+8), sizeof(msg)); + resp->SerializeToArray((void*) &msg[PB_HDR_SIZE], sizeof(msg)); len = resp->ByteSize(); - (*(quint16*)(p+4)) = HTONS(len); // len + + *((quint16*)(&msg[0])) = HTONS(PB_MSG_TYPE_RESPONSE); // type + *((quint16*)(&msg[2])) = HTONS(pendingMethodId); // method + *((quint32*)(&msg[4])) = HTONL(len); // len // Avoid printing stats since it happens once every couple of seconds if (pendingMethodId != 12) { qDebug("Server(%s): sending %d bytes to client encoding <%s>", - __FUNCTION__, len + 8, resp->DebugString().c_str()); + __FUNCTION__, len + PB_HDR_SIZE, resp->DebugString().c_str()); //BUFDUMP(msg, len + 8); } - clientSock->write(msg, len + 8); + clientSock->write(msg, PB_HDR_SIZE + len); _exit: delete PbRpcController; @@ -91,7 +114,7 @@ void RpcServer::when_newConnection() LogInt(tr("already connected, no new connections will be accepted\n")); // Accept and close connection - // TODO: Send reason msg to client + //! \todo (MED) Send reason msg to client sock = server->nextPendingConnection(); sock->disconnectFromHost(); sock->deleteLater(); @@ -127,26 +150,37 @@ void RpcServer::when_error(QAbstractSocket::SocketError socketError) void RpcServer::when_dataAvail() { - char msg[4096]; // FIXME: hardcoding; + char msg[MSGBUF_SIZE]; int msgLen; - char *p = (char*) &msg; - quint16 type, method, len, rsvd; + static bool parsing = false; + static quint16 type, method; + static quint32 len; const ::google::protobuf::MethodDescriptor *methodDesc; ::google::protobuf::Message *req, *resp; PbRpcController *controller; - + + if (!parsing) + { + if (clientSock->bytesAvailable() < PB_HDR_SIZE) + return; + + msgLen = clientSock->read(msg, PB_HDR_SIZE); + + Q_ASSERT(msgLen == PB_HDR_SIZE); + + type = NTOHS(GET16(&msg[0])); + method = NTOHS(GET16(&msg[2])); + len = NTOHL(GET32(&msg[4])); + //qDebug("type = %d, method = %d, len = %d", type, method, len); + + parsing = true; + } + + if (clientSock->bytesAvailable() < len) + return; + msgLen = clientSock->read(msg, sizeof(msg)); - //LogInt(QString(QByteArray(msg, msgLen).toHex())); - - //qDebug("Server %s: rcvd %d bytes", __FUNCTION__, msgLen); - //BUFDUMP(msg, msgLen); - - type = NTOHS(GET16(p+0)); - method = NTOHS(GET16(p+2)); - len = NTOHS(GET16(p+4)); - rsvd = NTOHS(GET16(p+6)); - //qDebug("type = %d, method = %d, len = %d, rsvd = %d", - //type, method, len, rsvd); + Q_ASSERT((unsigned) msgLen == len); if (type != PB_MSG_TYPE_REQUEST) { @@ -154,19 +188,18 @@ void RpcServer::when_dataAvail() type, PB_MSG_TYPE_REQUEST); goto _error_exit; } - methodDesc = service->GetDescriptor()->method(method); if (!methodDesc) { qDebug("server(%s): invalid method id %d", __FUNCTION__, method); - goto _error_exit; // TODO: Return Error to client + goto _error_exit; //! \todo Return Error to client } if (isPending) { qDebug("server(%s): rpc pending, try again", __FUNCTION__); - goto _error_exit; // TODO: Return Error to client + goto _error_exit; //! \todo Return Error to client } pendingMethodId = method; @@ -175,12 +208,12 @@ void RpcServer::when_dataAvail() req = service->GetRequestPrototype(methodDesc).New(); resp = service->GetResponsePrototype(methodDesc).New(); - // Serialized data starts from offset 8 - req->ParseFromArray((void*) (msg+8), len); + req->ParseFromArray((void*)msg, len); if (!req->IsInitialized()) { - qDebug("Missing required fields in request"); + qWarning("Missing required fields in request"); qDebug(req->InitializationErrorString().c_str()); + qFatal("exiting"); delete req; delete resp; @@ -196,9 +229,12 @@ void RpcServer::when_dataAvail() service->CallMethod(methodDesc, controller, req, resp, NewCallback(this, &RpcServer::done, resp, controller)); + parsing = false; + return; _error_exit: + parsing = false; qDebug("server(%s): discarding msg from client", __FUNCTION__); return; } diff --git a/rpc/rpcserver.h b/rpc/rpcserver.h index 9e21587..d93b08a 100644 --- a/rpc/rpcserver.h +++ b/rpc/rpcserver.h @@ -27,7 +27,7 @@ class RpcServer : public QObject void LogInt (QString log) {qDebug("%s", log.toAscii().data());} public: - RpcServer(); // TODO: use 'parent' param + RpcServer(); //! \todo (LOW) use 'parent' param virtual ~RpcServer(); bool registerService(::google::protobuf::Service *service, diff --git a/server/drone.cpp b/server/drone.cpp index 98300ee..1f94d7e 100644 --- a/server/drone.cpp +++ b/server/drone.cpp @@ -1,33 +1,15 @@ #include "drone.h" -extern int myport; // FIXME(HIGH) +extern int myport; Drone::Drone(QDialog *parent) : QDialog(parent) { ui.setupUi(this); -#if 0 // PB - rxtx = new RxTx(this); -#endif + rpcServer = new RpcServer(); service = new MyService(this); - rpcServer->registerService(service, myport?myport:7878); - -#if 0 // PB - serverPortNum = DRONE_PORT; - clientSock = NULL; - - if (myport) - serverPortNum = myport); - - server = new QTcpServer(this); - connect(server, SIGNAL(newConnection()), this, SLOT(when_newConnection())); - //if (!server->listen(QHostAddress("10.0.0.1"), serverPortNum)) - if (!server->listen(QHostAddress::Any, serverPortNum)) - LogInt(tr("Unable to start the server: %1").arg(server->errorString())); - else - LogInt(tr("The server is running on %1:%2").arg(server->serverAddress().toString()).arg(server->serverPort())); -#endif + rpcServer->registerService(service, myport ? myport : 7878); } void Drone::Log(const char* str) @@ -35,62 +17,7 @@ void Drone::Log(const char* str) ui.teLog->append(QString(str)); } -#if 0 // PB -int Drone::SendMsg(const void* msg, int size) -{ - qDebug("Inside SendMsg\n"); - clientSock->write((char*) msg, size); -} -#endif - void Drone::LogInt(const QString &str) { ui.teLog->append(str); } - -#if 0 // PB -void Drone::when_newConnection() -{ - if (clientSock) - { - QTcpSocket *sock; - - LogInt(tr("already connected, no new connections will be accepted\n")); - sock = server->nextPendingConnection(); - // TODO: Send reason msg to client - sock->disconnectFromHost(); - goto _exit; - } - clientSock = server->nextPendingConnection(); - LogInt(tr("accepting new connection from %1:%2").arg(clientSock->peerAddress().toString()).arg(clientSock->peerPort())); - connect(clientSock, SIGNAL(readyRead()), - this, SLOT(when_dataAvail())); - connect(clientSock, SIGNAL(disconnected()), - this, SLOT(when_disconnected())); - connect(clientSock, SIGNAL(error(QAbstractSocket::SocketError)), - this, SLOT(when_error(QAbstractSocket::SocketError))); - - -_exit: - return; -} - -void Drone::when_disconnected() -{ - LogInt(tr("closing connection from %1:%2").arg(clientSock->peerAddress().toString()).arg(clientSock->peerPort())); - clientSock->deleteLater(); - clientSock = NULL; -} - -void Drone::when_dataAvail() -{ - QByteArray msg = clientSock->read(1024); // FIXME: hardcoding - LogInt(QString(msg.toHex())); - rxtx->ProcessMsg(msg.constData(), msg.size()); -} - -void Drone::when_error(QAbstractSocket::SocketError socketError) -{ - LogInt(clientSock->errorString()); -} -#endif diff --git a/server/drone.h b/server/drone.h index 3c4adba..31574a6 100644 --- a/server/drone.h +++ b/server/drone.h @@ -3,15 +3,12 @@ #include #include + #include "ui_drone.h" #include "abstracthost.h" -#if 0 // PB -#include "rxtx.h" -#endif #include "rpcserver.h" #include "myservice.h" - class Drone : public QDialog, AbstractHost { Q_OBJECT @@ -20,28 +17,10 @@ class Drone : public QDialog, AbstractHost Ui::Drone ui; Drone(QDialog *parent = 0); void Log(const char *msg); -#if 0 // PB - int SendMsg(const void* msg, int msgLen); -#endif private: -#if 0 // PB - RxTx *rxtx; -#endif RpcServer *rpcServer; OstProto::OstService *service; void LogInt(const QString &msg); -#if 0 // PB - QTcpServer *server; - QTcpSocket *clientSock; -#define DRONE_PORT 7878 - quint16 serverPortNum; - - private slots: - void when_newConnection(); - void when_disconnected(); - void when_dataAvail(); - void when_error(QAbstractSocket::SocketError socketError); -#endif }; #endif diff --git a/server/drone_main.cpp b/server/drone_main.cpp index 5a66b6a..007f1af 100644 --- a/server/drone_main.cpp +++ b/server/drone_main.cpp @@ -2,20 +2,16 @@ Drone *drone; -//void FindDevList(void); - int myport; int main(int argc, char *argv[]) { QApplication app(argc, argv); - // FIXME(HIGH) if (argc > 1) myport = atoi(argv[1]); drone = new Drone; - //FindDevList(); drone->show(); return app.exec(); } diff --git a/server/myservice.cpp b/server/myservice.cpp index ff01e17..cb9a2db 100644 --- a/server/myservice.cpp +++ b/server/myservice.cpp @@ -2,12 +2,12 @@ #include #include #include "qdebug.h" -#include #include "myservice.h" -#include "../common/protocollist.h" +#include "../common/protocollistiterator.h" #include "../common/abstractprotocol.h" +#include "../rpc/pbrpccontroller.h" #if 0 #include @@ -19,12 +19,6 @@ StreamInfo::StreamInfo() { -#if 0 - PbHelper pbh; - - pbh.ForceSetSingularDefault(mCore); - pbh.ForceSetSingularDefault(mControl); -#endif } StreamInfo::~StreamInfo() @@ -35,30 +29,7 @@ int StreamInfo::makePacket(uchar *buf, int bufMaxSize, int n) { int pktLen, len = 0; - // Decide a frame length based on length mode - switch(lenMode()) - { - case OstProto::StreamCore::e_fl_fixed: - pktLen = frameLen(); - break; - case OstProto::StreamCore::e_fl_inc: - pktLen = frameLenMin() + (n % - (frameLenMax() - frameLenMin() + 1)); - break; - case OstProto::StreamCore::e_fl_dec: - pktLen = frameLenMax() - (n % - (frameLenMax() - frameLenMin() + 1)); - break; - case OstProto::StreamCore::e_fl_random: - pktLen = frameLenMin() + (qrand() % - (frameLenMax() - frameLenMin() + 1)); - break; - default: - qWarning("Unhandled len mode %d. Using default 64", - lenMode()); - pktLen = 64; - break; - } + pktLen = frameLen(n); // pktLen is adjusted for CRC/FCS which will be added by the NIC pktLen -= 4; @@ -66,18 +37,22 @@ int StreamInfo::makePacket(uchar *buf, int bufMaxSize, int n) if ((pktLen < 0) || (pktLen > bufMaxSize)) return 0; - // FIXME: Calculated pktLen is an input to Payload Protocol - foreach(const AbstractProtocol* proto, *currentFrameProtocols) - { - QByteArray ba; + ProtocolListIterator *iter; + iter = createProtocolListIterator(); + while (iter->hasNext()) + { + AbstractProtocol *proto; + QByteArray ba; + + proto = iter->next(); ba = proto->protocolFrameValue(n); + if (len + ba.size() < bufMaxSize) - { memcpy(buf+len, ba.constData(), ba.size()); - } len += ba.size(); } + delete iter; return pktLen; } @@ -155,9 +130,9 @@ PortInfo::PortInfo(uint id, pcap_if_t *dev) if (dev->description) d.set_description(dev->description); - d.set_is_enabled(true); // FIXME(MED):check - d.set_is_oper_up(true); // FIXME(MED):check - d.set_is_exclusive_control(false); // FIXME(MED): check + d.set_is_enabled(true); //! \todo (LOW) admin enable/disable of port + d.set_is_oper_up(true); //! \todo (HIGH) oper up/down of port + d.set_is_exclusive_control(false); //! \todo (HIGH) port exclusive control memset((void*) &stats, 0, sizeof(stats)); resetStats(); @@ -190,7 +165,7 @@ void PortInfo::update() sendQueueList.clear(); returnToQIdx = -1; - // TODO(LOW): calculate sendqueue size + //! \todo (LOW): calculate sendqueue size sendQ.sendQueue = pcap_sendqueue_alloc(1*MB); sendQ.sendQueueCumLen.clear(); @@ -254,7 +229,7 @@ void PortInfo::update() { sendQueueList.append(sendQ); - // TODO(LOW): calculate sendqueue size + //! \todo (LOW): calculate sendqueue size sendQ.sendQueue = pcap_sendqueue_alloc(1*MB); sendQ.sendQueueCumLen.clear(); @@ -291,16 +266,17 @@ void PortInfo::update() goto _stop_no_more_pkts; case ::OstProto::StreamControl::e_nw_goto_id: - // TODO(MED): define and use - // streamList[i].d.control().goto_stream_id(); + /*! \todo (MED): define and use + streamList[i].d.control().goto_stream_id(); */ - // TODO(MED): assumes goto Id is less than current!!!! - // To support goto to any id, do - // if goto_id > curr_id then - // i = goto_id; - // goto restart; - // else - // returnToQIdx = 0; + /*! \todo (MED): assumes goto Id is less than current!!!! + To support goto to any id, do + if goto_id > curr_id then + i = goto_id; + goto restart; + else + returnToQIdx = 0; + */ returnToQIdx=0; goto _stop_no_more_pkts; @@ -344,9 +320,9 @@ void PortInfo::stopCapture() capturer.stop(); } -void PortInfo::viewCapture() +QFile* PortInfo::captureFile() { - capturer.view(); + return capturer.captureFile(); } void PortInfo::resetStats() @@ -550,7 +526,7 @@ void PortInfo::PortMonitorRx::callbackRx(u_char *state, // Update RxStats and RxRates using PCAP data usec = (header->ts.tv_sec - port->lastTsRx.tv_sec) * 1000000 + (header->ts.tv_usec - port->lastTsRx.tv_usec); - // TODO(rate) + //! \todo support Rx Pkt/Bit rate on Linux (libpcap callback) #if 0 port->stats.rxPps = (pkts * 1000000) / usec; port->stats.rxBps = (bytes * 1000000) / usec; @@ -581,7 +557,7 @@ void PortInfo::PortMonitorTx::callbackTx(u_char *state, // Update TxStats and TxRates using PCAP data usec = (header->ts.tv_sec - port->lastTsTx.tv_sec) * 1000000 + (header->ts.tv_usec - port->lastTsTx.tv_usec); - // TODO(rate) + //! \todo support Tx Pkt/Bit rate on Linux (libpcap callback) #if 0 port->stats.txPps = (pkts * 1000000) / usec; port->stats.txBps = (bytes * 1000000) / usec; @@ -703,7 +679,7 @@ PortInfo::PortTransmitter::PortTransmitter(PortInfo *port) void PortInfo::PortTransmitter::run() { - // TODO(HI): Stream Mode - one pass/continuous + //! \todo (MED) Stream Mode - continuous: define before implement // NOTE1: We can't use pcap_sendqueue_transmit() directly even on Win32 // 'coz of 2 reasons - there's no way of stopping it before all packets @@ -739,14 +715,20 @@ PortInfo::PortCapture::PortCapture(PortInfo *port) dumpHandle = NULL; } +PortInfo::PortCapture::~PortCapture() +{ +} + void PortInfo::PortCapture::run() { + int ret; + if (capHandle == NULL) { char errbuf[PCAP_ERRBUF_SIZE]; - capHandle = pcap_open_live(port->dev->name, 0, - PCAP_OPENFLAG_PROMISCUOUS, 1000 /*ms*/, errbuf); + capHandle = pcap_open_live(port->dev->name, 65535, + PCAP_OPENFLAG_PROMISCUOUS, -1, errbuf); if (capHandle == NULL) { qDebug("Error opening port %s: %s\n", @@ -757,12 +739,27 @@ void PortInfo::PortCapture::run() { if (!capFile.open()) qFatal("Unable to open temp cap file"); - qDebug("cap file = %s", capFile.fileName().toAscii().constData()); } + + qDebug("cap file = %s", capFile.fileName().toAscii().constData()); dumpHandle = pcap_dump_open(capHandle, capFile.fileName().toAscii().constData()); - pcap_loop(capHandle, -1, pcap_dump, (uchar*) dumpHandle); + ret = pcap_loop(capHandle, -1, pcap_dump, (uchar*) dumpHandle); + switch (ret) + { + case -2: + qDebug("%s: breakloop called %d", __PRETTY_FUNCTION__, ret); + break; + + case -1: + case 0: + qFatal("%s: unexpected break from loop (%d): %s", + __PRETTY_FUNCTION__, ret, pcap_geterr(capHandle)); + break; + default: + qFatal("%s: Unexpected return value %d", __PRETTY_FUNCTION__, ret); + } } void PortInfo::PortCapture::stop() @@ -776,11 +773,9 @@ void PortInfo::PortCapture::stop() } } -void PortInfo::PortCapture::view() +QFile* PortInfo::PortCapture::captureFile() { - // FIXME: hack - when correcting this remove the include also - QProcess::execute("C:/Program Files/Wireshark/wireshark.exe", - QStringList() << capFile.fileName()); + return &capFile; } @@ -914,7 +909,7 @@ const ::OstProto::PortId* request, { qDebug("%s: Invalid port id %d", __PRETTY_FUNCTION__, portIdx); controller->SetFailed("Invalid Port Id"); - goto _exit; // TODO(LOW): Partial status of RPC + goto _exit; //! \todo (LOW): Partial status of RPC } response->mutable_port_id()->set_id(portIdx); @@ -954,7 +949,7 @@ const ::OstProto::StreamIdList* request, streamIndex = getStreamIndex(portIdx, request->stream_id(i).id()); if (streamIndex < 0) - continue; // TODO(LOW): Partial status of RPC + continue; //! \todo(LOW): Partial status of RPC s = response->add_stream(); @@ -989,7 +984,7 @@ const ::OstProto::StreamIdList* request, // If stream with same id as in request exists already ==> error!! streamIndex = getStreamIndex(portIdx, request->stream_id(i).id()); if (streamIndex >= 0) - continue; // TODO(LOW): Partial status of RPC + continue; //! \todo (LOW): Partial status of RPC // Append a new "default" stream - actual contents of the new stream is // expected in a subsequent "modifyStream" request - set the stream id @@ -997,7 +992,7 @@ const ::OstProto::StreamIdList* request, s->mStreamId.CopyFrom(request->stream_id(i)); portInfo[portIdx]->streamList.append(s); - // TODO(LOW): fill-in response "Ack"???? + //! \todo (LOW): fill-in response "Ack"???? } portInfo[portIdx]->setDirty(true); _exit: @@ -1027,11 +1022,11 @@ const ::OstProto::StreamIdList* request, streamIndex = getStreamIndex(portIdx, request->stream_id(i).id()); if (streamIndex < 0) - continue; // TODO(LOW): Partial status of RPC + continue; //! \todo (LOW): Partial status of RPC delete portInfo[portIdx]->streamList.takeAt(streamIndex); - // TODO(LOW): fill-in response "Ack"???? + //! \todo (LOW): fill-in response "Ack"???? } portInfo[portIdx]->setDirty(true); _exit: @@ -1061,12 +1056,12 @@ const ::OstProto::StreamConfigList* request, streamIndex = getStreamIndex(portIdx, request->stream(i).stream_id().id()); if (streamIndex < 0) - continue; // TODO(LOW): Partial status of RPC + continue; //! \todo (LOW): Partial status of RPC portInfo[portIdx]->streamList[streamIndex]->protoDataCopyFrom( request->stream(i)); - // TODO(LOW): fill-in response "Ack"???? + //! \todo(LOW): fill-in response "Ack"???? } portInfo[portIdx]->setDirty(true); _exit: @@ -1087,7 +1082,7 @@ const ::OstProto::PortIdList* request, portIdx = request->port_id(i).id(); if (portIdx >= numPorts) - continue; // TODO(LOW): partial RPC? + continue; //! \todo (LOW): partial RPC? if (portInfo[portIdx]->isDirty()) portInfo[portIdx]->update(); @@ -1099,12 +1094,12 @@ const ::OstProto::PortIdList* request, portIdx = request->port_id(i).id(); if (portIdx >= numPorts) - continue; // TODO(LOW): partial RPC? + continue; //! \todo (LOW): partial RPC? portInfo[portIdx]->startTransmit(); } - // TODO(LOW): fill-in response "Ack"???? + //! \todo (LOW): fill-in response "Ack"???? done->Run(); } @@ -1122,11 +1117,11 @@ const ::OstProto::PortIdList* request, portIdx = request->port_id(i).id(); if (portIdx >= numPorts) - continue; // TODO(LOW): partial RPC? + continue; //! \todo (LOW): partial RPC? portInfo[portIdx]->stopTransmit(); } - // TODO(LOW): fill-in response "Ack"???? + //! \todo (LOW): fill-in response "Ack"???? done->Run(); } @@ -1143,7 +1138,7 @@ const ::OstProto::PortIdList* request, portIdx = request->port_id(i).id(); if (portIdx >= numPorts) - continue; // TODO(LOW): partial RPC? + continue; //! \todo (LOW): partial RPC? portInfo[portIdx]->startCapture(); } @@ -1163,7 +1158,7 @@ const ::OstProto::PortIdList* request, portIdx = request->port_id(i).id(); if (portIdx >= numPorts) - continue; // TODO(LOW): partial RPC? + continue; //! \todo (LOW): partial RPC? portInfo[portIdx]->stopCapture(); } @@ -1172,25 +1167,25 @@ const ::OstProto::PortIdList* request, } void MyService::getCaptureBuffer(::google::protobuf::RpcController* controller, -const ::OstProto::PortIdList* request, -::OstProto::CaptureBufferList* response, +const ::OstProto::PortId* request, +::OstProto::CaptureBuffer* response, ::google::protobuf::Closure* done) { + uint portIdx; qDebug("In %s", __PRETTY_FUNCTION__); - // FIXME: BAD BAD VERY BAD !!!!!! - for (int i=0; i < request->port_id_size(); i++) + portIdx = request->id(); + if (portIdx >= numPorts) { - uint portIdx; - - portIdx = request->port_id(i).id(); - if (portIdx >= numPorts) - continue; // TODO(LOW): partial RPC? - - portInfo[portIdx]->viewCapture(); + controller->SetFailed("invalid portid"); + goto _exit; } - controller->SetFailed("Not Implemented"); + portInfo[portIdx]->stopCapture(); + static_cast(controller)->setBinaryBlob( + portInfo[portIdx]->captureFile()); + +_exit: done->Run(); } @@ -1208,7 +1203,7 @@ const ::OstProto::PortIdList* request, portidx = request->port_id(i).id(); if (portidx >= numPorts) - continue; // TODO(LOW): partial rpc? + continue; //! \todo(LOW): partial rpc? s = response->add_port_stats(); s->mutable_port_id()->set_id(request->port_id(i).id()); @@ -1260,11 +1255,11 @@ const ::OstProto::PortIdList* request, portIdx = request->port_id(i).id(); if (portIdx >= numPorts) - continue; // TODO(LOW): partial RPC? + continue; //! \todo (LOW): partial RPC? portInfo[portIdx]->resetStats(); } - // TODO(LOW): fill-in response "Ack"???? + //! \todo (LOW): fill-in response "Ack"???? done->Run(); } diff --git a/server/myservice.h b/server/myservice.h index 270bfba..fb55283 100644 --- a/server/myservice.h +++ b/server/myservice.h @@ -105,9 +105,10 @@ class PortInfo public: PortCapture(PortInfo *port); + ~PortCapture(); void run(); void stop(); - void view(); + QFile* captureFile(); }; OstProto::Port d; @@ -173,7 +174,7 @@ public: void stopTransmit(); void startCapture(); void stopCapture(); - void viewCapture(); + QFile* captureFile(); void resetStats(); }; @@ -241,8 +242,8 @@ public: ::OstProto::Ack* response, ::google::protobuf::Closure* done); virtual void getCaptureBuffer(::google::protobuf::RpcController* controller, - const ::OstProto::PortIdList* request, - ::OstProto::CaptureBufferList* response, + const ::OstProto::PortId* request, + ::OstProto::CaptureBuffer* response, ::google::protobuf::Closure* done); virtual void getStats(::google::protobuf::RpcController* controller, const ::OstProto::PortIdList* request, diff --git a/server/pcapextra.cpp b/server/pcapextra.cpp index 4ad8a51..197e83b 100644 --- a/server/pcapextra.cpp +++ b/server/pcapextra.cpp @@ -79,14 +79,14 @@ _restart: if (*p_stop) return ret; - // TODO(HI): Timing between subsequent sendQueues + //! \todo (HIGH): Timing between subsequent sendQueues } if (returnToQIdx >= 0) { i = returnToQIdx; - // FIXME: 1s fixed; Change this to ipg of last stream + //! \todo (HIGH) 1s fixed; Change this to ipg of last stream (*pf_usleep)(1000000); goto _restart; } diff --git a/server/rxtx.cpp b/server/rxtx.cpp deleted file mode 100644 index 11789f1..0000000 --- a/server/rxtx.cpp +++ /dev/null @@ -1,514 +0,0 @@ - -FIXME(HI): File Not used anymore - -#if 0 -#include "qtglobal" // FIXME: needed only for qdebug -#include "rxtx.h" -#if 0 // PB -#include "../common/protocol.h" -#endif - - - -//#define LOG(...) drone->ui.teLOG->append(QString().sprintf( __VA_ARGS__)) -//#define LOG(...) drone->LOG(QString().sprintf( __VA_ARGS__)) -#define LOG(...) {sprintf(logStr, __VA_ARGS__); host->Log(logStr);} - - -RxTx::RxTx(AbstractHost *host) -{ - pcap_if_t *d; - int i=0; - char errbuf[PCAP_ERRBUF_SIZE]; - - // Init Data - RxTx::host = host; - numPorts = 0; - alldevs = NULL; - - LOG("Retrieving the device list from the local machine\n"); - if (pcap_findalldevs_ex(PCAP_SRC_IF_STRING, NULL, &alldevs, errbuf) == -1) - { - LOG("Error in pcap_findalldevs_ex: %s\n", errbuf); - goto _fail; - } - - /* Count number of local ports */ - for(d = alldevs; d != NULL; d = d->next) - numPorts++; - - portInfo = new PortInfo[numPorts]; - - /* Print the list */ - for(i=0, d=alldevs; d!=NULL; i++, d=d->next) - { - portInfo[i].portId = i; - portInfo[i].dev = d; - portInfo[i].streamHead = NULL; - portInfo[i].streamTail = NULL; -#if 1 - LOG("%d. %s", i, d->name); - if (d->description) - { - LOG(" (%s)\n", d->description); - } - else - LOG(" (No description available)\n"); -#endif - } - - if (i == 0) - { - LOG("\nNo interfaces found! Make sure WinPcap is installed.\n"); - goto _fail; - } - -_fail: - return; -} - -#if 0 -RxTx::LOG(char* fmt, ...) -{ - sprintf(logStr, fmt, _VA_ARGS_); - host->LOG(logStr); -} -#endif - -RxTx::~RxTx() -{ - unsigned int i; - - for (i = 0; i < numPorts; i++) - DeleteAllStreams(i); - pcap_freealldevs(alldevs); -} - -void RxTx::ProcessMsg(const char* msg, int len) -{ - tCommHdr *hdr; - // TODO: For now assuming we'll get a complete msg - // but need to fix this as this is a TCP stream - - hdr = (tCommHdr*) msg; - - if (hdr->ver != 1) // FIXME:hardcoding - { - LOG("Rcvd msg with invalid version %d\n", hdr->ver); - goto _exit; - } - - qDebug("msgType - %x: %x\n", hdr->msgType, NTOHS(hdr->msgType)); - switch (NTOHS(hdr->msgType)) - { - case e_MT_GetCapability: - SendCapabilityInfo(); - break; - case e_MT_ChangePortConfig: - ProcessPortConfig(msg+sizeof(tCommHdr), len - sizeof(tCommHdr)); - break; - case e_MT_GetPortConfig: - SendPortInfo(0); // FIXME - break; - - default: - LOG("Rcvd msg with unrecognized msgType %d\n", NTOHS(hdr->msgType)); - } - -_exit: - return; -} - -void RxTx::SendCapabilityInfo(void) -{ - unsigned char *msg, *p; - unsigned int i, msgLen; - - p = msg = (unsigned char*) pktBuff; - ((tCommHdr*)(p))->ver = 1; - ((tCommHdr*)(p))->msgType = HTONS(e_MT_CapabilityInfo); - p += sizeof(tCommHdr); - - for (i = 0; i < numPorts; i++) - { - // TLV: Port Capability - ((tTlvPortCapability*)(p))->tlvType = HTONS(e_TT_PortCapability); - ((tTlvPortCapability*)(p))->tlvLen = HTONS(sizeof(tTlvPortCapability)); - ((tTlvPortCapability*)(p))->portId = HTONL(portInfo[i].portId); - ((tTlvPortCapability*)(p))->portSpeed = 0; // TODO -#if 0 - strncpy(((tTlvPortCapability*)(p))->name, - portInfo[i].dev->name, TLV_MAX_PORT_NAME); - ((tTlvPortCapability*)(p))->name[TLV_MAX_PORT_NAME-1] = 0; -#else - strcpy(((tTlvPortCapability*)(p))->portName, "eth"); - //strcat(((tTlvPortCapability*)(p))->name, itoa(portInfo[i].portId, NULL, 10)); - itoa(portInfo[i].portId, &(((tTlvPortCapability*)(p))->portName[3]), 10); -#endif - strncpy(((tTlvPortCapability*)(p))->portDesc, - portInfo[i].dev->description, TLV_MAX_PORT_DESC); - ((tTlvPortCapability*)(p))->portDesc[TLV_MAX_PORT_DESC -1] = 0; - p += sizeof(tTlvPortCapability); - } - msgLen = (p - msg); - ((tCommHdr*)(msg))->msgLen = HTONS(msgLen); - - logStr[0] = 0; - for (i = 0; i < msgLen >> 2; i++) - { - char word[10]; - - sprintf(word, "%08X ", HTONL(((unsigned int *)(msg))[i])); - strcat(logStr, word); - } - host->Log("Sending msg\n"); - host->Log(logStr); -#if 0 // PB - host->SendMsg(pktBuff, msgLen); -#endif -} - -void RxTx::ProcessPortConfig(const char* msg, int len) -{ - // ASSUMPTION: msg points to start of first TLV - UINT8 *p = (UINT8*) msg; - uTlvStream u; - Stream *s; - - // Extract and process each TLV - while (len) - { - if (len < 12) - { - LOG("Length (%d) Error - not enough to fit a TLV", len); - goto _exit; - } - - u.tlv.tlvType = NTOHS(GET16(p)); - u.tlv.tlvLen = NTOHS(GET16(p+2)); - u.tlv.portId = NTOHL(GET32(p+4)); - u.tlv.streamId = NTOHL(GET32(p+8)); - - p += 12; - len -= 12; - - // Locate the correct node for processing - if (u.tlv.portId >= numPorts) - goto _next_tlv; - - s = GetStream(u.tlv.portId, u.tlv.streamId); - if ((s == NULL) && (u.tlv.tlvType!= e_TT_StreamOper)) - { - LOG("Unrecognized stream Id %d\n", u.tlv.streamId); - goto _next_tlv; - } - - switch(u.tlv.tlvType) - { - case e_TT_StreamOper: - u.oper.streamOper = NTOHS(GET16(p+2)); - switch (u.oper.streamOper) - { - case TLV_STREAM_OPER_DELETE: - if (!DeleteStream(u.tlv.portId, u.tlv.streamId)) - { - LOG("No Stream with id %d currently in list\n", - u.tlv.streamId); - goto _next_tlv; - } - break; - case TLV_STREAM_OPER_INSERT_HEAD: - s = new Stream; - s->id = u.tlv.streamId; - - InsertStreamAtHead(u.tlv.portId, s); - break; - case TLV_STREAM_OPER_INSERT_TAIL: - s = new Stream; - s->id = u.tlv.streamId; - - InsertStreamAtTail(u.tlv.portId, s); - break; - case TLV_STREAM_OPER_INSERT_BEFORE: - { - UINT32 nextStreamId; - - s = new Stream; - s->id = u.tlv.streamId; - - nextStreamId = NTOHS(GET32(p+4)); - - if (!InsertStreamBefore(u.tlv.portId, s, nextStreamId)) - { - LOG("List Empty or No stream with id %d " - "currently in list\n", nextStreamId); - goto _next_tlv; - } - break; - } - default: - LOG("Unrecognized Stream Oper %d\n", - u.oper.streamOper); - goto _next_tlv; - } - break; - case e_TT_StreamName: - strncpy(s->name, (char*) p, MAX_STREAM_NAME_SIZE); - break; - - case e_TT_StreamStatus: - u.status.streamStatus = NTOHL(GET32(p)); - if (u.status.streamStatus == TLV_STREAM_STATUS_DISABLED) - s->flags |= STREAM_FLAG_VALUE_STATUS_DISABLED; // FIXME - else if (u.status.streamStatus == TLV_STREAM_STATUS_ENABLED) - s->flags |= STREAM_FLAG_VALUE_STATUS_ENABLED; // FIXME - else - goto _next_tlv; - break; - - case e_TT_StreamFrameLength: - u.frameLen.frameLenMode = NTOHS(GET16(p)); - u.frameLen.frameLen = NTOHS(GET16(p+2)); - u.frameLen.frameLenMin = NTOHS(GET16(p+4)); - u.frameLen.frameLenMax = NTOHS(GET16(p+6)); - - s->pktLen = u.frameLen.frameLen; - - // FIXME: other frameLen params - break; - - case e_TT_StreamDataPattern: - u.dataPattern.dataPatternMode = NTOHS(GET16(p)); - u.dataPattern.dataPattern = NTOHS(GET32(p+4)); - - s->dataPattern = u.dataPattern.dataPattern; - - // FIXME: other dataPattern params - break; - - case e_TT_StreamHeaderData: - u.headerData.headerLen = NTOHS(GET16(p+2)); - - s->hdrLen = u.headerData.headerLen; - memcpy(s->pktHdr, p+4, u.headerData.headerLen); - break; - - default: - LOG("Unrecognizeed/Unexpected TLV %d\n", u.tlv.tlvType); - } - -_next_tlv: - p += u.tlv.tlvLen; - len -= u.tlv.tlvLen; - } - -_exit: - return; -} - -void RxTx::SendPortInfo(unsigned int port) -{ - // FIXME -} - -/* -** --------------------- STREAM LIST OPERATIONS ------------------------- -*/ - -void RxTx::InsertStreamAtHead(unsigned int port, Stream *s) -{ - if (portInfo[port].streamHead == NULL) - { - // list empty - first entry being added - s->next = NULL; - portInfo[port].streamHead = portInfo[port].streamTail = s; - } - else - { - // at least one entry in list, so tail does not change - s->next = portInfo[port].streamHead; - portInfo[port].streamHead = s; - } -} - -void RxTx::InsertStreamAtTail(unsigned int port, Stream *s) -{ - s->next = NULL; - if (portInfo[port].streamHead == NULL) - { - // list empty - first entry being added - portInfo[port].streamHead = portInfo[port].streamTail = s; - } - else - { - // at least one entry in list, so head does not change - portInfo[port].streamTail->next = s; - portInfo[port].streamTail = s; - } -} - -bool RxTx::InsertStreamBefore(unsigned int port, Stream *s, - unsigned int nextStreamId) -{ - Stream *q, *r; - - // For an "Insert Before", list cannot be empty - if (portInfo[port].streamHead == NULL) - { - LOG("Cannot 'insert before' in an empty list"); - return false; - } - - // Traverse with 'r' and keep track of previous with 'q' - q = NULL; - r = portInfo[port].streamHead; - while (r != NULL) - { - if (r->id == nextStreamId) - { - if (r == portInfo[port].streamHead) - { - // Insert at Head - s->next = portInfo[port].streamHead; - portInfo[port].streamHead = s; - } - else if (r == portInfo[port].streamTail) - { - // Insert one before Tail - s->next = portInfo[port].streamTail; - q->next = s; - } - else - { - s->next = r; - q->next = s; - } - - break; - } - q = r; - r = r->next; - } - - if (r == NULL) - return false; - else - return true; -} - -bool RxTx::DeleteStream(unsigned int port, Stream *s) -{ - Stream *q, *r; - - // Traverse with 'r' and keep track of prev with 'q' - q = NULL; - r = portInfo[port].streamHead; - while (r != NULL) - { - if (r == s) - { - if (r == portInfo[port].streamHead) - { - if (portInfo[port].streamHead == portInfo[port].streamTail) - { - portInfo[port].streamHead = NULL; - portInfo[port].streamTail = NULL; - } - else - portInfo[port].streamHead = portInfo[port].streamHead->next; - } - else if (r == portInfo[port].streamTail) - { - q->next = NULL; - portInfo[port].streamTail = q; - } - else - { - q->next = r->next; - } - - delete r; - break; - } - q = r; - r = r->next; - } - - if (r == NULL) - return false; - else - return true; -} - -bool RxTx::DeleteStream(unsigned int port, unsigned int streamId) -{ - Stream *q, *r; - - // Traverse with 'r' and keep track of prev with 'q' - q = NULL; - r = portInfo[port].streamHead; - while (r != NULL) - { - if (r->id == streamId) - { - if (r == portInfo[port].streamHead) - { - if (portInfo[port].streamHead == portInfo[port].streamTail) - { - portInfo[port].streamHead = NULL; - portInfo[port].streamTail = NULL; - } - else - portInfo[port].streamHead = portInfo[port].streamHead->next; - } - else if (r == portInfo[port].streamTail) - { - q->next = NULL; - portInfo[port].streamTail = q; - } - else - { - q->next = r->next; - } - - delete r; - break; - } - q = r; - r = r->next; - } - - if (r == NULL) - return false; - else - return true; -} - -void RxTx::DeleteAllStreams(unsigned int port) -{ - Stream *r, *q; - - r = portInfo[port].streamHead; - while (r != NULL) - { - q = r; - r = r->next; - delete q; - } -} - -Stream* RxTx::GetStream(unsigned int port, unsigned int streamId) -{ - Stream *r; - - r = portInfo[port].streamHead; - while (r != NULL) - { - if (r->id == streamId) - return r; - r = r->next; - } - - return NULL; -} -#endif diff --git a/server/rxtx.h b/server/rxtx.h deleted file mode 100644 index 7b86991..0000000 --- a/server/rxtx.h +++ /dev/null @@ -1,84 +0,0 @@ - -FIXME(HI): File not used anymore - -#if 0 - -#ifndef _RXTX_H -#define _RXTX_H - -#include "pcap.h" -#include "abstracthost.h" - -#include "../common/protocol.h" - -#define GET16(x) (UINT16)( \ - (*((UINT8*)x+0) << 16 ) \ - | (*((UINT8*)x+1))) - -#define GET32(x) (UINT32)( \ - (*((UINT8*)x+0) << 24) \ - | (*((UINT8*)x+1) << 16) \ - | (*((UINT8*)x+2) << 8 ) \ - | (*((UINT8*)x+3))) - -#define MAX_PKT_HDR_SIZE 1536 -#define MAX_STREAM_NAME_SIZE 64 - -typedef struct _Stream -{ - unsigned int id; - char name[MAX_STREAM_NAME_SIZE]; - unsigned char pktHdr[MAX_PKT_HDR_SIZE]; - unsigned short hdrLen; - unsigned short pktLen; - unsigned int dataPattern; - unsigned int flags; -#define STREAM_FLAG_MASK_STATUS 0x00000001 -#define STREAM_FLAG_VALUE_STATUS_DISABLED 0x00000000 -#define STREAM_FLAG_VALUE_STATUS_ENABLED 0x00000001 - - struct _Stream *next; -} Stream; - -typedef struct -{ - unsigned int portId; - pcap_if_t *dev; - Stream *streamHead; - Stream *streamTail; -} PortInfo; - -class RxTx -{ - public: - RxTx(AbstractHost* host); - ~RxTx(); - void ProcessMsg(const char* msg, int len); - - private: - AbstractHost *host; - char logStr[1024]; - -#define MAX_PKT_SIZE 1024 - unsigned char pktBuff[MAX_PKT_SIZE]; - unsigned numPorts; - PortInfo *portInfo; - pcap_if_t *alldevs; - - void InsertStreamAtHead(unsigned int port, Stream *s); - void InsertStreamAtTail(unsigned int port, Stream *s); - bool InsertStreamBefore(unsigned int port, Stream *s, - unsigned int nextStreamId); - bool DeleteStream(unsigned int port, Stream *s); - bool DeleteStream(unsigned int port, unsigned int streamId); - void DeleteAllStreams(unsigned int port); - Stream* GetStream(unsigned int port, unsigned int streamId); - - //void Log(char *fmt, ...); - void SendCapabilityInfo(void); - void SendPortInfo(unsigned int port); - void ProcessPortConfig(const char* msg, int len); -}; - -#endif -#endif