diff --git a/rpc/pbrpcchannel.cpp b/rpc/pbrpcchannel.cpp index 9d5be05..f99c00b 100644 --- a/rpc/pbrpcchannel.cpp +++ b/rpc/pbrpcchannel.cpp @@ -290,6 +290,15 @@ void PbRpcChannel::on_mpSocket_connected() void PbRpcChannel::on_mpSocket_disconnected() { qDebug("In %s", __FUNCTION__); + + pendingMethodId = -1; + controller = NULL; + response = NULL; + isPending = false; + // \todo convert parsing from static to data member + //parsing = false + pendingCallList.clear(); + emit disconnected(); } diff --git a/rpc/pbrpccontroller.h b/rpc/pbrpccontroller.h index acc9520..916cb95 100644 --- a/rpc/pbrpccontroller.h +++ b/rpc/pbrpccontroller.h @@ -3,6 +3,8 @@ #include +class QIODevice; + class PbRpcController : public ::google::protobuf::RpcController { bool failed; diff --git a/server/abstractport.cpp b/server/abstractport.cpp new file mode 100644 index 0000000..8cbecd9 --- /dev/null +++ b/server/abstractport.cpp @@ -0,0 +1,214 @@ +#include "abstractport.h" + +#include +#include + +#include "../common/streambase.h" + +AbstractPort::AbstractPort(int id, const char *device) +{ + data_.mutable_port_id()->set_id(id); + data_.set_name(QString("if%1 ").arg(id).toStdString()); + + //! \todo (LOW) admin enable/disable of port + data_.set_is_enabled(true); + + //! \todo (HIGH) port exclusive control + data_.set_is_exclusive_control(false); + + isSendQueueDirty_ = true; + linkState_ = OstProto::LinkStateUnknown; + + memset((void*) &stats_, 0, sizeof(stats_)); + resetStats(); +} + +void AbstractPort::init() +{ +} + +AbstractPort::~AbstractPort() +{ +} + +StreamBase* AbstractPort::stream(int streamId) +{ + for (int i = 0; i < streamList_.size(); i++) + { + if (streamId == streamList_.at(i)->id()) + return streamList_.at(i); + } + + return NULL; +} + +bool AbstractPort::addStream(StreamBase *stream) +{ + streamList_.append(stream); + isSendQueueDirty_ = true; + return true; +} + +bool AbstractPort::deleteStream(int streamId) +{ + for (int i = 0; i < streamList_.size(); i++) + { + StreamBase *stream; + + if (streamId == streamList_.at(i)->id()) + { + stream = streamList_.takeAt(i); + delete stream; + + isSendQueueDirty_ = true; + return true; + } + } + + return false; +} + +void AbstractPort::updatePacketList() +{ + int len; + bool isVariable; + uchar pktBuf[2000]; + long sec; + long usec; + + qDebug("In %s", __FUNCTION__); + + clearPacketList(); + //returnToQIdx = -1; + + // First sort the streams by ordinalValue + qSort(streamList_); + + sec = 0; + usec = 0; + for (int i = 0; i < streamList_.size(); i++) + { +//_restart: + if (streamList_[i]->isEnabled()) + { + long numPackets, numBursts; + long ibg, ipg; + + switch (streamList_[i]->sendUnit()) + { + case OstProto::StreamControl::e_su_bursts: + numBursts = streamList_[i]->numBursts(); + numPackets = streamList_[i]->burstSize(); + ibg = 1000000/streamList_[i]->burstRate(); + ipg = 0; + break; + case OstProto::StreamControl::e_su_packets: + numBursts = 1; + numPackets = streamList_[i]->numPackets(); + ibg = 0; + ipg = 1000000/streamList_[i]->packetRate(); + break; + default: + qWarning("Unhandled stream control unit %d", + streamList_[i]->sendUnit()); + continue; + } + qDebug("numBursts = %ld, numPackets = %ld\n", + numBursts, numPackets); + qDebug("ibg = %ld, ipg = %ld\n", ibg, ipg); + + if (streamList_[i]->isFrameVariable()) + { + isVariable = true; + } + else + { + isVariable = false; + len = streamList_[i]->frameValue(pktBuf, sizeof(pktBuf), 0); + } + + for (int j = 0; j < numBursts; j++) + { + for (int k = 0; k < numPackets; k++) + { + if (isVariable) + { + len = streamList_[i]->frameValue(pktBuf, + sizeof(pktBuf), j * numPackets + k); + } + if (len <= 0) + continue; + + usec += ipg; + if (usec > 1000000) + { + sec++; + usec -= 1000000; + } + + qDebug("q(%d, %d, %d) sec = %lu usec = %lu", + i, j, k, sec, usec); + + appendToPacketList(sec, usec, pktBuf, len); + + } // for (numPackets) + + usec += ibg; + if (usec > 1000000) + { + sec++; + usec -= 1000000; + } + } // for (numBursts) + + switch(streamList_[i]->nextWhat()) + { + case ::OstProto::StreamControl::e_nw_stop: + goto _stop_no_more_pkts; + + case ::OstProto::StreamControl::e_nw_goto_id: + /*! \todo (MED): define and use + streamList_[i].d.control().goto_stream_id(); */ + + /*! \todo (MED): assumes goto Id is less than current!!!! + To support goto to any id, do + if goto_id > curr_id then + i = goto_id; + goto restart; + else + returnToQIdx = 0; + */ + + setPacketListLoopMode(true); + goto _stop_no_more_pkts; + + case ::OstProto::StreamControl::e_nw_goto_next: + break; + + default: + qFatal("---------- %s: Unhandled case (%d) -----------", + __FUNCTION__, streamList_[i]->nextWhat() ); + break; + } + + } // if (stream is enabled) + } // for (numStreams) + +_stop_no_more_pkts: + isSendQueueDirty_ = false; +} + +void AbstractPort::stats(PortStats *stats) +{ + stats->rxPkts = stats_.rxPkts - epochStats_.rxPkts; + stats->rxBytes = stats_.rxBytes - epochStats_.rxBytes; + stats->rxPps = stats_.rxPps; + stats->rxBps = stats_.rxBps; + + stats->txPkts = stats_.txPkts - epochStats_.txPkts; + stats->txBytes = stats_.txBytes - epochStats_.txBytes; + stats->txPps = stats_.txPps; + stats->txBps = stats_.txBps; +} + +/* vim: set shiftwidth=4 tabstop=8 softtabstop=4 expandtab: */ diff --git a/server/abstractport.h b/server/abstractport.h new file mode 100644 index 0000000..6c0bcc3 --- /dev/null +++ b/server/abstractport.h @@ -0,0 +1,82 @@ +#ifndef _SERVER_ABSTRACT_PORT_H +#define _SERVER_ABSTRACT_PORT_H + +#include +#include + +#include "../common/protocol.pb.h" + +class StreamBase; +class QIODevice; + +class AbstractPort +{ +public: + struct PortStats + { + quint64 rxPkts; + quint64 rxBytes; + quint64 rxPps; + quint64 rxBps; + + quint64 txPkts; + quint64 txBytes; + quint64 txPps; + quint64 txBps; + }; + + AbstractPort(int id, const char *device); + virtual ~AbstractPort(); + + virtual void init(); + + int id() { return data_.port_id().id(); } + void protoDataCopyInto(OstProto::Port *port) { port->CopyFrom(data_); } + + int streamCount() { return streamList_.size(); } + StreamBase* stream(int streamId); + bool addStream(StreamBase *stream); + bool deleteStream(int streamId); + + bool isDirty() { return isSendQueueDirty_; } + void setDirty() { isSendQueueDirty_ = true; } + + virtual OstProto::LinkState linkState() { return linkState_; } + + virtual void clearPacketList() = 0; + virtual bool appendToPacketList(long sec, long usec, const uchar *packet, + int length) = 0; + virtual void setPacketListLoopMode(bool loop) = 0; + void updatePacketList(); + + virtual void startTransmit() = 0; + virtual void stopTransmit() = 0; + virtual bool isTransmitOn() = 0; + + virtual void startCapture() = 0; + virtual void stopCapture() = 0; + virtual bool isCaptureOn() = 0; + virtual QIODevice* captureData() = 0; + + void stats(PortStats *stats); + void resetStats() { epochStats_ = stats_; } + +protected: + OstProto::Port data_; + OstProto::LinkState linkState_; + + struct PortStats stats_; + //! \todo Need lock for stats access/update + +private: + bool isSendQueueDirty_; + /*! \note StreamBase::id() and index into streamList[] are NOT same! */ + QList streamList_; + + struct PortStats epochStats_; + +}; + +#endif + +/* vim: set shiftwidth=4 tabstop=8 softtabstop=4 expandtab: */ diff --git a/server/drone.pro b/server/drone.pro index a5cd2bb..fe06b4e 100644 --- a/server/drone.pro +++ b/server/drone.pro @@ -3,18 +3,24 @@ CONFIG += qt debug QT += network script DEFINES += HAVE_REMOTE WPCAP INCLUDEPATH += "../rpc" -LIBS += -lprotobuf win32:LIBS += -lwpcap -lpacket unix:LIBS += -lpcap win32:LIBS += -L"../common/debug" -lostproto unix:LIBS += -L"../common" -lostproto win32:LIBS += -L"../rpc/debug" -lpbrpc unix:LIBS += -L"../rpc" -lpbrpc +LIBS += -lprotobuf POST_TARGETDEPS += "../common/debug/libostproto.a" "../rpc/debug/libpbrpc.a" RESOURCES += drone.qrc HEADERS += drone.h FORMS += drone.ui -SOURCES += drone_main.cpp drone.cpp +SOURCES += \ + drone_main.cpp \ + drone.cpp \ + portmanager.cpp \ + abstractport.cpp \ + pcapport.cpp \ + winpcapport.cpp SOURCES += myservice.cpp SOURCES += pcapextra.cpp diff --git a/server/drone.qrc b/server/drone.qrc new file mode 100644 index 0000000..a642656 --- /dev/null +++ b/server/drone.qrc @@ -0,0 +1,5 @@ + + + icons/portgroup.png + + diff --git a/server/myservice.cpp b/server/myservice.cpp index 1fa13af..e742fe6 100644 --- a/server/myservice.cpp +++ b/server/myservice.cpp @@ -1,1301 +1,423 @@ +#include "myservice.h" + +#if 0 #include #include #include "qdebug.h" -#include "myservice.h" #include "../common/protocollistiterator.h" #include "../common/abstractprotocol.h" +#endif +#include "../common/streambase.h" #include "../rpc/pbrpccontroller.h" - -#if 0 -#include -#include -#endif - -#define LOG(...) {} -#define MB (1024*1024) - -StreamInfo::StreamInfo() -{ -} - -StreamInfo::~StreamInfo() -{ -} - -// -// ------------------ PortInfo -------------------- -// -PortInfo::PortInfo(uint id, pcap_if_t *dev) - : monitorRx(this), monitorTx(this), transmitter(this), capturer(this) -{ - char errbuf[PCAP_ERRBUF_SIZE]; - - this->dev = dev; - -#ifdef Q_OS_WIN32 - adapter = PacketOpenAdapter(dev->name); - if (!adapter) - qFatal("Unable to open adapter %s", dev->name); - oidData = (PPACKET_OID_DATA) malloc(sizeof(PACKET_OID_DATA) + sizeof(uint)); - if (oidData) - { - memset(oidData, 0, sizeof(PACKET_OID_DATA) + sizeof(uint)); - oidData->Length=sizeof(uint); - } - else - qFatal("failed to alloc oidData"); -#endif - - /* - * Get 2 device handles - one for rx and one for tx. If we use only - * one handle for both rx and tx anythin that we tx using the single - * handle is not received back to us - */ - devHandleRx = pcap_open_live(dev->name, 0, PCAP_OPENFLAG_PROMISCUOUS , - 1000 /*ms*/, errbuf); - if (devHandleRx == NULL) - { - qDebug("Error opening port %s: %s\n", - dev->name, pcap_geterr(devHandleRx)); - } - -#if 0 - if (pcap_setdirection(devHandleRx, PCAP_D_IN)<0) - { - qDebug("[%s] Error setting direction inbound only\n", dev->name); - } -#endif - - /* By default, put the interface in statistics mode */ - if (pcap_setmode(devHandleRx, MODE_STAT)<0) - { - qDebug("Error setting statistics mode.\n"); - } - - devHandleTx = pcap_open_live(dev->name, 0, PCAP_OPENFLAG_PROMISCUOUS , - 1000 /*ms*/, errbuf); - if (devHandleTx == NULL) - { - qDebug("Error opening port %s: %s\n", - dev->name, pcap_geterr(devHandleTx)); - } - -#if 0 - if (pcap_setdirection(devHandleTx, PCAP_D_OUT)<0) - { - qDebug("[%s] Error setting direction outbound only\n", dev->name); - } -#endif - - /* By default, put the interface in statistics mode */ - if (pcap_setmode(devHandleTx, MODE_STAT)<0) - { - qDebug("Error setting statistics mode.\n"); - } - - d.mutable_port_id()->set_id(id); - -#ifdef Q_OS_WIN32 - d.set_name(QString("if%1 ").arg(id).toAscii().constData()); -#else - if (dev->name) - d.set_name(dev->name); - else - d.set_name(QString("if%1 ").arg(id).toAscii().constData()); -#endif - d.set_name(d.name()+"{"+ - pcap_datalink_val_to_name(pcap_datalink(devHandleRx))+"}"); - - if (dev->description) - d.set_description(dev->description); - d.set_is_enabled(true); //! \todo (LOW) admin enable/disable of port - d.set_is_exclusive_control(false); //! \todo (HIGH) port exclusive control - - memset((void*) &stats, 0, sizeof(stats)); - resetStats(); - - linkState = OstProto::LinkStateUnknown; - - // We'll create sendqueue later when required - sendQueueList.clear(); - returnToQIdx = -1; - pcapExtra.txPkts = 0; - pcapExtra.txBytes = 0; - isSendQueueDirty=true; - - // Start the monitor thread - monitorRx.start(); - monitorTx.start(); -} - -void PortInfo::updateLinkState() -{ -#ifdef Q_OS_WIN32 - OstProto::LinkState newLinkState - = OstProto::LinkStateUnknown; - - memset(oidData, 0, sizeof(PACKET_OID_DATA) + sizeof(uint)); - oidData->Oid = OID_GEN_MEDIA_CONNECT_STATUS; - oidData->Length = sizeof(uint); - if (PacketRequest(adapter, 0, oidData)) - { - uint state; - - if (oidData->Length == sizeof(state)) - { - memcpy((void*)&state, (void*)oidData->Data, oidData->Length); - if (state == 0) - newLinkState = OstProto::LinkStateUp; - else if (state == 1) - newLinkState = OstProto::LinkStateDown; - } - } - - linkState = newLinkState; -#elif defined(Q_OS_LINUX) - //! \todo (HI) implement link state for linux - get from /proc maybe? -#endif -} - -void PortInfo::update() -{ - int len; - bool isVariable; - uchar pktBuf[2000]; - pcap_pkthdr pktHdr; - ost_pcap_send_queue sendQ; - - qDebug("In %s", __FUNCTION__); - - if (sendQueueList.size()) - { - foreach(sendQ, sendQueueList) - pcap_sendqueue_destroy(sendQ.sendQueue); - } - sendQueueList.clear(); - returnToQIdx = -1; - - //! \todo (LOW): calculate sendqueue size - sendQ.sendQueue = pcap_sendqueue_alloc(1*MB); - sendQ.sendQueueCumLen.clear(); - - // First sort the streams by ordinalValue - qSort(streamList); - - pktHdr.ts.tv_sec = 0; - pktHdr.ts.tv_usec = 0; - for (int i = 0; i < streamList.size(); i++) - { -//_restart: - if (streamList[i]->isEnabled()) - { - long numPackets, numBursts; - long ibg, ipg; - - switch (streamList[i]->sendUnit()) - { - case OstProto::StreamControl::e_su_bursts: - numBursts = streamList[i]->numBursts(); - numPackets = streamList[i]->burstSize(); - ibg = 1000000/streamList[i]->burstRate(); - ipg = 0; - break; - case OstProto::StreamControl::e_su_packets: - numBursts = 1; - numPackets = streamList[i]->numPackets(); - ibg = 0; - ipg = 1000000/streamList[i]->packetRate(); - break; - default: - qWarning("Unhandled stream control unit %d", - streamList[i]->sendUnit()); - continue; - } - qDebug("numBursts = %ld, numPackets = %ld\n", - numBursts, numPackets); - qDebug("ibg = %ld, ipg = %ld\n", ibg, ipg); - - if (streamList[i]->isFrameVariable()) - { - isVariable = true; - } - else - { - isVariable = false; - len = streamList[i]->frameValue(pktBuf, sizeof(pktBuf), 0); - } - - for (int j = 0; j < numBursts; j++) - { - for (int k = 0; k < numPackets; k++) - { - if (isVariable) - { - len = streamList[i]->frameValue(pktBuf, sizeof(pktBuf), - j * numPackets + k); - } - if (len > 0) - { - pktHdr.caplen = pktHdr.len = len; - pktHdr.ts.tv_usec += ipg; - if (pktHdr.ts.tv_usec > 1000000) - { - pktHdr.ts.tv_sec++; - pktHdr.ts.tv_usec -= 1000000; - } - - // Not enough space? Alloc another one! - if ((sendQ.sendQueue->len + len + sizeof(pcap_pkthdr)) - > sendQ.sendQueue->maxlen) - { - sendQueueList.append(sendQ); - - //! \todo (LOW): calculate sendqueue size - sendQ.sendQueue = pcap_sendqueue_alloc(1*MB); - sendQ.sendQueueCumLen.clear(); - -#if 0 - pktHdr.ts.tv_sec = 0; - pktHdr.ts.tv_usec = 0; -#endif - } - - qDebug("q(%d, %d, %d) sec = %lu usec = %lu", - i, j, k, pktHdr.ts.tv_sec, pktHdr.ts.tv_usec); - - if (-1 == pcap_sendqueue_queue(sendQ.sendQueue, &pktHdr, - (u_char*) pktBuf)) - { - qDebug("[port %d] sendqueue_queue() failed for " - "streamidx %d\n", id(), i); - } - else - sendQ.sendQueueCumLen.append(sendQ.sendQueue->len); - } - } // for (numPackets) - pktHdr.ts.tv_usec += ibg; - if (pktHdr.ts.tv_usec > 1000000) - { - pktHdr.ts.tv_sec++; - pktHdr.ts.tv_usec -= 1000000; - } - } // for (numBursts) - - switch(streamList[i]->nextWhat()) - { - case ::OstProto::StreamControl::e_nw_stop: - goto _stop_no_more_pkts; - - case ::OstProto::StreamControl::e_nw_goto_id: - /*! \todo (MED): define and use - streamList[i].d.control().goto_stream_id(); */ - - /*! \todo (MED): assumes goto Id is less than current!!!! - To support goto to any id, do - if goto_id > curr_id then - i = goto_id; - goto restart; - else - returnToQIdx = 0; - */ - - returnToQIdx=0; - goto _stop_no_more_pkts; - - case ::OstProto::StreamControl::e_nw_goto_next: - break; - - default: - qFatal("---------- %s: Unhandled case (%d) -----------", - __FUNCTION__, streamList[i]->nextWhat() ); - break; - } - - } // if (stream is enabled) - } // for (numStreams) - -_stop_no_more_pkts: - // The last alloc'ed sendQ appended here - sendQueueList.append(sendQ); - - isSendQueueDirty = false; -} - -void PortInfo::startTransmit() -{ - transmitter.start(); -} - -void PortInfo::stopTransmit() -{ - transmitter.stop(); -} - -void PortInfo::startCapture() -{ - capturer.start(); -} - -void PortInfo::stopCapture() -{ - capturer.stop(); -} - -QFile* PortInfo::captureFile() -{ - return capturer.captureFile(); -} - -void PortInfo::resetStats() -{ - memcpy((void*) &epochStats, (void*) &stats, sizeof(stats)); -} - -// -// ------------------ PortMonitor ------------------- -// - -PortInfo::PortMonitorRx::PortMonitorRx(PortInfo *port) -{ - this->port = port; -#ifdef Q_OS_WIN32 - { - int sz = sizeof(PACKET_OID_DATA) + sizeof(quint64) + 4; - //oidData = GlobalAllocPtr(GMEM_MOVEABLE | GMEM_ZEROINIT, - //sizeof(PACKET_OID_DATA) + sizeof(quint64) - 1); - oidData = (PPACKET_OID_DATA) malloc(sz); - if (oidData) - { - memset(oidData, 0, sz); - oidData->Length=sizeof(quint64); - } - else - qFatal("failed to alloc oidData"); - } -#endif -} - -PortInfo::PortMonitorTx::PortMonitorTx(PortInfo *port) -{ - this->port = port; -#ifdef Q_OS_WIN32 - { - int sz = sizeof(PACKET_OID_DATA) + sizeof(quint64) + 4; - //oidData = GlobalAllocPtr(GMEM_MOVEABLE | GMEM_ZEROINIT, - //sizeof(PACKET_OID_DATA) + sizeof(quint64) - 1); - oidData = (PPACKET_OID_DATA) malloc(sz); - if (oidData) - { - memset(oidData, 0, sz); - oidData->Length=sizeof(quint64); - } - else - qFatal("failed to alloc oidData"); - } -#endif -} - -#ifdef Q_OS_WIN32 -void PortInfo::PortMonitorRx::callbackRx(u_char *state, - const struct pcap_pkthdr *header, const u_char *pkt_data) -{ - // This is the WinPcap Callback - which is a 'stats mode' callback - - uint usec; - PortInfo *port = (PortInfo*) state; - - quint64 pkts; - quint64 bytes; - - // Update RxStats and RxRates using PCAP data - pkts = *((quint64*)(pkt_data + 0)); - bytes = *((quint64*)(pkt_data + 8)); - -#if 0 - if (port->id() == 2) - qDebug("# %llu", pkts); -#endif - - // Note: PCAP reported bytes includes ETH_FRAME_HDR_SIZE - adjust for it - bytes -= pkts * ETH_FRAME_HDR_SIZE; - - usec = (header->ts.tv_sec - port->lastTsRx.tv_sec) * 1000000 + - (header->ts.tv_usec - port->lastTsRx.tv_usec); - port->stats.rxPps = (pkts * 1000000) / usec; - port->stats.rxBps = (bytes * 1000000) / usec; - - port->stats.rxPkts += pkts; - port->stats.rxBytes += bytes; - - // Store curr timestamp as last timestamp - port->lastTsRx.tv_sec = header->ts.tv_sec; - port->lastTsRx.tv_usec = header->ts.tv_usec; - -#if 0 - for (int i=0; i < 16; i++) - { - qDebug("%02x ", pkt_data[i]); - } - qDebug("{%d: %llu, %llu}\n", port->id(), - pkts, bytes); - qDebug("[%d: pkts : %llu]\n", port->id(), port->stats.rxPkts); - qDebug("[%d: bytes: %llu]\n", port->id(), port->stats.rxBytes); -#endif - - // Retreive NIC stats -#if 0 - port->monitorRx.oidData->Oid = OID_GEN_RCV_OK; - if (PacketRequest(port->devHandleRx->adapter, 0, port->monitorRx.oidData)) - { - if (port->monitorRx.oidData->Length <= sizeof(port->stats.rxPktsNic)) - memcpy((void*)&port->stats.rxPktsNic, - (void*)port->monitorRx.oidData->Data, - port->monitorRx.oidData->Length); - } -#endif -} -void PortInfo::PortMonitorTx::callbackTx(u_char *state, - const struct pcap_pkthdr *header, const u_char *pkt_data) -{ - // This is the WinPcap Callback - which is a 'stats mode' callback - - uint usec; - PortInfo *port = (PortInfo*) state; - - quint64 pkts; - quint64 bytes; - - -#if 0 - // Update RxStats and RxRates using PCAP data - pkts = *((quint64*)(pkt_data + 0)); - bytes = *((quint64*)(pkt_data + 8)); - -#if 0 - if (port->id() == 2) - qDebug("@ %llu", pkts); -#endif - - // Note: PCAP reported bytes includes ETH_FRAME_HDR_SIZE - adjust for it - bytes -= pkts * ETH_FRAME_HDR_SIZE; - - usec = (header->ts.tv_sec - port->lastTsTx.tv_sec) * 1000000 + - (header->ts.tv_usec - port->lastTsTx.tv_usec); - port->stats.txPps = (pkts * 1000000) / usec; - port->stats.txBps = (bytes * 1000000) / usec; - - port->stats.txPkts += pkts; - port->stats.txBytes += bytes; -#endif - - // Since WinPCAP (due to NDIS limitation) cannot distinguish between - // rx/tx packets, pcap stats are not of much use - for the tx stats - // update from PcapExtra - - pkts = port->pcapExtra.txPkts - port->stats.txPkts; - bytes = port->pcapExtra.txBytes - port->stats.txBytes; - - // Use the pcap timestamp for rate calculation though - usec = (header->ts.tv_sec - port->lastTsTx.tv_sec) * 1000000 + - (header->ts.tv_usec - port->lastTsTx.tv_usec); - port->stats.txPps = (pkts * 1000000) / usec; - port->stats.txBps = (bytes * 1000000) / usec; - - port->stats.txPkts = port->pcapExtra.txPkts; - port->stats.txBytes = port->pcapExtra.txBytes; - - // Store curr timestamp as last timestamp - port->lastTsTx.tv_sec = header->ts.tv_sec; - port->lastTsTx.tv_usec = header->ts.tv_usec; - -#if 0 - for (int i=0; i < 16; i++) - { - qDebug("%02x ", pkt_data[i]); - } - qDebug("{%d: %llu, %llu}\n", port->id(), - pkts, bytes); - qDebug("[%d: pkts : %llu]\n", port->id(), port->stats.rxPkts); - qDebug("[%d: bytes: %llu]\n", port->id(), port->stats.rxBytes); -#endif - - // Retreive NIC stats -#if 0 - port->monitorTx.oidData->Oid = OID_GEN_XMIT_OK; - if (PacketRequest(port->devHandleTx->adapter, 0, port->monitorTx.oidData)) - { - if (port->monitorTx.oidData->Length <= sizeof(port->stats.txPktsNic)) - memcpy((void*)&port->stats.txPktsNic, - (void*)port->monitorTx.oidData->Data, - port->monitorTx.oidData->Length); - } -#endif -} -#else -void PortInfo::PortMonitorRx::callbackRx(u_char *state, - const struct pcap_pkthdr *header, const u_char *pkt_data) -{ - // This is the LibPcap Callback - which is a 'capture mode' callback - // This callback is called once for EVERY packet - - uint usec; - PortInfo *port = (PortInfo*) state; - - quint64 pkts; - quint64 bytes; - - // Update RxStats and RxRates using PCAP data - usec = (header->ts.tv_sec - port->lastTsRx.tv_sec) * 1000000 + - (header->ts.tv_usec - port->lastTsRx.tv_usec); - //! \todo support Rx Pkt/Bit rate on Linux (libpcap callback) -#if 0 - port->stats.rxPps = (pkts * 1000000) / usec; - port->stats.rxBps = (bytes * 1000000) / usec; -#endif - - // Note: For a 'capture callback' PCAP reported bytes DOES NOT include - // ETH_FRAME_HDR_SIZE - so don't adjust for it - port->stats.rxPkts++; - port->stats.rxBytes += header->len; - - // Store curr timestamp as last timestamp - port->lastTsRx.tv_sec = header->ts.tv_sec; - port->lastTsRx.tv_usec = header->ts.tv_usec; -} - -void PortInfo::PortMonitorTx::callbackTx(u_char *state, - const struct pcap_pkthdr *header, const u_char *pkt_data) -{ - // This is the LibPcap Callback - which is a 'capture mode' callback - // This callback is called once for EVERY packet - - uint usec; - PortInfo *port = (PortInfo*) state; - - quint64 pkts; - quint64 bytes; - - // Update TxStats and TxRates using PCAP data - usec = (header->ts.tv_sec - port->lastTsTx.tv_sec) * 1000000 + - (header->ts.tv_usec - port->lastTsTx.tv_usec); - //! \todo support Tx Pkt/Bit rate on Linux (libpcap callback) -#if 0 - port->stats.txPps = (pkts * 1000000) / usec; - port->stats.txBps = (bytes * 1000000) / usec; -#endif - - // Note: For a 'capture callback' PCAP reported bytes DOES NOT include - // ETH_FRAME_HDR_SIZE - so don't adjust for it - - port->stats.txPkts++; - port->stats.txBytes += header->len; - - // Store curr timestamp as last timestamp - port->lastTsTx.tv_sec = header->ts.tv_sec; - port->lastTsTx.tv_usec = header->ts.tv_usec; -} -#endif -void PortInfo::PortMonitorRx::run() -{ - int ret; - - qDebug("before pcap_loop rx \n"); -#if 1 - /* Start the main loop */ - ret = pcap_loop(port->devHandleRx, -1, - &PortInfo::PortMonitorRx::callbackRx, (u_char*) port); - - switch(ret) - { - case 0: - qDebug("Unexpected return from pcap_loop()\n"); - break; - case -1: - qDebug("Unsolicited (error) return from pcap_loop()\n"); - break; - case -2: - qDebug("Solicited return from pcap_loop()\n"); - break; - default: - qDebug("Unknown return value from pcap_loop()\n"); - } -#else - while (1) - { - /* Start the main loop */ - ret = pcap_dispatch(port->devHandleRx, -1, - &PortInfo::PortMonitorRx::callbackRx, (u_char*) port); - - switch(ret) - { - case -1: - qDebug("Unsolicited (error) return from pcap_loop() %s\n", - pcap_geterr(port->devHandleRx)); - break; - case -2: - qDebug("Solicited return from pcap_loop()\n"); - break; - default: - //qDebug("%d pkts rcvd\n", ret); - break; - } - } -#endif -} - -void PortInfo::PortMonitorTx::run() -{ - int ret; - - qDebug("before pcap_loopTx\n"); -#if 1 - /* Start the main loop */ - ret = pcap_loop(port->devHandleTx, -1, - &PortInfo::PortMonitorTx::callbackTx, (u_char*) port); - - switch(ret) - { - case 0: - qDebug("Unexpected return from pcap_loop()\n"); - break; - case -1: - qDebug("Unsolicited (error) return from pcap_loop()\n"); - break; - case -2: - qDebug("Solicited return from pcap_loop()\n"); - break; - default: - qDebug("Unknown return value from pcap_loop()\n"); - } -#else - while (1) - { - /* Start the main loop */ - ret = pcap_dispatch(port->devHandleTx, -1, - &PortInfo::PortMonitorTx::callbackTx, (u_char*) port); - - switch(ret) - { - case -1: - qDebug("Unsolicited (error) return from pcap_loop() %s\n", - pcap_geterr(port->devHandleTx)); - break; - case -2: - qDebug("Solicited return from pcap_loop()\n"); - break; - default: - //qDebug("%d pkts rcvd\n", ret); - break; - } - } -#endif -} - -/*--------------- PortTransmitter ---------------*/ - -PortInfo::PortTransmitter::PortTransmitter(PortInfo *port) -{ - this->port = port; -} - -void PortInfo::PortTransmitter::run() -{ - //! \todo (MED) Stream Mode - continuous: define before implement - - // NOTE1: We can't use pcap_sendqueue_transmit() directly even on Win32 - // 'coz of 2 reasons - there's no way of stopping it before all packets - // in the sendQueue are sent out and secondly, stats are available only - // when all packets have been sent - no periodic updates - // - // NOTE2: Transmit on the Rx Handle so that we can receive it back - // on the Tx Handle to do stats - // - // NOTE3: Update pcapExtra counters - port TxStats will be updated in the - // 'stats callback' function so that both Rx and Tx stats are updated - // together - - m_stop = 0; - ost_pcap_sendqueue_list_transmit(port->devHandleRx, port->sendQueueList, - port->returnToQIdx, true, &m_stop, - &port->pcapExtra.txPkts, &port->pcapExtra.txBytes, - QThread::usleep); - m_stop = 0; -} - -void PortInfo::PortTransmitter::stop() -{ - m_stop = 1; -} - -/*--------------- PortCapture ---------------*/ - -PortInfo::PortCapture::PortCapture(PortInfo *port) -{ - this->port = port; - capHandle = NULL; - dumpHandle = NULL; -} - -PortInfo::PortCapture::~PortCapture() -{ -} - -void PortInfo::PortCapture::run() -{ - int ret; - char errbuf[PCAP_ERRBUF_SIZE]; - - capHandle = pcap_open_live(port->dev->name, 65535, - PCAP_OPENFLAG_PROMISCUOUS, 1000 /* ms */, errbuf); - if (capHandle == NULL) - { - qDebug("Error opening port %s: %s\n", - port->dev->name, pcap_geterr(capHandle)); - return; - } - - if (!capFile.isOpen()) - { - if (!capFile.open()) - qFatal("Unable to open temp cap file"); - } - - qDebug("cap file = %s", capFile.fileName().toAscii().constData()); - dumpHandle = pcap_dump_open(capHandle, - capFile.fileName().toAscii().constData()); - - m_stop = 0; - while (m_stop == 0) - { - struct pcap_pkthdr *hdr; - const uchar *data; - - ret = pcap_next_ex(capHandle, &hdr, &data); - switch (ret) - { - case 1: - pcap_dump((uchar*) dumpHandle, hdr, data); - case 0: - continue; - case -1: - qWarning("%s: error reading packet (%d): %s", - __PRETTY_FUNCTION__, ret, pcap_geterr(capHandle)); - break; - case -2: - default: - qFatal("%s: Unexpected return value %d", __PRETTY_FUNCTION__, ret); - } - } - m_stop = 0; - pcap_dump_close(dumpHandle); - pcap_close(capHandle); - dumpHandle = NULL; - capHandle = NULL; -} - -void PortInfo::PortCapture::stop() -{ - m_stop = 1; -} - -QFile* PortInfo::PortCapture::captureFile() -{ - return &capFile; -} - - -/*--------------- MyService ---------------*/ - -int MyService::getStreamIndex(unsigned int portIdx, - unsigned int streamId) -{ - int i; - - // note: index and id are interchageable for port but not for stream - - Q_ASSERT(portIdx < numPorts); - - for (i = 0; i < portInfo[portIdx]->streamList.size(); i++) - { - if (streamId == portInfo[portIdx]->streamList.at(i)->mStreamId.id()) - goto _found; - } - - qDebug("%s: stream id %d not found", __PRETTY_FUNCTION__, streamId); - return -1; - -_found: - return i; -} +#include "portmanager.h" MyService::MyService() { - pcap_if_t *dev; - int i=0; - char errbuf[PCAP_ERRBUF_SIZE]; + PortManager *portManager = PortManager::instance(); + int n = portManager->portCount(); - // Init Data - numPorts = 0; - alldevs = NULL; - - LOG("Retrieving the device list from the local machine\n"); - if (pcap_findalldevs(&alldevs, errbuf) == -1) - { - LOG("Error in pcap_findalldevs_ex: %s\n", errbuf); - goto _fail; - } - - portInfo.clear(); - /* Count, Populate and Print the list */ - for(i=0, dev=alldevs; dev!=NULL; i++, dev=dev->next) - { - portInfo.append(new PortInfo(i, dev)); - numPorts++; - -#if 1 - LOG("%d. %s", i, dev->name); - if (dev->description) - { - LOG(" (%s)\n", dev->description); - } -#endif - } - - if (i == 0) - { - LOG("\nNo interfaces found! Make sure WinPcap is installed.\n"); - goto _fail; - } - -_fail: - return; + for (int i = 0; i < n; i++) + portInfo.append(portManager->port(i)); } MyService::~MyService() { - pcap_freealldevs(alldevs); } -void MyService::getPortIdList( - ::google::protobuf::RpcController* controller, - const ::OstProto::Void* request, - ::OstProto::PortIdList* response, - ::google::protobuf::Closure* done) +void MyService::getPortIdList(::google::protobuf::RpcController* controller, + const ::OstProto::Void* request, + ::OstProto::PortIdList* response, + ::google::protobuf::Closure* done) { - qDebug("In %s", __PRETTY_FUNCTION__); + qDebug("In %s", __PRETTY_FUNCTION__); - for (uint i = 0; i < numPorts; i++) - { - ::OstProto::PortId *p; + for (int i = 0; i < portInfo.size(); i++) + { + ::OstProto::PortId *p; - p = response->add_port_id(); - p->set_id(portInfo[i]->d.port_id().id()); - } + p = response->add_port_id(); + p->set_id(portInfo[i]->id()); + } - done->Run(); + done->Run(); } void MyService::getPortConfig(::google::protobuf::RpcController* controller, -const ::OstProto::PortIdList* request, -::OstProto::PortConfigList* response, -::google::protobuf::Closure* done) + const ::OstProto::PortIdList* request, + ::OstProto::PortConfigList* response, + ::google::protobuf::Closure* done) { - qDebug("In %s", __PRETTY_FUNCTION__); + qDebug("In %s", __PRETTY_FUNCTION__); - for (int i=0; i < request->port_id_size(); i++) - { - unsigned int idx; + for (int i = 0; i < request->port_id_size(); i++) + { + int id; - idx = request->port_id(i).id(); - if (idx < numPorts) - { - OstProto::Port *p; + id = request->port_id(i).id(); + if (id < portInfo.size()) + { + OstProto::Port *p; - p = response->add_port(); - p->CopyFrom(portInfo[idx]->d); - } - } + p = response->add_port(); + portInfo[id]->protoDataCopyInto(p); + } + } - done->Run(); + done->Run(); } void MyService::getStreamIdList(::google::protobuf::RpcController* controller, -const ::OstProto::PortId* request, -::OstProto::StreamIdList* response, -::google::protobuf::Closure* done) + const ::OstProto::PortId* request, + ::OstProto::StreamIdList* response, + ::google::protobuf::Closure* done) { - unsigned int portIdx; + int portId; - qDebug("In %s", __PRETTY_FUNCTION__); + qDebug("In %s", __PRETTY_FUNCTION__); - portIdx = request->id(); - if (portIdx >= numPorts) - { - qDebug("%s: Invalid port id %d", __PRETTY_FUNCTION__, portIdx); - controller->SetFailed("Invalid Port Id"); - goto _exit; //! \todo (LOW): Partial status of RPC - } + portId = request->id(); + if ((portId < 0) || (portId >= portInfo.size())) + goto _invalid_port; - response->mutable_port_id()->set_id(portIdx); - for (int j = 0; j < portInfo[portIdx]->streamList.size(); j++) - { - OstProto::StreamId *s; + response->mutable_port_id()->set_id(portId); + for (int i = 0; i < portInfo[portId]->streamCount(); i++) + { + OstProto::StreamId *s; - s = response->add_stream_id(); - s->CopyFrom(portInfo[portIdx]->streamList[j]->mStreamId); - } + s = response->add_stream_id(); + s->set_id(portInfo[portId]->stream(i)->id()); + } + done->Run(); + return; -_exit: - done->Run(); +_invalid_port: + controller->SetFailed("Invalid Port Id"); + done->Run(); } void MyService::getStreamConfig(::google::protobuf::RpcController* controller, -const ::OstProto::StreamIdList* request, -::OstProto::StreamConfigList* response, -::google::protobuf::Closure* done) + const ::OstProto::StreamIdList* request, + ::OstProto::StreamConfigList* response, + ::google::protobuf::Closure* done) { - unsigned int portIdx; + int portId; - qDebug("In %s", __PRETTY_FUNCTION__); + qDebug("In %s", __PRETTY_FUNCTION__); - portIdx = request->port_id().id(); - if (portIdx >= numPorts) - { - controller->SetFailed("invalid portid"); - goto _exit; - } + portId = request->port_id().id(); + if ((portId < 0) || (portId >= portInfo.size())) + goto _invalid_port; - response->mutable_port_id()->set_id(portIdx); - for (int i = 0; i < request->stream_id_size(); i++) - { - int streamIndex; - OstProto::Stream *s; + response->mutable_port_id()->set_id(portId); + for (int i = 0; i < request->stream_id_size(); i++) + { + StreamBase *stream; + OstProto::Stream *s; - streamIndex = getStreamIndex(portIdx, request->stream_id(i).id()); - if (streamIndex < 0) - continue; //! \todo(LOW): Partial status of RPC + stream = portInfo[portId]->stream(request->stream_id(i).id()); + if (!stream) + continue; //! \todo(LOW): Partial status of RPC - s = response->add_stream(); + s = response->add_stream(); + stream->protoDataCopyInto(*s); + } + done->Run(); + return; - portInfo[portIdx]->streamList[streamIndex]->protoDataCopyInto(*s); - } - -_exit: - done->Run(); +_invalid_port: + controller->SetFailed("invalid portid"); + done->Run(); } void MyService::addStream(::google::protobuf::RpcController* controller, -const ::OstProto::StreamIdList* request, -::OstProto::Ack* response, -::google::protobuf::Closure* done) + const ::OstProto::StreamIdList* request, + ::OstProto::Ack* response, + ::google::protobuf::Closure* done) { - unsigned int portIdx; + int portId; - qDebug("In %s", __PRETTY_FUNCTION__); + qDebug("In %s", __PRETTY_FUNCTION__); - portIdx = request->port_id().id(); - if (portIdx >= numPorts) - { - controller->SetFailed("invalid portid"); - goto _exit; - } + portId = request->port_id().id(); + if ((portId < 0) || (portId >= portInfo.size())) + goto _invalid_port; - for (int i = 0; i < request->stream_id_size(); i++) - { - int streamIndex; - StreamInfo *s = new StreamInfo; + for (int i = 0; i < request->stream_id_size(); i++) + { + StreamBase *stream; - // If stream with same id as in request exists already ==> error!! - streamIndex = getStreamIndex(portIdx, request->stream_id(i).id()); - if (streamIndex >= 0) - continue; //! \todo (LOW): Partial status of RPC + // If stream with same id as in request exists already ==> error!! + stream = portInfo[portId]->stream(request->stream_id(i).id()); + if (stream) + continue; //! \todo (LOW): Partial status of RPC - // Append a new "default" stream - actual contents of the new stream is - // expected in a subsequent "modifyStream" request - set the stream id - // now itself however!!! - s->mStreamId.CopyFrom(request->stream_id(i)); - portInfo[portIdx]->streamList.append(s); + // Append a new "default" stream - actual contents of the new stream is + // expected in a subsequent "modifyStream" request - set the stream id + // now itself however!!! + stream = new StreamBase; + stream->setId(request->stream_id(i).id()); + portInfo[portId]->addStream(stream); - //! \todo (LOW): fill-in response "Ack"???? - } - portInfo[portIdx]->setDirty(true); -_exit: - done->Run(); + } + + //! \todo (LOW): fill-in response "Ack"???? + + done->Run(); + return; + +_invalid_port: + controller->SetFailed("invalid portid"); + done->Run(); } void MyService::deleteStream(::google::protobuf::RpcController* controller, -const ::OstProto::StreamIdList* request, -::OstProto::Ack* response, -::google::protobuf::Closure* done) + const ::OstProto::StreamIdList* request, + ::OstProto::Ack* response, + ::google::protobuf::Closure* done) { - unsigned int portIdx; + int portId; - qDebug("In %s", __PRETTY_FUNCTION__); + qDebug("In %s", __PRETTY_FUNCTION__); - portIdx = request->port_id().id(); - if (portIdx >= numPorts) - { - controller->SetFailed("invalid portid"); - goto _exit; - } + portId = request->port_id().id(); + if ((portId < 0) || (portId >= portInfo.size())) + goto _invalid_port; - for (int i = 0; i < request->stream_id_size(); i++) - { - int streamIndex; - StreamInfo s; + for (int i = 0; i < request->stream_id_size(); i++) + portInfo[portId]->deleteStream(request->stream_id(i).id()); - streamIndex = getStreamIndex(portIdx, request->stream_id(i).id()); - if (streamIndex < 0) - continue; //! \todo (LOW): Partial status of RPC + //! \todo (LOW): fill-in response "Ack"???? - delete portInfo[portIdx]->streamList.takeAt(streamIndex); + done->Run(); + return; - //! \todo (LOW): fill-in response "Ack"???? - } - portInfo[portIdx]->setDirty(true); -_exit: - done->Run(); +_invalid_port: + controller->SetFailed("invalid portid"); + done->Run(); } void MyService::modifyStream(::google::protobuf::RpcController* controller, -const ::OstProto::StreamConfigList* request, -::OstProto::Ack* response, -::google::protobuf::Closure* done) + const ::OstProto::StreamConfigList* request, + ::OstProto::Ack* response, + ::google::protobuf::Closure* done) { - unsigned int portIdx; + int portId; - qDebug("In %s", __PRETTY_FUNCTION__); + qDebug("In %s", __PRETTY_FUNCTION__); - portIdx = request->port_id().id(); - if (portIdx >= numPorts) - { - controller->SetFailed("invalid portid"); - goto _exit; - } + portId = request->port_id().id(); + if ((portId < 0) || (portId >= portInfo.size())) + goto _invalid_port; - for (int i = 0; i < request->stream_size(); i++) - { - int streamIndex; + for (int i = 0; i < request->stream_size(); i++) + { + StreamBase *stream; - streamIndex = getStreamIndex(portIdx, - request->stream(i).stream_id().id()); - if (streamIndex < 0) - continue; //! \todo (LOW): Partial status of RPC + stream = portInfo[portId]->stream(request->stream(i).stream_id().id()); + if (stream) + { + stream->protoDataCopyFrom(request->stream(i)); + portInfo[portId]->setDirty(); + } + } - portInfo[portIdx]->streamList[streamIndex]->protoDataCopyFrom( - request->stream(i)); + //! \todo(LOW): fill-in response "Ack"???? - //! \todo(LOW): fill-in response "Ack"???? - } - portInfo[portIdx]->setDirty(true); -_exit: - done->Run(); + done->Run(); + return; + +_invalid_port: + controller->SetFailed("invalid portid"); + done->Run(); } void MyService::startTx(::google::protobuf::RpcController* controller, -const ::OstProto::PortIdList* request, -::OstProto::Ack* response, -::google::protobuf::Closure* done) + const ::OstProto::PortIdList* request, + ::OstProto::Ack* response, + ::google::protobuf::Closure* done) { - qDebug("In %s", __PRETTY_FUNCTION__); + qDebug("In %s", __PRETTY_FUNCTION__); - // If any of the ports in the request are dirty, first update them - for (int i=0; i < request->port_id_size(); i++) - { - uint portIdx; + for (int i = 0; i < request->port_id_size(); i++) + { + int portId; - portIdx = request->port_id(i).id(); - if (portIdx >= numPorts) - continue; //! \todo (LOW): partial RPC? + portId = request->port_id(i).id(); + if ((portId < 0) || (portId >= portInfo.size())) + continue; //! \todo (LOW): partial RPC? - if (portInfo[portIdx]->isDirty()) - portInfo[portIdx]->update(); - } + portInfo[portId]->startTransmit(); + } - for (int i=0; i < request->port_id_size(); i++) - { - uint portIdx; + //! \todo (LOW): fill-in response "Ack"???? - portIdx = request->port_id(i).id(); - if (portIdx >= numPorts) - continue; //! \todo (LOW): partial RPC? - - portInfo[portIdx]->startTransmit(); - } - - //! \todo (LOW): fill-in response "Ack"???? - - done->Run(); + done->Run(); } void MyService::stopTx(::google::protobuf::RpcController* controller, -const ::OstProto::PortIdList* request, -::OstProto::Ack* response, -::google::protobuf::Closure* done) + const ::OstProto::PortIdList* request, + ::OstProto::Ack* response, + ::google::protobuf::Closure* done) { - qDebug("In %s", __PRETTY_FUNCTION__); + qDebug("In %s", __PRETTY_FUNCTION__); - for (int i=0; i < request->port_id_size(); i++) - { - uint portIdx; + for (int i = 0; i < request->port_id_size(); i++) + { + int portId; - portIdx = request->port_id(i).id(); - if (portIdx >= numPorts) - continue; //! \todo (LOW): partial RPC? + portId = request->port_id(i).id(); + if ((portId < 0) || (portId >= portInfo.size())) + continue; //! \todo (LOW): partial RPC? - portInfo[portIdx]->stopTransmit(); - } - //! \todo (LOW): fill-in response "Ack"???? - done->Run(); + portInfo[portId]->stopTransmit(); + } + + //! \todo (LOW): fill-in response "Ack"???? + + done->Run(); } void MyService::startCapture(::google::protobuf::RpcController* controller, -const ::OstProto::PortIdList* request, -::OstProto::Ack* response, -::google::protobuf::Closure* done) + const ::OstProto::PortIdList* request, + ::OstProto::Ack* response, + ::google::protobuf::Closure* done) { - qDebug("In %s", __PRETTY_FUNCTION__); + qDebug("In %s", __PRETTY_FUNCTION__); - for (int i=0; i < request->port_id_size(); i++) - { - uint portIdx; + for (int i = 0; i < request->port_id_size(); i++) + { + int portId; - portIdx = request->port_id(i).id(); - if (portIdx >= numPorts) - continue; //! \todo (LOW): partial RPC? + portId = request->port_id(i).id(); + if ((portId < 0) || (portId >= portInfo.size())) + continue; //! \todo (LOW): partial RPC? - portInfo[portIdx]->startCapture(); - } + portInfo[portId]->startCapture(); + } - done->Run(); + //! \todo (LOW): fill-in response "Ack"???? + + done->Run(); } void MyService::stopCapture(::google::protobuf::RpcController* controller, -const ::OstProto::PortIdList* request, -::OstProto::Ack* response, -::google::protobuf::Closure* done) + const ::OstProto::PortIdList* request, + ::OstProto::Ack* response, + ::google::protobuf::Closure* done) { - qDebug("In %s", __PRETTY_FUNCTION__); - for (int i=0; i < request->port_id_size(); i++) - { - uint portIdx; + qDebug("In %s", __PRETTY_FUNCTION__); + for (int i=0; i < request->port_id_size(); i++) + { + int portId; - portIdx = request->port_id(i).id(); - if (portIdx >= numPorts) - continue; //! \todo (LOW): partial RPC? + portId = request->port_id(i).id(); + if ((portId < 0) || (portId >= portInfo.size())) + continue; //! \todo (LOW): partial RPC? - portInfo[portIdx]->stopCapture(); - } + portInfo[portId]->stopCapture(); + } - done->Run(); + //! \todo (LOW): fill-in response "Ack"???? + + done->Run(); } void MyService::getCaptureBuffer(::google::protobuf::RpcController* controller, -const ::OstProto::PortId* request, -::OstProto::CaptureBuffer* response, -::google::protobuf::Closure* done) + const ::OstProto::PortId* request, + ::OstProto::CaptureBuffer* response, + ::google::protobuf::Closure* done) { - uint portIdx; - qDebug("In %s", __PRETTY_FUNCTION__); + int portId; - portIdx = request->id(); - if (portIdx >= numPorts) - { - controller->SetFailed("invalid portid"); - goto _exit; - } + qDebug("In %s", __PRETTY_FUNCTION__); - portInfo[portIdx]->stopCapture(); - static_cast(controller)->setBinaryBlob( - portInfo[portIdx]->captureFile()); + portId = request->id(); + if ((portId < 0) || (portId >= portInfo.size())) + goto _invalid_port; -_exit: - done->Run(); + portInfo[portId]->stopCapture(); + static_cast(controller)->setBinaryBlob( + portInfo[portId]->captureData()); + + done->Run(); + return; + +_invalid_port: + controller->SetFailed("invalid portid"); + done->Run(); } void MyService::getStats(::google::protobuf::RpcController* controller, -const ::OstProto::PortIdList* request, -::OstProto::PortStatsList* response, -::google::protobuf::Closure* done) + const ::OstProto::PortIdList* request, + ::OstProto::PortStatsList* response, + ::google::protobuf::Closure* done) { - //qDebug("In %s", __PRETTY_FUNCTION__); + //qDebug("In %s", __PRETTY_FUNCTION__); - for (int i=0; i < request->port_id_size(); i++) - { - uint portidx; - ::OstProto::PortStats *s; - OstProto::PortState *st; + for (int i = 0; i < request->port_id_size(); i++) + { + int portId; + AbstractPort::PortStats stats; + OstProto::PortStats *s; + OstProto::PortState *st; - portidx = request->port_id(i).id(); - if (portidx >= numPorts) - continue; //! \todo(LOW): partial rpc? + portId = request->port_id(i).id(); + if ((portId < 0) || (portId >= portInfo.size())) + continue; //! \todo(LOW): partial rpc? - s = response->add_port_stats(); - s->mutable_port_id()->set_id(request->port_id(i).id()); + s = response->add_port_stats(); + s->mutable_port_id()->set_id(request->port_id(i).id()); + + st = s->mutable_state(); + st->set_link_state(portInfo[portId]->linkState()); + st->set_is_transmit_on(portInfo[portId]->isTransmitOn()); + st->set_is_capture_on(portInfo[portId]->isCaptureOn()); + + portInfo[portId]->stats(&stats); #if 0 - if (portidx == 2) - { - qDebug("<%llu", portInfo[portidx]->epochStats.rxPkts); - qDebug(">%llu", portInfo[portidx]->stats.rxPkts); - } + if (portId == 2) + qDebug(">%llu", stats.rxPkts); #endif - portInfo[portidx]->updateLinkState(); + s->set_rx_pkts(stats.rxPkts); + s->set_rx_bytes(stats.rxBytes); + s->set_rx_pps(stats.rxPps); + s->set_rx_bps(stats.rxBps); - st = s->mutable_state(); - st->set_link_state(portInfo[portidx]->linkState); - st->set_is_transmit_on(portInfo[portidx]->transmitter.isRunning()); - st->set_is_capture_on(portInfo[portidx]->capturer.isRunning()); + s->set_tx_pkts(stats.txPkts); + s->set_tx_bytes(stats.txBytes); + s->set_tx_pps(stats.txPps); + s->set_tx_bps(stats.txBps); + } - s->set_rx_pkts(portInfo[portidx]->stats.rxPkts - - portInfo[portidx]->epochStats.rxPkts); - s->set_rx_bytes(portInfo[portidx]->stats.rxBytes - - portInfo[portidx]->epochStats.rxBytes); - s->set_rx_pkts_nic(portInfo[portidx]->stats.rxPktsNic - - portInfo[portidx]->epochStats.rxPktsNic); - s->set_rx_bytes_nic(portInfo[portidx]->stats.rxBytesNic - - portInfo[portidx]->epochStats.rxBytesNic); - s->set_rx_pps(portInfo[portidx]->stats.rxPps); - s->set_rx_bps(portInfo[portidx]->stats.rxBps); - - s->set_tx_pkts(portInfo[portidx]->stats.txPkts - - portInfo[portidx]->epochStats.txPkts); - s->set_tx_bytes(portInfo[portidx]->stats.txBytes - - portInfo[portidx]->epochStats.txBytes); - s->set_tx_pkts_nic(portInfo[portidx]->stats.txPktsNic - - portInfo[portidx]->epochStats.txPktsNic); - s->set_tx_bytes_nic(portInfo[portidx]->stats.txBytesNic - - portInfo[portidx]->epochStats.txBytesNic); - s->set_tx_pps(portInfo[portidx]->stats.txPps); - s->set_tx_bps(portInfo[portidx]->stats.txBps); - } - - done->Run(); + done->Run(); } void MyService::clearStats(::google::protobuf::RpcController* controller, -const ::OstProto::PortIdList* request, -::OstProto::Ack* response, -::google::protobuf::Closure* done) + const ::OstProto::PortIdList* request, + ::OstProto::Ack* response, + ::google::protobuf::Closure* done) { - qDebug("In %s", __PRETTY_FUNCTION__); + qDebug("In %s", __PRETTY_FUNCTION__); - for (int i=0; i < request->port_id_size(); i++) - { - uint portIdx; + for (int i = 0; i < request->port_id_size(); i++) + { + int portId; - portIdx = request->port_id(i).id(); - if (portIdx >= numPorts) - continue; //! \todo (LOW): partial RPC? + portId = request->port_id(i).id(); + if ((portId < 0) || (portId >= portInfo.size())) + continue; //! \todo (LOW): partial RPC? - portInfo[portIdx]->resetStats(); - } - //! \todo (LOW): fill-in response "Ack"???? + portInfo[portId]->resetStats(); + } - done->Run(); + //! \todo (LOW): fill-in response "Ack"???? + + done->Run(); } +/* vim: set shiftwidth=4 tabstop=8 softtabstop=4 expandtab: */ diff --git a/server/myservice.h b/server/myservice.h index 4be1bdb..5d434ee 100644 --- a/server/myservice.h +++ b/server/myservice.h @@ -1,264 +1,85 @@ -#ifndef _MY_SERVICE_H -#define _MY_SERVICE_H - - -#if 0 -#include -#include -#endif - -#include "../common/protocol.pb.h" -#include "../common/streambase.h" -#include -#include -#include -#include -#include - -#include "../rpc/pbhelper.h" -#include "pcapextra.h" +#ifndef _MY_SERVICE_H +#define _MY_SERVICE_H -#ifdef Q_OS_WIN32 -#include -#endif +#include -#ifdef Q_OS_WIN32 -#define OID_GEN_MEDIA_CONNECT_STATUS 0x00010114 -#endif - -#define MAX_PKT_HDR_SIZE 1536 -#define MAX_STREAM_NAME_SIZE 64 - -//! 7 byte Preamble + 1 byte SFD + 4 byte FCS -#define ETH_FRAME_HDR_SIZE 12 - +#include "../common/protocol.pb.h" -class MyService; - -class StreamInfo : public StreamBase -{ - friend class MyService; - friend class PortInfo; - - OstProto::StreamId mStreamId; - +#define MAX_PKT_HDR_SIZE 1536 +#define MAX_STREAM_NAME_SIZE 64 + +class AbstractPort; + +class MyService: public OstProto::OstService +{ public: - StreamInfo(); - ~StreamInfo(); -}; - - -class PortInfo -{ - friend class MyService; - - class PortMonitorRx: public QThread - { - friend class PortInfo; - - PortInfo *port; -#ifdef Q_OS_WIN32 - PPACKET_OID_DATA oidData; -#endif - public: - PortMonitorRx(PortInfo *port); - static void callbackRx(u_char *state, - const struct pcap_pkthdr *header, const u_char *pkt_data); - void run(); - }; + MyService(); + virtual ~MyService(); - class PortMonitorTx: public QThread - { - friend class PortInfo; - - PortInfo *port; -#ifdef Q_OS_WIN32 - PPACKET_OID_DATA oidData; -#endif - public: - PortMonitorTx(PortInfo *port); - static void callbackTx(u_char *state, - const struct pcap_pkthdr *header, const u_char *pkt_data); - void run(); - }; + /* Methods provided by the service */ + virtual void getPortIdList(::google::protobuf::RpcController* controller, + const ::OstProto::Void* request, + ::OstProto::PortIdList* response, + ::google::protobuf::Closure* done); + virtual void getPortConfig(::google::protobuf::RpcController* controller, + const ::OstProto::PortIdList* request, + ::OstProto::PortConfigList* response, + ::google::protobuf::Closure* done); + virtual void getStreamIdList(::google::protobuf::RpcController* controller, + const ::OstProto::PortId* request, + ::OstProto::StreamIdList* response, + ::google::protobuf::Closure* done); + virtual void getStreamConfig(::google::protobuf::RpcController* controller, + const ::OstProto::StreamIdList* request, + ::OstProto::StreamConfigList* response, + ::google::protobuf::Closure* done); + virtual void addStream(::google::protobuf::RpcController* controller, + const ::OstProto::StreamIdList* request, + ::OstProto::Ack* response, + ::google::protobuf::Closure* done); + virtual void deleteStream(::google::protobuf::RpcController* controller, + const ::OstProto::StreamIdList* request, + ::OstProto::Ack* response, + ::google::protobuf::Closure* done); + virtual void modifyStream(::google::protobuf::RpcController* controller, + const ::OstProto::StreamConfigList* request, + ::OstProto::Ack* response, + ::google::protobuf::Closure* done); + virtual void startTx(::google::protobuf::RpcController* controller, + const ::OstProto::PortIdList* request, + ::OstProto::Ack* response, + ::google::protobuf::Closure* done); + virtual void stopTx(::google::protobuf::RpcController* controller, + const ::OstProto::PortIdList* request, + ::OstProto::Ack* response, + ::google::protobuf::Closure* done); + virtual void startCapture(::google::protobuf::RpcController* controller, + const ::OstProto::PortIdList* request, + ::OstProto::Ack* response, + ::google::protobuf::Closure* done); + virtual void stopCapture(::google::protobuf::RpcController* controller, + const ::OstProto::PortIdList* request, + ::OstProto::Ack* response, + ::google::protobuf::Closure* done); + virtual void getCaptureBuffer(::google::protobuf::RpcController* controller, + const ::OstProto::PortId* request, + ::OstProto::CaptureBuffer* response, + ::google::protobuf::Closure* done); + virtual void getStats(::google::protobuf::RpcController* controller, + const ::OstProto::PortIdList* request, + ::OstProto::PortStatsList* response, + ::google::protobuf::Closure* done); + virtual void clearStats(::google::protobuf::RpcController* controller, + const ::OstProto::PortIdList* request, + ::OstProto::Ack* response, + ::google::protobuf::Closure* done); - class PortTransmitter: public QThread - { - friend class PortInfo; +private: + /*! AbstractPort::id() and index into portInfo[] are same! */ + QList portInfo; - PortInfo *port; - int m_stop; +}; - public: - PortTransmitter(PortInfo *port); - void run(); - void stop(); - }; +#endif - class PortCapture: public QThread - { - friend class PortInfo; - - PortInfo *port; - int m_stop; - pcap_t *capHandle; - pcap_dumper_t *dumpHandle; - QTemporaryFile capFile; - - public: - PortCapture(PortInfo *port); - ~PortCapture(); - void run(); - void stop(); - QFile* captureFile(); - }; - -#ifdef Q_OS_WIN32 - LPADAPTER adapter; - PPACKET_OID_DATA oidData; -#endif - - OstProto::Port d; - OstProto::LinkState linkState; - - struct PortStats - { - quint64 rxPkts; - quint64 rxBytes; - quint64 rxPktsNic; - quint64 rxBytesNic; - quint64 rxPps; - quint64 rxBps; - - quint64 txPkts; - quint64 txBytes; - quint64 txPktsNic; - quint64 txBytesNic; - quint64 txPps; - quint64 txBps; - }; - - //! \todo Need lock for stats access/update - - - //! Stuff we need to maintain since PCAP doesn't as of now. As and when - // PCAP supports it, we'll remove from here - struct PcapExtra - { - - //! PCAP doesn't do any tx stats - quint64 txPkts; - quint64 txBytes; - - }; - - pcap_if_t *dev; - pcap_t *devHandleRx; - pcap_t *devHandleTx; - QList sendQueueList; - int returnToQIdx; // FIXME(MED): combine with sendQList - bool isSendQueueDirty; - PcapExtra pcapExtra; - PortMonitorRx monitorRx; - PortMonitorTx monitorTx; - PortTransmitter transmitter; - PortCapture capturer; - - struct PortStats epochStats; - struct PortStats stats; - struct timeval lastTsRx; //! used for Rate Stats calculations - struct timeval lastTsTx; //! used for Rate Stats calculations - - /*! StreamInfo::d::stream_id and index into streamList[] are NOT same! */ - QList streamList; - -public: - PortInfo(uint id, pcap_if_t *dev); - uint id() { return d.port_id().id(); } - void updateLinkState(); - bool isDirty() { return isSendQueueDirty; } - void setDirty(bool dirty) { isSendQueueDirty = dirty; } - void update(); - void startTransmit(); - void stopTransmit(); - void startCapture(); - void stopCapture(); - QFile* captureFile(); - void resetStats(); -}; - - -class MyService: public OstProto::OstService -{ - uint numPorts; - - /*! PortInfo::d::port_id and index into portInfo[] are same! */ - QList portInfo; - pcap_if_t *alldevs; - - int getStreamIndex(unsigned int portIdx,unsigned int streamId); - -public: - MyService(); - virtual ~MyService(); - - /* Methods provided by the service */ - virtual void getPortIdList(::google::protobuf::RpcController* controller, - const ::OstProto::Void* request, - ::OstProto::PortIdList* response, - ::google::protobuf::Closure* done); - virtual void getPortConfig(::google::protobuf::RpcController* controller, - const ::OstProto::PortIdList* request, - ::OstProto::PortConfigList* response, - ::google::protobuf::Closure* done); - virtual void getStreamIdList(::google::protobuf::RpcController* controller, - const ::OstProto::PortId* request, - ::OstProto::StreamIdList* response, - ::google::protobuf::Closure* done); - virtual void getStreamConfig(::google::protobuf::RpcController* controller, - const ::OstProto::StreamIdList* request, - ::OstProto::StreamConfigList* response, - ::google::protobuf::Closure* done); - virtual void addStream(::google::protobuf::RpcController* controller, - const ::OstProto::StreamIdList* request, - ::OstProto::Ack* response, - ::google::protobuf::Closure* done); - virtual void deleteStream(::google::protobuf::RpcController* controller, - const ::OstProto::StreamIdList* request, - ::OstProto::Ack* response, - ::google::protobuf::Closure* done); - virtual void modifyStream(::google::protobuf::RpcController* controller, - const ::OstProto::StreamConfigList* request, - ::OstProto::Ack* response, - ::google::protobuf::Closure* done); - virtual void startTx(::google::protobuf::RpcController* controller, - const ::OstProto::PortIdList* request, - ::OstProto::Ack* response, - ::google::protobuf::Closure* done); - virtual void stopTx(::google::protobuf::RpcController* controller, - const ::OstProto::PortIdList* request, - ::OstProto::Ack* response, - ::google::protobuf::Closure* done); - virtual void startCapture(::google::protobuf::RpcController* controller, - const ::OstProto::PortIdList* request, - ::OstProto::Ack* response, - ::google::protobuf::Closure* done); - virtual void stopCapture(::google::protobuf::RpcController* controller, - const ::OstProto::PortIdList* request, - ::OstProto::Ack* response, - ::google::protobuf::Closure* done); - virtual void getCaptureBuffer(::google::protobuf::RpcController* controller, - const ::OstProto::PortId* request, - ::OstProto::CaptureBuffer* response, - ::google::protobuf::Closure* done); - virtual void getStats(::google::protobuf::RpcController* controller, - const ::OstProto::PortIdList* request, - ::OstProto::PortStatsList* response, - ::google::protobuf::Closure* done); - virtual void clearStats(::google::protobuf::RpcController* controller, - const ::OstProto::PortIdList* request, - ::OstProto::Ack* response, - ::google::protobuf::Closure* done); -}; - -#endif +/* vim: set shiftwidth=4 tabstop=8 softtabstop=4 expandtab: */ diff --git a/server/pcapextra.cpp b/server/pcapextra.cpp index 197e83b..8274d48 100644 --- a/server/pcapextra.cpp +++ b/server/pcapextra.cpp @@ -1,16 +1,11 @@ +#include "pcapextra.h" + #include // memcpy() #include // malloc(), free() -#include "pcapextra.h" /* NOTE: All code borrowed from WinPcap */ #ifndef Q_OS_WIN32 -int pcap_setmode(pcap_t *p, int mode) -{ - // no STAT mode in libpcap, so just return 0 to indicate success - return 0; -} - pcap_send_queue* pcap_sendqueue_alloc (u_int memsize) { pcap_send_queue *tqueue; @@ -61,111 +56,4 @@ int pcap_sendqueue_queue (pcap_send_queue *queue, } #endif -u_int ost_pcap_sendqueue_list_transmit(pcap_t *p, - QList sendQueueList, int returnToQIdx, int sync, - int *p_stop, quint64* p_pkts, quint64* p_bytes, - void (*pf_usleep)(ulong)) -{ - uint i, ret = 0; - ost_pcap_send_queue sq; - - for(i = 0; i < sendQueueList.size(); i++) - { -_restart: - sq = sendQueueList.at(i); - ret += ost_pcap_sendqueue_transmit(p, sq.sendQueue, sync, - p_stop, p_pkts, p_bytes, pf_usleep); - - if (*p_stop) - return ret; - - //! \todo (HIGH): Timing between subsequent sendQueues - } - - if (returnToQIdx >= 0) - { - i = returnToQIdx; - - //! \todo (HIGH) 1s fixed; Change this to ipg of last stream - (*pf_usleep)(1000000); - goto _restart; - } - - return ret; -} - -u_int ost_pcap_sendqueue_transmit(pcap_t *p, - pcap_send_queue *queue, int sync, - int *p_stop, quint64* p_pkts, quint64* p_bytes, - void (*pf_usleep)(ulong)) -{ - char* PacketBuff = queue->buffer; - int Size = queue->len; - - struct pcap_pkthdr *winpcap_hdr; - struct timeval ts; - char* EndOfUserBuff = (char *)PacketBuff + Size; - int ret; - - // Start from the first packet - winpcap_hdr = (struct pcap_pkthdr*)PacketBuff; - - if((char*)winpcap_hdr + winpcap_hdr->caplen + sizeof(struct pcap_pkthdr) > - EndOfUserBuff ) - { - // Malformed buffer - return 0; - } - - if (sync) - ts = winpcap_hdr->ts; - - while( true ){ - - if (*p_stop) - return (char*)winpcap_hdr - (char*)PacketBuff; - - if(winpcap_hdr->caplen ==0 || winpcap_hdr->caplen > 65536) - { - // Malformed header - return 0; - } - - // Send the packet - ret = pcap_sendpacket(p, - (unsigned char*)winpcap_hdr + sizeof(struct pcap_pkthdr), - winpcap_hdr->caplen); - - if(ret < 0){ - // Error sending the packet - return (char*)winpcap_hdr - (char*)PacketBuff; - } - - if (p_pkts) (*p_pkts)++; - if (p_bytes) (*p_bytes) += winpcap_hdr->caplen; - - // Step to the next packet in the buffer - //(char*)winpcap_hdr += winpcap_hdr->caplen + sizeof(struct pcap_pkthdr); - winpcap_hdr = (struct pcap_pkthdr*) ((char*)winpcap_hdr + - winpcap_hdr->caplen + sizeof(struct pcap_pkthdr)); - - // Check if the end of the user buffer has been reached - if( (char*)winpcap_hdr >= EndOfUserBuff ) - { - return (char*)winpcap_hdr - (char*)PacketBuff; - } - - if (sync) - { - long usec = (winpcap_hdr->ts.tv_sec-ts.tv_sec)*1000000 + - (winpcap_hdr->ts.tv_usec - ts.tv_usec); - - if (usec) - { - (*pf_usleep)(usec); - ts = winpcap_hdr->ts; - } - } - } -} diff --git a/server/pcapextra.h b/server/pcapextra.h index 5c597df..e0dd23b 100644 --- a/server/pcapextra.h +++ b/server/pcapextra.h @@ -1,36 +1,12 @@ #ifndef _PCAP_EXTRA_H #define _PCAP_EXTRA_H -#include -#include - -#include "pcap.h" - -struct ost_pcap_send_queue -{ - pcap_send_queue *sendQueue; - //! Used to track num of packets (and their sizes) in the - // send queue. Also used to find out actual num of pkts sent - // in case of partial send in pcap_sendqueue_transmit() - QList sendQueueCumLen; -}; - -// Common for all OS - *nix or Win32 -u_int ost_pcap_sendqueue_list_transmit(pcap_t *p, - QList sendQueueList, int returnToQIdx, int sync, - int *p_stop, quint64* p_pkts, quint64* p_bytes, - void (*pf_usleep)(ulong)); - -u_int ost_pcap_sendqueue_transmit (pcap_t *p, - pcap_send_queue *queue, int sync, - int *p_stop, quint64* p_pkts, quint64* p_bytes, - void (*pf_usleep)(ulong)); +#include +#include #ifndef Q_OS_WIN32 -// Only for non Win32 - -#define PCAP_OPENFLAG_PROMISCUOUS 1 +//#define PCAP_OPENFLAG_PROMISCUOUS 1 struct pcap_send_queue { @@ -39,15 +15,11 @@ struct pcap_send_queue char *buffer; }; -int pcap_setmode(pcap_t *p, int mode); -#define MODE_STAT 1 - pcap_send_queue* pcap_sendqueue_alloc (u_int memsize); void pcap_sendqueue_destroy (pcap_send_queue *queue); int pcap_sendqueue_queue (pcap_send_queue *queue, const struct pcap_pkthdr *pkt_header, const u_char *pkt_data); - #endif #endif diff --git a/server/pcapport.cpp b/server/pcapport.cpp new file mode 100644 index 0000000..86a5086 --- /dev/null +++ b/server/pcapport.cpp @@ -0,0 +1,407 @@ +#include "pcapport.h" + +pcap_if_t *PcapPort::deviceList_ = NULL; + +PcapPort::PcapPort(int id, const char *device) + : AbstractPort(id, device) +{ + monitorRx_ = new PortMonitor(device, kDirectionRx, &stats_); + monitorTx_ = new PortMonitor(device, kDirectionTx, &stats_); + transmitter_ = new PortTransmitter(device); + capturer_ = new PortCapturer(device); + + if (!deviceList_) + { + char errbuf[PCAP_ERRBUF_SIZE]; + + if (pcap_findalldevs(&deviceList_, errbuf) == -1) + qDebug("Error in pcap_findalldevs_ex: %s\n", errbuf); + } + + for (pcap_if_t *dev = deviceList_; dev != NULL; dev = dev->next) + { + if (strcmp(device, dev->name) == 0) + { + if (dev->description) + data_.set_description(dev->description); + + //! \todo set port IP addr also + } + } +} + +void PcapPort::init() +{ + if (!monitorTx_->isDirectional()) + transmitter_->useExternalStats(&stats_); + + transmitter_->setHandle(monitorRx_->handle()); + + monitorRx_->start(); + monitorTx_->start(); +} + +PcapPort::~PcapPort() +{ + delete capturer_; + delete transmitter_; + delete monitorTx_; + delete monitorRx_; +} + +PcapPort::PortMonitor::PortMonitor(const char *device, Direction direction, + AbstractPort::PortStats *stats) +{ + int ret; + char errbuf[PCAP_ERRBUF_SIZE]; + + direction_ = direction; + isDirectional_ = true; + stats_ = stats; + handle_ = pcap_open_live(device, 64 /* FIXME */, PCAP_OPENFLAG_PROMISCUOUS, + 1000 /* ms */, errbuf); + + if (handle_ == NULL) + goto _open_error; + + switch (direction_) + { + case kDirectionRx: + ret = pcap_setdirection(handle_, PCAP_D_IN); + break; + case kDirectionTx: + ret = pcap_setdirection(handle_, PCAP_D_OUT); + break; + default: + Q_ASSERT(false); + } + + if (ret < 0) + goto _set_direction_error; + + return; + +_set_direction_error: + qDebug("Error setting direction(%d) %s: %s\n", direction, device, + pcap_geterr(handle_)); + isDirectional_ = false; + return; + +_open_error: + qDebug("Error opening port %s: %s\n", device, pcap_geterr(handle_)); +} + +void PcapPort::PortMonitor::run() +{ + while (1) + { + int ret; + struct pcap_pkthdr *hdr; + const uchar *data; + + ret = pcap_next_ex(handle_, &hdr, &data); + switch (ret) + { + case 1: + switch (direction_) + { + case kDirectionRx: + stats_->rxPkts++; + stats_->rxBytes += hdr->len; + break; + + case kDirectionTx: + if (isDirectional_) + { + stats_->txPkts++; + stats_->txBytes += hdr->len; + } + break; + + default: + Q_ASSERT(false); + } + + //! \todo TODO pkt/bit rates + break; + case 0: + //qDebug("%s: timeout. continuing ...", __PRETTY_FUNCTION__); + continue; + case -1: + qWarning("%s: error reading packet (%d): %s", + __PRETTY_FUNCTION__, ret, pcap_geterr(handle_)); + break; + case -2: + default: + qFatal("%s: Unexpected return value %d", __PRETTY_FUNCTION__, ret); + } + } +} + +PcapPort::PortTransmitter::PortTransmitter(const char *device) +{ + char errbuf[PCAP_ERRBUF_SIZE]; + + returnToQIdx_ = -1; + stop_ = false; + stats_ = new AbstractPort::PortStats; + usingInternalStats_ = true; + handle_ = pcap_open_live(device, 64 /* FIXME */, PCAP_OPENFLAG_PROMISCUOUS, + 1000 /* ms */, errbuf); + + if (handle_ == NULL) + goto _open_error; + + usingInternalHandle_ = true; + + return; + +_open_error: + qDebug("Error opening port %s: %s\n", device, pcap_geterr(handle_)); + usingInternalHandle_ = false; +} + +void PcapPort::PortTransmitter::clearPacketList() +{ + Q_ASSERT(!isRunning()); + // \todo lock for sendQueueList + while(sendQueueList_.size()) + { + pcap_send_queue *sq = sendQueueList_.takeFirst(); + pcap_sendqueue_destroy(sq); + } +} + +bool PcapPort::PortTransmitter::appendToPacketList(long sec, long usec, + const uchar *packet, int length) +{ + bool op = true; + pcap_pkthdr pktHdr; + pcap_send_queue *sendQ; + + pktHdr.caplen = pktHdr.len = length; + pktHdr.ts.tv_sec = sec; + pktHdr.ts.tv_usec = usec; + + if (sendQueueList_.size()) + sendQ = sendQueueList_.last(); + else + sendQ = pcap_sendqueue_alloc(1*1024*1024); + + // Not enough space? Alloc another one! + if ((sendQ->len + length + sizeof(pcap_pkthdr)) > sendQ->maxlen) + { + sendQueueList_.append(sendQ); + + //! \todo (LOW): calculate sendqueue size + sendQ = pcap_sendqueue_alloc(1*1024*1024); + } + + if (pcap_sendqueue_queue(sendQ, &pktHdr, (u_char*) packet) < 0) + op = false; + + sendQueueList_.append(sendQ); + return op; +} + +void PcapPort::PortTransmitter::setHandle(pcap_t *handle) +{ + if (usingInternalHandle_) + pcap_close(handle_); + handle_ = handle; + usingInternalStats_ = false; +} + +void PcapPort::PortTransmitter::useExternalStats(AbstractPort::PortStats *stats) +{ + if (usingInternalStats_); + delete stats_; + stats_ = stats; + usingInternalStats_ = false; +} + +void PcapPort::PortTransmitter::run() +{ + //! \todo (MED) Stream Mode - continuous: define before implement + + // NOTE1: We can't use pcap_sendqueue_transmit() directly even on Win32 + // 'coz of 2 reasons - there's no way of stopping it before all packets + // in the sendQueue are sent out and secondly, stats are available only + // when all packets have been sent - no periodic updates + // + // NOTE2: Transmit on the Rx Handle so that we can receive it back + // on the Tx Handle to do stats + // + // NOTE3: Update pcapExtra counters - port TxStats will be updated in the + // 'stats callback' function so that both Rx and Tx stats are updated + // together + + const int kSyncTransmit = 1; + int i; + + for(i = 0; i < sendQueueList_.size(); i++) + { + int ret; +_restart: + ret = sendQueueTransmit(handle_, sendQueueList_.at(i), kSyncTransmit); + + if (ret < 0) + return; + + //! \todo (HIGH): Timing between subsequent sendQueues + } + + if (returnToQIdx_ >= 0) + { + i = returnToQIdx_; + + //! \todo (HIGH) 1s fixed; Change this to ipg of last stream + QThread::usleep(1000000); + goto _restart; + } +} + +void PcapPort::PortTransmitter::stop() +{ + stop_ = true; +} + +int PcapPort::PortTransmitter::sendQueueTransmit(pcap_t *p, + pcap_send_queue *queue, int sync) +{ + struct timeval ts; + struct pcap_pkthdr *hdr = (struct pcap_pkthdr*) queue->buffer; + char *end = queue->buffer + queue->len; + + if (sync) + ts = hdr->ts; + + while (1) + { + uchar *pkt = (uchar*)hdr + sizeof(*hdr); + int pktLen = hdr->caplen; + + if (stop_) + { + stop_ = false; + return -2; + } + + if(pktLen > 0) + pcap_sendpacket(p, pkt, pktLen); + + stats_->txPkts++; + stats_->txBytes += pktLen; + + // Step to the next packet in the buffer + hdr = (struct pcap_pkthdr*) ((uchar*)hdr + sizeof(*hdr) + pktLen); + pkt = (uchar*) ((uchar*)hdr + sizeof(*hdr)); + + // Check if the end of the user buffer has been reached + if((char*) hdr >= end) + return 0; + + if (sync) + { + long usec = (hdr->ts.tv_sec - ts.tv_sec) * 1000000 + + (hdr->ts.tv_usec - ts.tv_usec); + + if (usec) + { + QThread::usleep(usec); + ts = hdr->ts; + } + } + } +} + +PcapPort::PortCapturer::PortCapturer(const char *device) +{ + device_ = QString::fromAscii(device); + stop_ = false; + + if (!capFile_.open()) + qWarning("Unable to open temp cap file"); + + qDebug("cap file = %s", capFile_.fileName().toAscii().constData()); + + dumpHandle_ = NULL; + handle_ = NULL; +} + +PcapPort::PortCapturer::~PortCapturer() +{ + capFile_.close(); +} + +void PcapPort::PortCapturer::run() +{ + char errbuf[PCAP_ERRBUF_SIZE]; + + qDebug("In %s", __PRETTY_FUNCTION__); + + if (!capFile_.isOpen()) + { + qWarning("temp cap file is not open"); + return; + } + + handle_ = pcap_open_live(device_.toAscii().constData(), 65535, + PCAP_OPENFLAG_PROMISCUOUS, 1000 /* ms */, errbuf); + if (handle_ == NULL) + { + qDebug("Error opening port %s: %s\n", + device_.toAscii().constData(), pcap_geterr(handle_)); + return; + } + + dumpHandle_ = pcap_dump_open(handle_, + capFile_.fileName().toAscii().constData()); + + while (1) + { + int ret; + struct pcap_pkthdr *hdr; + const uchar *data; + + ret = pcap_next_ex(handle_, &hdr, &data); + switch (ret) + { + case 1: + pcap_dump((uchar*) dumpHandle_, hdr, data); + break; + case 0: + // timeout: just go back to the loop + break; + case -1: + qWarning("%s: error reading packet (%d): %s", + __PRETTY_FUNCTION__, ret, pcap_geterr(handle_)); + break; + case -2: + default: + qFatal("%s: Unexpected return value %d", __PRETTY_FUNCTION__, ret); + } + + if (stop_) + { + stop_ = false; + break; + } + } + pcap_dump_close(dumpHandle_); + pcap_close(handle_); + dumpHandle_ = NULL; + handle_ = NULL; +} + +void PcapPort::PortCapturer::stop() +{ + stop_ = true; +} + +QFile* PcapPort::PortCapturer::captureFile() +{ + return &capFile_; +} + +/* vim: set shiftwidth=4 tabstop=8 softtabstop=4 expandtab: */ diff --git a/server/pcapport.h b/server/pcapport.h new file mode 100644 index 0000000..8fee855 --- /dev/null +++ b/server/pcapport.h @@ -0,0 +1,121 @@ +#ifndef _SERVER_PCAP_PORT_H +#define _SERVER_PCAP_PORT_H + +#include +#include +#include + +#include "abstractport.h" + +class PcapPort : public AbstractPort +{ +public: + PcapPort(int id, const char *device); + ~PcapPort(); + + void init(); + + virtual void clearPacketList() { + transmitter_->clearPacketList(); + setPacketListLoopMode(false); + } + virtual bool appendToPacketList(long sec, long usec, const uchar *packet, + int length) { + return transmitter_->appendToPacketList(sec, usec, packet, length); + } + virtual void setPacketListLoopMode(bool loop) { + transmitter_->setPacketListLoopMode(loop); + } + + virtual void startTransmit() { + if (isDirty()) + updatePacketList(); + transmitter_->start(); + } + virtual void stopTransmit() { transmitter_->stop(); } + virtual bool isTransmitOn() { return transmitter_->isRunning(); } + + virtual void startCapture() { capturer_->start(); } + virtual void stopCapture() { capturer_->stop(); } + virtual bool isCaptureOn() { return capturer_->isRunning(); } + virtual QIODevice* captureData() { return capturer_->captureFile(); } + +protected: + enum Direction + { + kDirectionRx, + kDirectionTx + }; + + class PortMonitor: public QThread + { + public: + PortMonitor(const char *device, Direction direction, + AbstractPort::PortStats *stats); + void run(); + pcap_t* handle() { return handle_; } + Direction direction() { return direction_; } + bool isDirectional() { return isDirectional_; } + protected: + AbstractPort::PortStats *stats_; + private: + pcap_t *handle_; + Direction direction_; + bool isDirectional_; + }; + + class PortTransmitter: public QThread + { + public: + PortTransmitter(const char *device); + void clearPacketList(); + bool appendToPacketList(long sec, long usec, const uchar *packet, + int length); + void setPacketListLoopMode(bool loop) { + returnToQIdx_ = loop ? 0 : -1; + } + void setHandle(pcap_t *handle); + void useExternalStats(AbstractPort::PortStats *stats); + void run(); + void stop(); + private: + int sendQueueTransmit(pcap_t *p, pcap_send_queue *queue, int sync); + + QList sendQueueList_; + int returnToQIdx_; + bool usingInternalStats_; + AbstractPort::PortStats *stats_; + bool usingInternalHandle_; + pcap_t *handle_; + bool stop_; + }; + + class PortCapturer: public QThread + { + public: + PortCapturer(const char *device); + ~PortCapturer(); + void run(); + void stop(); + QFile* captureFile(); + + private: + QString device_; + bool stop_; + QTemporaryFile capFile_; + pcap_t *handle_; + pcap_dumper_t *dumpHandle_; + }; + + PortMonitor *monitorRx_; + PortMonitor *monitorTx_; +private: + PortTransmitter *transmitter_; + PortCapturer *capturer_; + + static pcap_if_t *deviceList_; +}; + +#endif + +/* vim: set shiftwidth=4 tabstop=8 softtabstop=4 expandtab: */ diff --git a/server/portmanager.cpp b/server/portmanager.cpp new file mode 100644 index 0000000..bd7fd78 --- /dev/null +++ b/server/portmanager.cpp @@ -0,0 +1,57 @@ +#include "portmanager.h" + +#include + +#include "winpcapport.h" + +PortManager *PortManager::instance_ = NULL; + +PortManager::PortManager() +{ + int i; + pcap_if_t *deviceList; + pcap_if_t *device; + char errbuf[PCAP_ERRBUF_SIZE]; + + qDebug("Retrieving the device list from the local machine\n"); + + if (pcap_findalldevs(&deviceList, errbuf) == -1) + qDebug("Error in pcap_findalldevs_ex: %s\n", errbuf); + + for(device = deviceList, i = 0; device != NULL; device = device->next, i++) + { + AbstractPort *port; + +#ifdef Q_OS_WIN32 + port = new WinPcapPort(i, device->name); +#else + port = new PcapPort(i, device->name); +#endif + + port->init(); + portList_.append(port); + + qDebug("%d. %s", i, device->name); + if (device->description) + qDebug(" (%s)\n", device->description); + } + + pcap_freealldevs(deviceList); + + return; +} + +PortManager::~PortManager() +{ +} + +PortManager* PortManager::instance() +{ + if (!instance_) + instance_ = new PortManager; + + return instance_; +} + + +/* vim: set shiftwidth=4 tabstop=8 softtabstop=4 expandtab: */ diff --git a/server/portmanager.h b/server/portmanager.h new file mode 100644 index 0000000..c71c6a6 --- /dev/null +++ b/server/portmanager.h @@ -0,0 +1,26 @@ +#ifndef _SERVER_PORT_MANAGER_H +#define _SERVER_PORT_MANAGER_H + +#include +#include "abstractport.h" + +class PortManager +{ +public: + PortManager(); + ~PortManager(); + + int portCount() { return portList_.size(); } + AbstractPort* port(int id) { return portList_[id]; } + + static PortManager* instance(); + +private: + QList portList_; + + static PortManager *instance_; +}; + +#endif + +/* vim: set shiftwidth=4 tabstop=8 softtabstop=4 expandtab: */ diff --git a/server/winpcapport.cpp b/server/winpcapport.cpp new file mode 100644 index 0000000..7e04a42 --- /dev/null +++ b/server/winpcapport.cpp @@ -0,0 +1,144 @@ +#include "winpcapport.h" + +const uint OID_GEN_MEDIA_CONNECT_STATUS = 0x00010114; + +WinPcapPort::WinPcapPort(int id, const char *device) + : PcapPort(id, device) +{ + delete monitorRx_; + delete monitorTx_; + + monitorRx_ = new PortMonitor(device, kDirectionRx, &stats_); + monitorTx_ = new PortMonitor(device, kDirectionTx, &stats_); + + adapter_ = PacketOpenAdapter((CHAR*)device); + if (!adapter_) + qFatal("Unable to open adapter %s", device); + linkStateOid_ = (PPACKET_OID_DATA) malloc(sizeof(PACKET_OID_DATA) + + sizeof(uint)); + if (!linkStateOid_) + qFatal("failed to alloc oidData"); +} + +WinPcapPort::~WinPcapPort() +{ +} + +OstProto::LinkState WinPcapPort::linkState() +{ + memset(linkStateOid_, 0, sizeof(PACKET_OID_DATA) + sizeof(uint)); + + linkStateOid_->Oid = OID_GEN_MEDIA_CONNECT_STATUS; + linkStateOid_->Length = sizeof(uint); + + if (PacketRequest(adapter_, 0, linkStateOid_)) + { + uint state; + + if (linkStateOid_->Length == sizeof(state)) + { + memcpy((void*)&state, (void*)linkStateOid_->Data, + linkStateOid_->Length); + if (state == 0) + linkState_ = OstProto::LinkStateUp; + else if (state == 1) + linkState_ = OstProto::LinkStateDown; + } + } + + return linkState_; +} + +WinPcapPort::PortMonitor::PortMonitor(const char *device, Direction direction, + AbstractPort::PortStats *stats) + : PcapPort::PortMonitor(device, direction, stats) +{ + pcap_setmode(handle(), MODE_STAT); +} + +void WinPcapPort::PortMonitor::run() +{ + struct timeval lastTs; + quint64 lastTxPkts = 0; + quint64 lastTxBytes = 0; + + qWarning("in %s", __PRETTY_FUNCTION__); + + lastTs.tv_sec = 0; + lastTs.tv_usec = 0; + + while (1) + { + int ret; + struct pcap_pkthdr *hdr; + const uchar *data; + + ret = pcap_next_ex(handle(), &hdr, &data); + switch (ret) + { + case 1: + { + quint64 pkts = *((quint64*)(data + 0)); + quint64 bytes = *((quint64*)(data + 8)); + + // TODO: is it 12 or 16? + bytes -= pkts * 12; + + uint usec = (hdr->ts.tv_sec - lastTs.tv_sec) * 1000000 + + (hdr->ts.tv_usec - lastTs.tv_usec); + + switch (direction()) + { + case kDirectionRx: + stats_->rxPkts += pkts; + stats_->rxBytes += bytes; + stats_->rxPps = (pkts * 1000000) / usec; + stats_->rxBps = (bytes * 1000000) / usec; + break; + + case kDirectionTx: + if (isDirectional()) + { + stats_->txPkts += pkts; + stats_->txBytes += bytes; + } + else + { + // Assuming stats_->txXXX are updated externally + quint64 txPkts = stats_->txPkts; + quint64 txBytes = stats_->txBytes; + + pkts = txPkts - lastTxPkts; + bytes = txBytes - lastTxBytes; + + lastTxPkts = txPkts; + lastTxBytes = txBytes; + } + stats_->txPps = (pkts * 1000000) / usec; + stats_->txBps = (bytes * 1000000) / usec; + break; + + default: + Q_ASSERT(false); + } + + break; + } + case 0: + //qDebug("%s: timeout. continuing ...", __PRETTY_FUNCTION__); + continue; + case -1: + qWarning("%s: error reading packet (%d): %s", + __PRETTY_FUNCTION__, ret, pcap_geterr(handle())); + break; + case -2: + default: + qFatal("%s: Unexpected return value %d", __PRETTY_FUNCTION__, ret); + } + lastTs.tv_sec = hdr->ts.tv_sec; + lastTs.tv_usec = hdr->ts.tv_usec; + QThread::msleep(1000); + } +} + +/* vim: set shiftwidth=4 tabstop=8 softtabstop=4 expandtab: */ diff --git a/server/winpcapport.h b/server/winpcapport.h new file mode 100644 index 0000000..100ad2a --- /dev/null +++ b/server/winpcapport.h @@ -0,0 +1,31 @@ +#ifndef _SERVER_WIN_PCAP_PORT_H +#define _SERVER_WIN_PCAP_PORT_H + +#include "pcapport.h" + +#include + +class WinPcapPort : public PcapPort +{ +public: + WinPcapPort(int id, const char *device); + ~WinPcapPort(); + + virtual OstProto::LinkState linkState(); + +protected: + class PortMonitor: public PcapPort::PortMonitor + { + public: + PortMonitor(const char *device, Direction direction, + AbstractPort::PortStats *stats); + void run(); + }; +private: + LPADAPTER adapter_; + PPACKET_OID_DATA linkStateOid_ ; +}; + +#endif + +/* vim: set shiftwidth=4 tabstop=8 softtabstop=4 expandtab: */