From f13b0915d57f5347aadb5fc4f12f3b053374ace8 Mon Sep 17 00:00:00 2001 From: "Srivats P." Date: Sun, 22 Feb 2009 07:53:14 +0000 Subject: [PATCH] Packet Transmit Changes - not using pcap_sendqueue_transmit() any longer --- client/portstatsmodel.cpp | 2 +- client/portstatswindow.cpp | 2 +- common/protocol.proto | 4 +- server/drone.pro | 2 +- server/myservice.cpp | 126 +++++++++++++++++++++---------------- server/myservice.h | 12 ++-- server/pcapextra.cpp | 47 +++++++++++++- server/pcapextra.h | 28 ++++++++- 8 files changed, 154 insertions(+), 69 deletions(-) diff --git a/client/portstatsmodel.cpp b/client/portstatsmodel.cpp index 7676e42..bcc0f54 100644 --- a/client/portstatsmodel.cpp +++ b/client/portstatsmodel.cpp @@ -12,7 +12,7 @@ PortStatsModel::PortStatsModel(PortGroupList *p, QObject *parent) timer = new QTimer(); connect(timer, SIGNAL(timeout()), this, SLOT(updateStats())); - timer->start(2000); + timer->start(1000); } int PortStatsModel::rowCount(const QModelIndex &parent) const diff --git a/client/portstatswindow.cpp b/client/portstatswindow.cpp index 53be8b7..d1340a6 100644 --- a/client/portstatswindow.cpp +++ b/client/portstatswindow.cpp @@ -54,7 +54,7 @@ void PortStatsWindow::on_tbStopTransmit_clicked() for (int i = 0; i < pgpl.size(); i++) { pgl->portGroupByIndex(pgpl.at(i).portGroupId). - startTx(&pgpl[i].portList); + stopTx(&pgpl[i].portList); } } diff --git a/common/protocol.proto b/common/protocol.proto index 04c7a70..7038517 100644 --- a/common/protocol.proto +++ b/common/protocol.proto @@ -227,8 +227,8 @@ message StreamControl { optional uint32 num_bursts = 4 [default = 1]; optional uint32 packets_per_burst = 5 [default = 10]; optional NextWhat next = 6 [default = e_nw_goto_next]; - optional uint32 packets_per_sec = 7; - optional uint32 bursts_per_sec = 8; + optional uint32 packets_per_sec = 7 [default = 1]; + optional uint32 bursts_per_sec = 8 [default = 1]; // TODO: Gaps? diff --git a/server/drone.pro b/server/drone.pro index 315a0e2..15d69b1 100644 --- a/server/drone.pro +++ b/server/drone.pro @@ -12,6 +12,6 @@ FORMS += drone.ui SOURCES += drone_main.cpp drone.cpp SOURCES += myservice.cpp -unix:SOURCES += pcapextra.cpp +SOURCES += pcapextra.cpp SOURCES += "..\common\protocol.pb.cc" diff --git a/server/myservice.cpp b/server/myservice.cpp index eef1610..dd19090 100644 --- a/server/myservice.cpp +++ b/server/myservice.cpp @@ -558,8 +558,7 @@ PortInfo::PortInfo(uint id, pcap_if_t *dev) resetStats(); // We'll create sendqueue later when required - sendQueue = NULL; - pcapExtra.sendQueueCumLen.clear(); + sendQueueList.clear(); pcapExtra.txPkts = 0; pcapExtra.txBytes = 0; isSendQueueDirty=true; @@ -571,17 +570,21 @@ PortInfo::PortInfo(uint id, pcap_if_t *dev) void PortInfo::update() { - uchar pktBuf[2000]; - pcap_pkthdr pktHdr; + uchar pktBuf[2000]; + pcap_pkthdr pktHdr; + ost_pcap_send_queue sendQ; qDebug("In %s", __FUNCTION__); - if (sendQueue) - pcap_sendqueue_destroy(sendQueue); + if (sendQueueList.size()) + { + foreach(sendQ, sendQueueList) + pcap_sendqueue_destroy(sendQ.sendQueue); + } // TODO(LOW): calculate sendqueue size - sendQueue = pcap_sendqueue_alloc(1*MB); - pcapExtra.sendQueueCumLen.clear(); + sendQ.sendQueue = pcap_sendqueue_alloc(1*MB); + sendQ.sendQueueCumLen.clear(); // First sort the streams by ordinalValue qSort(streamList); @@ -590,32 +593,36 @@ void PortInfo::update() { if (streamList[i].d.core().is_enabled()) { - int numPackets, numBursts; - long ipg; + long numPackets, numBursts; + long ibg, ipg; switch (streamList[i].d.control().unit()) { case OstProto::StreamControl::e_su_bursts: numBursts = streamList[i].d.control().num_bursts(); numPackets = streamList[i].d.control().packets_per_burst(); + ibg = 1000000/streamList[i].d.control().bursts_per_sec(); + ipg = 0; break; case OstProto::StreamControl::e_su_packets: numBursts = 1; numPackets = streamList[i].d.control().num_packets(); + ibg = 0; ipg = 1000000/streamList[i].d.control().packets_per_sec(); - qDebug("ipg = %ld\n", ipg); break; default: qWarning("Unhandled stream control unit %d", streamList[i].d.control().unit()); continue; } + qDebug("numBursts = %ld, numPackets = %ld\n", + numBursts, numPackets); + qDebug("ibg = %ld, ipg = %ld\n", ibg, ipg); pktHdr.ts.tv_sec = 0; pktHdr.ts.tv_usec = 0; for (int j = 0; j < numBursts; j++) { - // FIXME(HI): IBG rate (bursts_per_sec) for (int k = 0; k < numPackets; k++) { int len; @@ -632,20 +639,45 @@ void PortInfo::update() pktHdr.ts.tv_usec -= 1000000; } - if (-1 == pcap_sendqueue_queue(sendQueue, &pktHdr, + // Not enough space? Alloc another one! + if ((sendQ.sendQueue->len + len + sizeof(pcap_pkthdr)) + > sendQ.sendQueue->maxlen) + { + sendQueueList.append(sendQ); + + // TODO(LOW): calculate sendqueue size + sendQ.sendQueue = pcap_sendqueue_alloc(1*MB); + sendQ.sendQueueCumLen.clear(); + +#if 0 + pktHdr.ts.tv_sec = 0; + pktHdr.ts.tv_usec = 0; +#endif + } + + if (-1 == pcap_sendqueue_queue(sendQ.sendQueue, &pktHdr, (u_char*) pktBuf)) { qDebug("[port %d] sendqueue_queue() failed for " "streamidx %d\n", id(), i); } else - pcapExtra.sendQueueCumLen.append(sendQueue->len); + sendQ.sendQueueCumLen.append(sendQ.sendQueue->len); } } + pktHdr.ts.tv_usec += ibg; + if (pktHdr.ts.tv_usec > 1000000) + { + pktHdr.ts.tv_sec++; + pktHdr.ts.tv_usec -= 1000000; + } } } } + // The last alloc'ed sendQ appended here + sendQueueList.append(sendQ); + isSendQueueDirty = false; } @@ -656,6 +688,7 @@ void PortInfo::startTransmit() void PortInfo::stopTransmit() { + transmitter.stop(); } void PortInfo::resetStats() @@ -1012,53 +1045,33 @@ PortInfo::PortTransmitter::PortTransmitter(PortInfo *port) void PortInfo::PortTransmitter::run() { - uint bytes, pkts; - // TODO(HI): Stream Mode - one pass/continuous - // NOTE: Transmit on the Rx Handle so that we can receive it back + + // NOTE1: We can't use pcap_sendqueue_transmit() directly even on Win32 + // 'coz of 2 reasons - there's no way of stopping it before all packets + // in the sendQueue are sent out and secondly, stats are available only + // when all packets have been sent - no periodic updates + // + // NOTE2: Transmit on the Rx Handle so that we can receive it back // on the Tx Handle to do stats - bytes = pcap_sendqueue_transmit(port->devHandleRx, port->sendQueue, true); - if (bytes < port->sendQueue->len) - { - qDebug("port %d: sent (%d/%d) error %s. TxStats may be inconsistent", - port->id(), bytes, port->sendQueue->len, - pcap_geterr(port->devHandleTx)); - - // parse sendqueue using 'bytes' to get actual pkts sent -#if 0 - // FIXME(LOW): Get this working - pkts = qUpperBound(pcapExtra.sendQueueCumLen, bytes); -#else - for (int i = 0; i < port->pcapExtra.sendQueueCumLen.size(); i++) - { - if (port->pcapExtra.sendQueueCumLen.at(i) > bytes) - { - pkts = i; - break; - } - } -#endif - } - else - { - qDebug("port %d: sent (%d/%d) bytes\n", port->id(), bytes, - port->sendQueue->len); - pkts = port->pcapExtra.sendQueueCumLen.size(); - } - - // pcap_sendqueue_transmit() returned 'bytes' includes size of pcap_pkthdr - // - adjust for it - if (bytes) - bytes -= pkts * sizeof(pcap_pkthdr); -#ifdef Q_OS_WIN32 - // Update pcapExtra counters - port TxStats will be updated in the + // + // NOTE3: Update pcapExtra counters - port TxStats will be updated in the // 'stats callback' function so that both Rx and Tx stats are updated // together - port->pcapExtra.txPkts += pkts; - port->pcapExtra.txBytes += bytes; -#endif + + m_stop = 0; + ost_pcap_sendqueue_list_transmit(port->devHandleRx, port->sendQueueList, + true, &m_stop, &port->pcapExtra.txPkts, &port->pcapExtra.txBytes, + QThread::usleep); + m_stop = 0; } +void PortInfo::PortTransmitter::stop() +{ + m_stop = 1; +} + + /*--------------- MyService ---------------*/ int MyService::getStreamIndex(unsigned int portIdx, @@ -1455,11 +1468,14 @@ const ::OstProto::PortIdList* request, s = response->add_port_stats(); s->mutable_port_id()->set_id(request->port_id(i).id()); +#if 0 if (portidx == 2) { qDebug("<%llu", portInfo[portidx]->epochStats.rxPkts); qDebug(">%llu", portInfo[portidx]->stats.rxPkts); } +#endif + s->set_rx_pkts(portInfo[portidx]->stats.rxPkts - portInfo[portidx]->epochStats.rxPkts); s->set_rx_bytes(portInfo[portidx]->stats.rxBytes - diff --git a/server/myservice.h b/server/myservice.h index 6155cac..7f1cb88 100644 --- a/server/myservice.h +++ b/server/myservice.h @@ -87,11 +87,13 @@ class PortInfo { friend class PortInfo; - PortInfo *port; + PortInfo *port; + int m_stop; public: PortTransmitter(PortInfo *port); void run(); + void stop(); }; OstProto::Port d; @@ -114,25 +116,23 @@ class PortInfo }; //! \todo Need lock for stats access/update + //! Stuff we need to maintain since PCAP doesn't as of now. As and when // PCAP supports it, we'll remove from here struct PcapExtra { - //! Used to track num of packets (and their sizes) in the - // send queue. Also used to find out actual num of pkts sent - // in case of partial send in pcap_sendqueue_transmit() - QList sendQueueCumLen; //! PCAP doesn't do any tx stats quint64 txPkts; quint64 txBytes; + }; pcap_if_t *dev; pcap_t *devHandleRx; pcap_t *devHandleTx; - pcap_send_queue* sendQueue; + QList sendQueueList; bool isSendQueueDirty; PcapExtra pcapExtra; PortMonitorRx monitorRx; diff --git a/server/pcapextra.cpp b/server/pcapextra.cpp index f3dd255..78d57e0 100644 --- a/server/pcapextra.cpp +++ b/server/pcapextra.cpp @@ -4,6 +4,7 @@ /* NOTE: All code borrowed from WinPcap */ +#ifndef Q_OS_WIN32 int pcap_setmode(pcap_t *p, int mode) { // no STAT mode in libpcap, so just return 0 to indicate success @@ -58,13 +59,36 @@ int pcap_sendqueue_queue (pcap_send_queue *queue, return 0; } +#endif -u_int pcap_sendqueue_transmit (pcap_t *p, pcap_send_queue *queue, int sync) +u_int ost_pcap_sendqueue_list_transmit(pcap_t *p, + QList sendQueueList, int sync, + int *p_stop, quint64* p_pkts, quint64* p_bytes, + void (*pf_usleep)(ulong)) +{ + uint ret = 0; + + foreach(ost_pcap_send_queue sq, sendQueueList) + { + ret += ost_pcap_sendqueue_transmit(p, sq.sendQueue, sync, + p_stop, p_pkts, p_bytes, pf_usleep); + + // TODO(HI): Timing between subsequent sendQueues + } + + return ret; +} + +u_int ost_pcap_sendqueue_transmit(pcap_t *p, + pcap_send_queue *queue, int sync, + int *p_stop, quint64* p_pkts, quint64* p_bytes, + void (*pf_usleep)(ulong)) { char* PacketBuff = queue->buffer; int Size = queue->len; struct pcap_pkthdr *winpcap_hdr; + struct timeval ts; char* EndOfUserBuff = (char *)PacketBuff + Size; int ret; @@ -78,8 +102,14 @@ u_int pcap_sendqueue_transmit (pcap_t *p, pcap_send_queue *queue, int sync) return 0; } + if (sync) + ts = winpcap_hdr->ts; + while( true ){ + if (*p_stop) + return (char*)winpcap_hdr - (char*)PacketBuff; + if(winpcap_hdr->caplen ==0 || winpcap_hdr->caplen > 65536) { // Malformed header @@ -96,6 +126,9 @@ u_int pcap_sendqueue_transmit (pcap_t *p, pcap_send_queue *queue, int sync) return (char*)winpcap_hdr - (char*)PacketBuff; } + if (p_pkts) (*p_pkts)++; + if (p_bytes) (*p_bytes) += winpcap_hdr->caplen; + // Step to the next packet in the buffer //(char*)winpcap_hdr += winpcap_hdr->caplen + sizeof(struct pcap_pkthdr); winpcap_hdr = (struct pcap_pkthdr*) ((char*)winpcap_hdr + @@ -106,6 +139,18 @@ u_int pcap_sendqueue_transmit (pcap_t *p, pcap_send_queue *queue, int sync) { return (char*)winpcap_hdr - (char*)PacketBuff; } + + if (sync) + { + long usec = (winpcap_hdr->ts.tv_sec-ts.tv_sec)*1000000 + + (winpcap_hdr->ts.tv_usec - ts.tv_usec); + + if (usec) + { + (*pf_usleep)(usec); + ts = winpcap_hdr->ts; + } + } } } diff --git a/server/pcapextra.h b/server/pcapextra.h index 25ae4bf..3d2f8f4 100644 --- a/server/pcapextra.h +++ b/server/pcapextra.h @@ -1,10 +1,35 @@ #ifndef _PCAP_EXTRA_H #define _PCAP_EXTRA_H -#ifndef Q_OS_WIN32 +#include +#include #include "pcap.h" +struct ost_pcap_send_queue +{ + pcap_send_queue *sendQueue; + //! Used to track num of packets (and their sizes) in the + // send queue. Also used to find out actual num of pkts sent + // in case of partial send in pcap_sendqueue_transmit() + QList sendQueueCumLen; +}; + +// Common for all OS - *nix or Win32 +u_int ost_pcap_sendqueue_list_transmit(pcap_t *p, + QList sendQueueList, int sync, + int *p_stop, quint64* p_pkts, quint64* p_bytes, + void (*pf_usleep)(ulong)); + +u_int ost_pcap_sendqueue_transmit (pcap_t *p, + pcap_send_queue *queue, int sync, + int *p_stop, quint64* p_pkts, quint64* p_bytes, + void (*pf_usleep)(ulong)); + +#ifndef Q_OS_WIN32 +// Only for non Win32 + + #define PCAP_OPENFLAG_PROMISCUOUS 1 struct pcap_send_queue @@ -21,7 +46,6 @@ pcap_send_queue* pcap_sendqueue_alloc (u_int memsize); void pcap_sendqueue_destroy (pcap_send_queue *queue); int pcap_sendqueue_queue (pcap_send_queue *queue, const struct pcap_pkthdr *pkt_header, const u_char *pkt_data); -u_int pcap_sendqueue_transmit (pcap_t *p, pcap_send_queue *queue, int sync); #endif