diff --git a/client/port.cpp b/client/port.cpp index 89d94c3..06f40ae 100644 --- a/client/port.cpp +++ b/client/port.cpp @@ -16,6 +16,7 @@ Port::Port(quint32 id, quint32 portGroupId) { mPortId = id; d.mutable_port_id()->set_id(id); + stats.mutable_port_id()->set_id(id); mPortGroupId = portGroupId; } @@ -166,10 +167,6 @@ void Port::getModifiedStreamsSinceLastSync( } } - -// -// ----------- SLOTS ------------- -// void Port::when_syncComplete() { qSort(mStreams); @@ -179,3 +176,8 @@ void Port::when_syncComplete() mLastSyncStreamList.append(mStreams[i].id()); } +void Port::updateStats(OstProto::PortStats *portStats) +{ + stats.MergeFrom(*portStats); +} + diff --git a/client/port.h b/client/port.h index f96d31c..b16d680 100644 --- a/client/port.h +++ b/client/port.h @@ -13,8 +13,9 @@ class Port { //friend class StreamModel; private: - static uint mAllocStreamId; - OstProto::Port d; + static uint mAllocStreamId; + OstProto::Port d; + OstProto::PortStats stats; // FIXME(HI): consider removing mPortId as it is duplicated inside 'd' quint32 mPortId; @@ -68,6 +69,7 @@ public: Q_ASSERT(index < mStreams.size()); return mStreams[index]; } + OstProto::PortStats getStats() { return stats; } // FIXME(MED): naming inconsistency - PortConfig/Stream; also retVal void updatePortConfig(OstProto::Port *port); @@ -89,7 +91,11 @@ public: void getModifiedStreamsSinceLastSync( OstProto::StreamConfigList &streamConfigList); + void when_syncComplete(); + + void updateStats(OstProto::PortStats *portStats); + }; #endif diff --git a/client/portgroup.cpp b/client/portgroup.cpp index 3de90cc..70cb66e 100644 --- a/client/portgroup.cpp +++ b/client/portgroup.cpp @@ -413,3 +413,81 @@ void PortGroup::processModifyStreamAck(OstProto::Ack *ack) // TODO(HI): Apply Button should now be disabled???!!!!??? } + +void PortGroup::startTx(QList portList) +{ + OstProto::PortIdList portIdList; + OstProto::Ack *ack = new OstProto::Ack; + + qDebug("In %s", __FUNCTION__); + + for (int i = 0; i < portList.size(); i++) + { + OstProto::PortId *portId; + portId = portIdList.add_port_id(); + portId->set_id(portList.at(i)); + } + + serviceStub->startTx(rpcController, &portIdList, ack, + NewCallback(this, &PortGroup::processStartTxAck, ack)); +} + +void PortGroup::processStartTxAck(OstProto::Ack *ack) +{ + qDebug("In %s", __FUNCTION__); + + delete ack; +} + +void PortGroup::getPortStats() +{ + OstProto::PortStatsList *portStatsList = new OstProto::PortStatsList; + + qDebug("In %s", __FUNCTION__); + + serviceStub->getStats(rpcController, &portIdList, portStatsList, + NewCallback(this, &PortGroup::processPortStatsList, portStatsList)); +} + +void PortGroup::processPortStatsList(OstProto::PortStatsList *portStatsList) +{ + qDebug("In %s", __FUNCTION__); + + if (rpcController->Failed()) + { + qDebug("%s: rpc failed", __FUNCTION__); + goto _error_exit; + } + + for(int i = 0; i < portStatsList->port_stats_size(); i++) + { + uint id; + + id = portStatsList->port_stats(i).port_id().id(); + // FIXME: don't mix port id & index into mPorts[] + mPorts[id].updateStats(portStatsList->mutable_port_stats(i)); + } + + emit statsChanged(mPortGroupId); + +_error_exit: + delete portStatsList; +} + +void PortGroup::clearPortStats() +{ + OstProto::Ack *ack = new OstProto::Ack; + + qDebug("In %s", __FUNCTION__); + + serviceStub->clearStats(rpcController, &portIdList, ack, + NewCallback(this, &PortGroup::processClearStatsAck, ack)); +} + +void PortGroup::processClearStatsAck(OstProto::Ack *ack) +{ + qDebug("In %s", __FUNCTION__); + + delete ack; +} + diff --git a/client/portgroup.h b/client/portgroup.h index 89c02af..e2b5317 100644 --- a/client/portgroup.h +++ b/client/portgroup.h @@ -69,10 +69,20 @@ public: OstProto::StreamConfigList *streamConfigList = NULL); void processModifyStreamAck(OstProto::Ack *ack); + + void startTx(QList portList); + void processStartTxAck(OstProto::Ack *ack); + + void getPortStats(); + void processPortStatsList(OstProto::PortStatsList *portStatsList); + void clearPortStats(); + void processClearStatsAck(OstProto::Ack *ack); + signals: void portGroupDataChanged(PortGroup* portGroup); void portListAboutToBeChanged(quint32 portGroupId); void portListChanged(quint32 portGroupId); + void statsChanged(quint32 portGroupId); private slots: void on_rpcChannel_stateChanged(); diff --git a/client/portgrouplist.cpp b/client/portgrouplist.cpp index 06df02d..a503449 100644 --- a/client/portgrouplist.cpp +++ b/client/portgrouplist.cpp @@ -74,6 +74,9 @@ void PortGroupList::addPortGroup(PortGroup &portGroup) connect(&portGroup, SIGNAL(portListChanged(quint32)), &mPortStatsModel, SLOT(when_portListChanged())); + connect(&portGroup, SIGNAL(statsChanged(quint32)), + &mPortStatsModel, SLOT(when_portGroup_stats_update(quint32))); + mPortGroups.append(&portGroup); portGroup.connectToHost(); diff --git a/client/portgrouplist.h b/client/portgrouplist.h index 84bdb9c..aadd24d 100644 --- a/client/portgrouplist.h +++ b/client/portgrouplist.h @@ -28,6 +28,7 @@ class PortGroupList : public QObject { public: PortGroupList::PortGroupList(); + PortModel* getPortModel() { return &mPortGroupListModel; } PortStatsModel* getPortStatsModel() { return &mPortStatsModel; } StreamModel* getStreamModel() { return &mStreamListModel; } @@ -37,6 +38,9 @@ public: PortGroup& portGroup(const QModelIndex& index); Port& port(const QModelIndex& index); + int numPortGroups() { return mPortGroups.size(); } + PortGroup& portGroupByIndex(int index) { return *(mPortGroups[index]); } + void addPortGroup(PortGroup &portGroup); void removePortGroup(PortGroup &portGroup); diff --git a/client/portstatsmodel.cpp b/client/portstatsmodel.cpp index 5da586b..d8b40e6 100644 --- a/client/portstatsmodel.cpp +++ b/client/portstatsmodel.cpp @@ -1,10 +1,18 @@ #include "portstatsmodel.h" #include "portgrouplist.h" +#include + PortStatsModel::PortStatsModel(PortGroupList *p, QObject *parent) : QAbstractTableModel(parent) { + QTimer *timer; + pgl = p; + + timer = new QTimer(); + connect(timer, SIGNAL(timeout()), this, SLOT(updateStats())); + timer->start(5000); } int PortStatsModel::rowCount(const QModelIndex &parent) const @@ -32,9 +40,33 @@ int PortStatsModel::columnCount(const QModelIndex &parent ) const return numPorts.last(); } +void PortStatsModel::getDomainIndexes(const QModelIndex &index, + uint &portGroupIdx, uint &portIdx) const +{ + int portNum; + + // TODO(LOW): Optimize using binary search: see qLowerBound() + portNum = index.column() + 1; + for (portGroupIdx = 0; portGroupIdx < (uint) numPorts.size(); portGroupIdx++) + if (portNum <= numPorts.at(portGroupIdx)) + break; + + if (portGroupIdx) + { + if (numPorts.at(portGroupIdx -1)) + portIdx = (portNum - 1) % numPorts.at(portGroupIdx - 1); + else + portIdx = portNum - 1; + } + else + portIdx = portNum - 1; + + //qDebug("PSM: %d - %d, %d", index.column(), portGroupIdx, portIdx); +} + QVariant PortStatsModel::data(const QModelIndex &index, int role) const { - int pgidx, pidx, portNum; + uint pgidx, pidx; // Check for a valid index if (!index.isValid()) @@ -50,31 +82,44 @@ QVariant PortStatsModel::data(const QModelIndex &index, int role) const if (index.column() >= (numPorts.last())) return QVariant(); - // TODO(LOW): Optimize using binary search: see qLowerBound() - portNum = index.column() + 1; - for (pgidx = 0; pgidx < numPorts.size(); pgidx++) - if (portNum <= numPorts.at(pgidx)) - break; - - if (pgidx) - { - if (numPorts.at(pgidx -1)) - pidx = (portNum - 1) % numPorts.at(pgidx - 1); - else - pidx = portNum - 1; - } - else - pidx = portNum - 1; - - //qDebug("PSM: %d - %d, %d", index.column(), pgidx, pidx); + getDomainIndexes(index, pgidx, pidx); // Check role if (role == Qt::DisplayRole) { -#if 0 // PB - return pgl->mPortGroups.at(pgidx)->mPorts.at(pidx).mPortStats[index.row()]; -#endif - return 0; //FIXME: Get actual port stats + OstProto::PortStats stats; + + stats = pgl->mPortGroups.at(pgidx)->mPorts[pidx].getStats(); + + switch(index.row()) + { + case e_STAT_FRAMES_RCVD: + return stats.rx_pkts(); + + case e_STAT_FRAMES_SENT: + return stats.tx_pkts(); + + case e_STAT_FRAME_SEND_RATE: + return stats.tx_pps(); + + case e_STAT_FRAME_RECV_RATE: + return stats.rx_pps(); + + case e_STAT_BYTES_RCVD: + return stats.rx_bytes(); + + case e_STAT_BYTES_SENT: + return stats.tx_bytes(); + + case e_STAT_BYTE_SEND_RATE: + return stats.tx_bps(); + + case e_STAT_BYTE_RECV_RATE: + return stats.rx_bps(); + + default: + return 0; + } } else return QVariant(); @@ -92,6 +137,22 @@ QVariant PortStatsModel::headerData(int section, Qt::Orientation orientation, in return PortStatName.at(section); } +void PortStatsModel::portListFromIndex(QModelIndexList indices, + QList &portList) +{ + portList.clear(); + + for (int i = 0; i < indices.size(); i++) + { + //getDomainIndexes(indices.at(i), portGroupIdx, portIdx); + + for (int j = 0; j < portList.size(); j++) + { + // FIXME(HI): Incomplete!!!! + } + } +} + // // Slots // @@ -121,4 +182,16 @@ void PortStatsModel::on_portStatsUpdate(int port, void*stats) emit dataChanged(topLeft, bottomRight); } +void PortStatsModel::updateStats() +{ + // Request each portgroup to fetch updated stats - the port group + // raises a signal once updated stats are available + for (int i = 0; i < pgl->mPortGroups.size(); i++) + pgl->mPortGroups[i]->getPortStats(); +} +void PortStatsModel::when_portGroup_stats_update(quint32 portGroupId) +{ + // FIXME(MED): update only the changed ports, not all + reset(); +} diff --git a/client/portstatsmodel.h b/client/portstatsmodel.h index 6bf5c5a..6dc4693 100644 --- a/client/portstatsmodel.h +++ b/client/portstatsmodel.h @@ -33,9 +33,8 @@ class PortStatsModel : public QAbstractTableModel { Q_OBJECT - PortGroupList *pgl; - public: + PortStatsModel(PortGroupList *p, QObject *parent = 0); int rowCount(const QModelIndex &parent = QModelIndex()) const; @@ -44,16 +43,32 @@ class PortStatsModel : public QAbstractTableModel QVariant headerData(int section, Qt::Orientation orientation, int role = Qt::DisplayRole) const; + class PortGroupAndPortList { + uint portGroupId; + QList portList; + }; + void portListFromIndex(QModelIndexList indices, + QList &portList); + public slots: void when_portListChanged(); void on_portStatsUpdate(int port, void*stats); + void when_portGroup_stats_update(quint32 portGroupId); + + private slots: + void updateStats(); private: + PortGroupList *pgl; + // numPorts stores the num of ports per portgroup // in the same order as the portgroups are index in the pgl // Also it stores them as cumulative totals QList numPorts; + void getDomainIndexes(const QModelIndex &index, + uint &portGroupIdx, uint &portIdx) const; + }; #endif diff --git a/client/portstatswindow.cpp b/client/portstatswindow.cpp index 1339ebb..28b8f88 100644 --- a/client/portstatswindow.cpp +++ b/client/portstatswindow.cpp @@ -5,20 +5,71 @@ #include "QHeaderView" -//PortStatsWindow::PortStatsWindow(QWidget *parent) : QDialog (parent) PortStatsWindow::PortStatsWindow(PortGroupList *pgl, QWidget *parent) + : QWidget(parent) { setupUi(this); + this->pgl = pgl; model = pgl->getPortStatsModel(); tvPortStats->setModel(model); tvPortStats->horizontalHeader()->setMovable(true); + tvPortStats->verticalHeader()->resizeSections(QHeaderView::ResizeToContents); } PortStatsWindow::~PortStatsWindow() { } +/* ------------- SLOTS -------------- */ + +void PortStatsWindow::on_tbStartTransmit_clicked() +{ + // TODO(MED): get selected ports + + if (pgl->numPortGroups()) + { + QList portIdList; + + // FIXME(HI): Testing only!!! + portIdList.append(1); // MS Loopback adapter + pgl->portGroupByIndex(0).startTx(portIdList); + } +} + +void PortStatsWindow::on_tbStopTransmit_clicked() +{ + // TODO(MED) +} + +void PortStatsWindow::on_tbStartCapture_clicked() +{ + // TODO(MED) +} + +void PortStatsWindow::on_tbStopCapture_clicked() +{ + // TODO(MED) +} + +void PortStatsWindow::on_tbViewCapture_clicked() +{ + // TODO(MED) +} + +void PortStatsWindow::on_tbClear_clicked() +{ + // TODO(MED) +} + +void PortStatsWindow::on_tbClearAll_clicked() +{ + for (int i = 0; i < pgl->numPortGroups(); i++) + { + pgl->portGroupByIndex(0).clearPortStats(); + } +} + void PortStatsWindow::on_tbFilter_clicked() { bool ok; diff --git a/client/portstatswindow.h b/client/portstatswindow.h index e5a1a97..98642c1 100644 --- a/client/portstatswindow.h +++ b/client/portstatswindow.h @@ -6,17 +6,29 @@ #include "ui_portstatswindow.h" #include "portgrouplist.h" -class PortStatsWindow : public QDialog, public Ui::PortStatsWindow +class PortStatsWindow : public QWidget, public Ui::PortStatsWindow { Q_OBJECT public: PortStatsWindow(PortGroupList *pgl, QWidget *parent = 0); ~PortStatsWindow(); + private: + PortGroupList *pgl; QAbstractItemModel *model; private slots: + void on_tbStartTransmit_clicked(); + void on_tbStopTransmit_clicked(); + + void on_tbStartCapture_clicked(); + void on_tbStopCapture_clicked(); + void on_tbViewCapture_clicked(); + + void on_tbClear_clicked(); + void on_tbClearAll_clicked(); + void on_tbFilter_clicked(); }; diff --git a/common/protocol.proto b/common/protocol.proto index 9cb835f..7ffd54f 100644 --- a/common/protocol.proto +++ b/common/protocol.proto @@ -34,7 +34,7 @@ message Llc { message Snap { optional uint32 oui = 1; - optional uint32 type = 2; + //optional uint32 type = 2; } message Eth2 { @@ -126,7 +126,7 @@ message Udp { optional uint32 src_port = 3 [default = 8902]; optional uint32 dst_port = 4 [default = 80]; - optional uint32 totLen = 5; + optional uint32 totlen = 5; optional uint32 cksum = 6; } @@ -279,11 +279,21 @@ message CaptureBufferList { } message PortStats { - // TODO + required PortId port_id = 1; + + optional uint64 rx_pkts = 11; + optional uint64 rx_bytes = 12; + optional uint64 rx_pps = 13; + optional uint64 rx_bps = 14; + + optional uint64 tx_pkts = 21; + optional uint64 tx_bytes = 22; + optional uint64 tx_pps = 23; + optional uint64 tx_bps = 24; } message PortStatsList { - repeated PortStats list = 1; + repeated PortStats port_stats = 1; } service OstService { diff --git a/server/drone.pro b/server/drone.pro index df79396..79cbd1a 100644 --- a/server/drone.pro +++ b/server/drone.pro @@ -1,7 +1,7 @@ TEMPLATE = app CONFIG += qt QT += network -DEFINES += HAVE_REMOTE +DEFINES += HAVE_REMOTE WPCAP INCLUDEPATH += "c:\msys\1.0\local\include" INCLUDEPATH += "C:\DevelLibs\WpdPack\Include" INCLUDEPATH += "..\rpc" diff --git a/server/myservice.cpp b/server/myservice.cpp index de438e1..79db27b 100644 --- a/server/myservice.cpp +++ b/server/myservice.cpp @@ -1,7 +1,311 @@ #include "myservice.h" #include "qdebug.h" -#define LOG(...) {sprintf(logStr, __VA_ARGS__); host->Log(logStr);} +#include + +#define LOG(...) {sprintf(logStr, __VA_ARGS__); host->Log(logStr);} +#define MB (1024*1024) + +int StreamInfo::makePacket(uchar *buf, int bufMaxSize) +{ + int pktLen, len = 0; + uchar scratch[8]; + + // TODO(HI): use FrameLengthMode - don't assume fixed + pktLen = d.core().frame_len(); + if (bufMaxSize < pktLen) + return 0; + + // We always have a Mac Header! + // TODO(HI): use MacMode - don't assume fixed + qToBigEndian((quint64) d.mac().dst_mac(), scratch); + memcpy((buf + len), scratch + 2, 6); + len += 6; + qToBigEndian((quint64) d.mac().src_mac(), scratch); + memcpy((buf + len), &scratch + 2, 6); + len += 6; + + switch(d.core().ft()) + { + case OstProto::StreamCore::e_ft_none: + break; + case OstProto::StreamCore::e_ft_eth_2: + qToBigEndian((quint16) d.eth2().type(), buf+len); + len += 2; + break; + case OstProto::StreamCore::e_ft_802_3_raw: + qToBigEndian((quint16) pktLen, buf+len); + len += 2; + break; + case OstProto::StreamCore::e_ft_802_3_llc: + buf[len+0] = (quint8) d.llc().dsap(); + buf[len+1] = (quint8) d.llc().ssap(); + buf[len+2] = (quint8) d.llc().ctl(); + len +=3; + break; + case OstProto::StreamCore::e_ft_snap: + buf[len+0] = (quint8) d.llc().dsap(); + buf[len+1] = (quint8) d.llc().ssap(); + buf[len+2] = (quint8) d.llc().ctl(); + len +=3; + qToBigEndian((quint32) d.snap().oui(), scratch); + memcpy((buf + len), scratch + 2, 3); + len += 3; + qToBigEndian((quint16) d.eth2().type(), buf+len); + len += 2; + break; + default: + qWarning("Unhandled frame type %d\n", d.core().ft()); + } + + switch (d.core().l3_proto()) + { + case OstProto::StreamCore::e_l3_none: + break; + case OstProto::StreamCore::e_l3_ip: + buf[len+0] = (quint8) (d.ip().ver_hdrlen()); + buf[len+1] = (quint8) (d.ip().tos()); + len += 2; + qToBigEndian((quint16) d.ip().tot_len(), buf+len); + len += 2; + qToBigEndian((quint16) d.ip().id(), buf+len); + len += 2; + qToBigEndian((quint16) (( (d.ip().flags() & 0x3) << 13) | + (d.ip().frag_ofs() & 0x1FFF)), buf+len); + len += 2; + buf[len+0] = (quint8) (d.ip().ttl()); + buf[len+1] = (quint8) (d.ip().proto()); + len += 2; + qToBigEndian((quint16) d.ip().cksum(), buf+len); + len += 2; + // TODO(HI): Use IpMode - don't assume fixed + qToBigEndian((quint32) d.ip().src_ip(), buf+len); + len +=4; + qToBigEndian((quint32) d.ip().dst_ip(), buf+len); + len +=4; + break; + case OstProto::StreamCore::e_l3_arp: + // TODO(LOW) + break; + default: + qWarning("Unhandled l3 proto %d\n", d.core().l3_proto()); + } + + switch (d.core().l4_proto()) + { + case OstProto::StreamCore::e_l4_none: + break; + case OstProto::StreamCore::e_l4_tcp: + qToBigEndian((quint16) d.tcp().src_port(), buf+len); + len += 2; + qToBigEndian((quint16) d.tcp().dst_port(), buf+len); + len += 2; + qToBigEndian((quint16) d.tcp().seq_num(), buf+len); + len += 2; + qToBigEndian((quint16) d.tcp().ack_num(), buf+len); + len += 2; + buf[len+0] = (quint8) d.tcp().hdrlen_rsvd(); + buf[len+1] = (quint8) d.tcp().flags(); + len += 2; + qToBigEndian((quint16) d.tcp().window(), buf+len); + len +=2; + qToBigEndian((quint16) d.tcp().cksum(), buf+len); + len +=2; + qToBigEndian((quint16) d.tcp().urg_ptr(), buf+len); + len +=2; + break; + case OstProto::StreamCore::e_l4_udp: + qToBigEndian((quint16) d.udp().src_port(), buf+len); + len += 2; + qToBigEndian((quint16) d.udp().dst_port(), buf+len); + len += 2; + qToBigEndian((quint16) d.udp().totlen(), buf+len); + len +=2; + qToBigEndian((quint16) d.udp().cksum(), buf+len); + len +=2; + break; + case OstProto::StreamCore::e_l4_icmp: + // TODO(LOW) + break; + case OstProto::StreamCore::e_l4_igmp: + // TODO(LOW) + break; + default: + qWarning("Unhandled l4 proto %d\n", d.core().l4_proto()); + } + + // Fill-in the data pattern + { + int dataLen; + + dataLen = pktLen - len; + for (int i = 0; i < (dataLen/4)+1; i++) + { + // TODO(HI): Use patternMode + qToBigEndian((quint32) d.core().pattern(), buf+len+(i*4)); + } + } + + return pktLen; +} + +// +// ------------------ PortInfo -------------------- +// +PortInfo::PortInfo(uint id, pcap_if_t *dev) + : monitor(this) +{ + char errbuf[PCAP_ERRBUF_SIZE]; + + this->dev = dev; + devHandle = pcap_open(dev->name, 65536, PCAP_OPENFLAG_PROMISCUOUS , 1000, + NULL, errbuf); + if (devHandle == NULL) + { + qDebug("Error opening port %s: %s\n", + dev->name, pcap_geterr(devHandle)); + } + + /* By default, put the interface in statistics mode */ + if (pcap_setmode(devHandle, MODE_STAT)<0) + { + qDebug("Error setting statistics mode.\n"); + } + + d.mutable_port_id()->set_id(id); + d.set_name("eth"); // FIXME(MED): suffix portid + d.set_description(dev->description); + d.set_is_enabled(true); // FIXME(MED):check + d.set_is_oper_up(true); // FIXME(MED):check + d.set_is_exclusive_control(false); // FIXME(MED): check + + resetStats(); + + // We'll create sendqueue later when required + sendQueue = NULL; + isSendQueueDirty=true; + + // Start the monitor thread + monitor.start(); +} + +void PortInfo::update() +{ + uchar pktBuf[2000]; + pcap_pkthdr pktHdr; + + qDebug("In %s", __FUNCTION__); + + if (sendQueue) + pcap_sendqueue_destroy(sendQueue); + + // TODO(LOW): calculate sendqueue size + sendQueue = pcap_sendqueue_alloc(1*MB); + + for (int i = 0; i < streamList.size(); i++) + { + // FIXME(HI): Testing only + //if (streamList[i].d.core().is_enabled()) + { + pktHdr.len = streamList[i].makePacket(pktBuf, sizeof(pktBuf)); + pktHdr.caplen = pktHdr.len; + pktHdr.ts.tv_sec = pktHdr.ts.tv_usec = 0; // FIXME(HI) + + if (-1 == pcap_sendqueue_queue(sendQueue, &pktHdr, + (u_char*) pktBuf)) + { + qDebug("[port %d] sendqueue_queue() failed for streamidx %d\n", + id(), i); + } + } + } + + isSendQueueDirty = false; +} + +void PortInfo::startTransmit() +{ + int n; + + // TODO(HI): Stream Mode - one pass/continuous + n = pcap_sendqueue_transmit(devHandle, sendQueue, false); + if (n == 0) + qDebug("port %d: send error %s\n", id(), pcap_geterr(devHandle)); + else + qDebug("port %d: sent %d bytes\n", id(), n); +} + +void PortInfo::stopTransmit() +{ +} + +void PortInfo::resetStats() +{ + memset((void*) &stats, 0, sizeof(stats)); +} + +// +// ------------------ PortMonitor ------------------- +// + +PortInfo::PortMonitor::PortMonitor(PortInfo *port) +{ + this->port = port; +} + +void PortInfo::PortMonitor::callback(u_char *state, + const struct pcap_pkthdr *header, const u_char *pkt_data) +{ + PortInfo *port = (PortInfo*) state; + quint64 pkts; + quint64 bytes; + + pkts = *((quint64*)(pkt_data + 0)); + bytes = *((quint64*)(pkt_data + 8)); + + port->stats.rxPkts += pkts; + port->stats.rxBytes += bytes; + +#if 0 + for (int i=0; i < 16; i++) + { + qDebug("%02x ", pkt_data[i]); + } + qDebug("{%d: %llu, %llu}\n", port->id(), + pkts, bytes); + qDebug("[%d: pkts : %llu]\n", port->id(), port->stats.rxPkts); + qDebug("[%d: bytes: %llu]\n", port->id(), port->stats.rxBytes); +#endif +} + +void PortInfo::PortMonitor::run() +{ + int ret; + + qDebug("before pcap_loop\n"); + + /* Start the main loop */ + ret = pcap_loop(port->devHandle, -1, &PortInfo::PortMonitor::callback, + (PUCHAR) port); + //ret = pcap_loop(fp, -1, &updateStats, (PUCHAR)&st_ts); + + switch(ret) + { + case 0: + qDebug("Unexpected return from pcap_loop()\n"); + break; + case -1: + qDebug("Unsolicited (error) return from pcap_loop()\n"); + break; + case -2: + qDebug("Solicited return from pcap_loop()\n"); + break; + default: + qDebug("Unknown return value from pcap_loop()\n"); + } +} + +/*--------------- MyService ---------------*/ int MyService::getStreamIndex(unsigned int portIdx, unsigned int streamId) @@ -12,9 +316,9 @@ int MyService::getStreamIndex(unsigned int portIdx, Q_ASSERT(portIdx < numPorts); - for (i = 0; i < portInfo[portIdx].streamList.size(); i++) + for (i = 0; i < portInfo[portIdx]->streamList.size(); i++) { - if (streamId == portInfo[portIdx].streamList.at(i).d.stream_id().id()) + if (streamId == portInfo[portIdx]->streamList.at(i).d.stream_id().id()) goto _found; } @@ -43,34 +347,19 @@ MyService::MyService(AbstractHost *host) goto _fail; } - /* Count number of local ports */ - for(dev = alldevs; dev != NULL; dev = dev->next) + portInfo.clear(); + /* Count, Populate and Print the list */ + for(i=0, dev=alldevs; dev!=NULL; i++, dev=dev->next) + { + portInfo.append(new PortInfo(i, dev)); numPorts++; - portInfo = new PortInfo[numPorts]; - - /* Populate and Print the list */ - for(i=0, dev=alldevs; (i < numPorts) && (dev!=NULL); i++, dev=dev->next) - { - portInfo[i].setId(i); - portInfo[i].setPcapDev(dev); #if 1 LOG("%d. %s", i, dev->name); if (dev->description) { LOG(" (%s)\n", dev->description); } -#endif -#if 0 - // FIXME(HI): Testing only!!!! - { - StreamInfo s; - - s.d.mutable_stream_id()->set_id(0); - portInfo[i].streamList.append(s); - s.d.mutable_stream_id()->set_id(1); - portInfo[i].streamList.append(s); - } #endif } @@ -86,7 +375,6 @@ _fail: MyService::~MyService() { - delete portInfo; pcap_freealldevs(alldevs); } @@ -103,7 +391,7 @@ void MyService::getPortIdList( ::OstProto::PortId *p; p = response->add_port_id(); - p->set_id(portInfo[i].d.port_id().id()); + p->set_id(portInfo[i]->d.port_id().id()); } done->Run(); @@ -126,7 +414,7 @@ const ::OstProto::PortIdList* request, OstProto::Port *p; p = response->add_port(); - p->CopyFrom(portInfo[idx].d); + p->CopyFrom(portInfo[idx]->d); } } @@ -151,11 +439,11 @@ const ::OstProto::PortId* request, } response->mutable_port_id()->set_id(portIdx); - for (int j = 0; j < portInfo[portIdx].streamList.size(); j++) + for (int j = 0; j < portInfo[portIdx]->streamList.size(); j++) { OstProto::StreamId *s, *q; - q = portInfo[portIdx].streamList[j].d.mutable_stream_id(); + q = portInfo[portIdx]->streamList[j].d.mutable_stream_id(); s = response->add_stream_id(); s->CopyFrom(*q); @@ -192,7 +480,7 @@ const ::OstProto::StreamIdList* request, continue; // TODO(LOW): Partial status of RPC s = response->add_stream(); - s->CopyFrom(portInfo[portIdx].streamList[streamIndex].d); + s->CopyFrom(portInfo[portIdx]->streamList[streamIndex].d); } _exit: @@ -229,10 +517,11 @@ const ::OstProto::StreamIdList* request, // expected in a subsequent "modifyStream" request - set the stream id // now itself however!!! s.d.mutable_stream_id()->CopyFrom(request->stream_id(i)); - portInfo[portIdx].streamList.append(s); + portInfo[portIdx]->streamList.append(s); // TODO(LOW): fill-in response "Ack"???? } + portInfo[portIdx]->setDirty(true); _exit: done->Run(); } @@ -262,10 +551,11 @@ const ::OstProto::StreamIdList* request, if (streamIndex < 0) continue; // TODO(LOW): Partial status of RPC - portInfo[portIdx].streamList.removeAt(streamIndex); + portInfo[portIdx]->streamList.removeAt(streamIndex); // TODO(LOW): fill-in response "Ack"???? } + portInfo[portIdx]->setDirty(true); _exit: done->Run(); } @@ -295,11 +585,12 @@ const ::OstProto::StreamConfigList* request, if (streamIndex < 0) continue; // TODO(LOW): Partial status of RPC - portInfo[portIdx].streamList[streamIndex].d.MergeFrom( + portInfo[portIdx]->streamList[streamIndex].d.MergeFrom( request->stream(i)); // TODO(LOW): fill-in response "Ack"???? } + portInfo[portIdx]->setDirty(true); _exit: done->Run(); } @@ -310,7 +601,33 @@ const ::OstProto::PortIdList* request, ::google::protobuf::Closure* done) { qDebug("In %s", __PRETTY_FUNCTION__); - controller->SetFailed("Not Implemented"); + + // If any of the ports in the request are dirty, first update them + for (int i=0; i < request->port_id_size(); i++) + { + uint portIdx; + + portIdx = request->port_id(i).id(); + if (portIdx >= numPorts) + continue; // FIXME(MED): partial RPC? + + if (portInfo[portIdx]->isDirty()) + portInfo[portIdx]->update(); + } + + for (int i=0; i < request->port_id_size(); i++) + { + uint portIdx; + + portIdx = request->port_id(i).id(); + if (portIdx >= numPorts) + continue; // FIXME(MED): partial RPC? + + portInfo[portIdx]->startTransmit(); + } + + // TODO(LOW): fill-in response "Ack"???? + done->Run(); } @@ -320,7 +637,18 @@ const ::OstProto::PortIdList* request, ::google::protobuf::Closure* done) { qDebug("In %s", __PRETTY_FUNCTION__); - controller->SetFailed("Not Implemented"); + + for (int i=0; i < request->port_id_size(); i++) + { + uint portIdx; + + portIdx = request->port_id(i).id(); + if (portIdx >= numPorts) + continue; // FIXME(MED): partial RPC? + + portInfo[portIdx]->stopTransmit(); + } + // TODO(LOW): fill-in response "Ack"???? done->Run(); } @@ -360,7 +688,30 @@ const ::OstProto::PortIdList* request, ::google::protobuf::Closure* done) { qDebug("In %s", __PRETTY_FUNCTION__); - controller->SetFailed("Not Implemented"); + + for (int i=0; i < request->port_id_size(); i++) + { + uint portidx; + ::OstProto::PortStats *s; + + portidx = request->port_id(i).id(); + if (portidx >= numPorts) + continue; // FIXME(med): partial rpc? + + s = response->add_port_stats(); + s->mutable_port_id()->set_id(request->port_id(i).id()); + + s->set_rx_pkts(portInfo[portidx]->stats.rxPkts); + s->set_rx_bytes(portInfo[portidx]->stats.rxBytes); + s->set_rx_pps(portInfo[portidx]->stats.rxPps); + s->set_rx_bps(portInfo[portidx]->stats.rxBps); + + s->set_tx_pkts(portInfo[portidx]->stats.txPkts); + s->set_tx_bytes(portInfo[portidx]->stats.txBytes); + s->set_tx_pps(portInfo[portidx]->stats.txPps); + s->set_tx_bps(portInfo[portidx]->stats.txBps); + } + done->Run(); } @@ -370,7 +721,19 @@ const ::OstProto::PortIdList* request, ::google::protobuf::Closure* done) { qDebug("In %s", __PRETTY_FUNCTION__); - controller->SetFailed("Not Implemented"); + + for (int i=0; i < request->port_id_size(); i++) + { + uint portIdx; + + portIdx = request->port_id(i).id(); + if (portIdx >= numPorts) + continue; // FIXME(MED): partial RPC? + + portInfo[portIdx]->resetStats(); + } + // TODO(LOW): fill-in response "Ack"???? + done->Run(); } diff --git a/server/myservice.h b/server/myservice.h index 45577be..69e2f71 100644 --- a/server/myservice.h +++ b/server/myservice.h @@ -10,7 +10,9 @@ #include "../common/protocol.pb.h" #include "abstracthost.h" #include +#include #include +#include #include "../rpc/pbhelper.h" @@ -22,10 +24,12 @@ class MyService; class StreamInfo { friend class MyService; + friend class PortInfo; OstProto::Stream d; StreamInfo() { PbHelper pbh; pbh.ForceSetSingularDefault(&d); } + int StreamInfo::makePacket(uchar *buf, int bufMaxSize); }; @@ -33,34 +37,65 @@ class PortInfo { friend class MyService; + class PortMonitor: public QThread + { + friend class PortInfo; + + PortInfo *port; + public: + PortMonitor(PortInfo *port); + static void callback(u_char *state, + const struct pcap_pkthdr *header, const u_char *pkt_data); + void run(); + }; + OstProto::Port d; + pcap_if_t *dev; + pcap_t *devHandle; + pcap_send_queue *sendQueue; + bool isSendQueueDirty; + PortMonitor monitor; + + struct PortStats + { + quint64 rxPkts; + quint64 rxBytes; + quint64 rxPps; + quint64 rxBps; + + quint64 txPkts; + quint64 txBytes; + quint64 txPps; + quint64 txBps; + }; + + struct PortStats stats; /*! StreamInfo::d::stream_id and index into streamList[] are NOT same! */ QList streamList; public: - // TODO(LOW): Both setId and setPcapDev() should together form the ctor - void setId(unsigned int id) { d.mutable_port_id()->set_id(id); } - void setPcapDev(pcap_if_t *dev) - { - this->dev = dev; - d.set_name("eth"); // FIXME(MED): suffix portid - d.set_description(dev->description); - d.set_is_enabled(true); // FIXME(MED):check - d.set_is_oper_up(true); // FIXME(MED):check - d.set_is_exclusive_control(false); // FIXME(MED): check - } + PortInfo::PortInfo(uint id, pcap_if_t *dev); + uint id() { return d.port_id().id(); } + bool isDirty() { return isSendQueueDirty; } + void setDirty(bool dirty) { isSendQueueDirty = dirty; } + void update(); + void startTransmit(); + void stopTransmit(); + void resetStats(); }; + class MyService: public OstProto::OstService { AbstractHost *host; char logStr[1024]; uint numPorts; + /*! PortInfo::d::port_id and index into portInfo[] are same! */ - PortInfo *portInfo; + QList portInfo; pcap_if_t *alldevs; int getStreamIndex(unsigned int portIdx,unsigned int streamId);