Packet Transmit Changes - not using pcap_sendqueue_transmit() any longer

This commit is contained in:
Srivats P. 2009-02-22 07:53:14 +00:00
parent 017cb75ae5
commit f13b0915d5
8 changed files with 154 additions and 69 deletions

View File

@ -12,7 +12,7 @@ PortStatsModel::PortStatsModel(PortGroupList *p, QObject *parent)
timer = new QTimer(); timer = new QTimer();
connect(timer, SIGNAL(timeout()), this, SLOT(updateStats())); connect(timer, SIGNAL(timeout()), this, SLOT(updateStats()));
timer->start(2000); timer->start(1000);
} }
int PortStatsModel::rowCount(const QModelIndex &parent) const int PortStatsModel::rowCount(const QModelIndex &parent) const

View File

@ -54,7 +54,7 @@ void PortStatsWindow::on_tbStopTransmit_clicked()
for (int i = 0; i < pgpl.size(); i++) for (int i = 0; i < pgpl.size(); i++)
{ {
pgl->portGroupByIndex(pgpl.at(i).portGroupId). pgl->portGroupByIndex(pgpl.at(i).portGroupId).
startTx(&pgpl[i].portList); stopTx(&pgpl[i].portList);
} }
} }

View File

@ -227,8 +227,8 @@ message StreamControl {
optional uint32 num_bursts = 4 [default = 1]; optional uint32 num_bursts = 4 [default = 1];
optional uint32 packets_per_burst = 5 [default = 10]; optional uint32 packets_per_burst = 5 [default = 10];
optional NextWhat next = 6 [default = e_nw_goto_next]; optional NextWhat next = 6 [default = e_nw_goto_next];
optional uint32 packets_per_sec = 7; optional uint32 packets_per_sec = 7 [default = 1];
optional uint32 bursts_per_sec = 8; optional uint32 bursts_per_sec = 8 [default = 1];
// TODO: Gaps? // TODO: Gaps?

View File

@ -12,6 +12,6 @@ FORMS += drone.ui
SOURCES += drone_main.cpp drone.cpp SOURCES += drone_main.cpp drone.cpp
SOURCES += myservice.cpp SOURCES += myservice.cpp
unix:SOURCES += pcapextra.cpp SOURCES += pcapextra.cpp
SOURCES += "..\common\protocol.pb.cc" SOURCES += "..\common\protocol.pb.cc"

View File

@ -558,8 +558,7 @@ PortInfo::PortInfo(uint id, pcap_if_t *dev)
resetStats(); resetStats();
// We'll create sendqueue later when required // We'll create sendqueue later when required
sendQueue = NULL; sendQueueList.clear();
pcapExtra.sendQueueCumLen.clear();
pcapExtra.txPkts = 0; pcapExtra.txPkts = 0;
pcapExtra.txBytes = 0; pcapExtra.txBytes = 0;
isSendQueueDirty=true; isSendQueueDirty=true;
@ -571,17 +570,21 @@ PortInfo::PortInfo(uint id, pcap_if_t *dev)
void PortInfo::update() void PortInfo::update()
{ {
uchar pktBuf[2000]; uchar pktBuf[2000];
pcap_pkthdr pktHdr; pcap_pkthdr pktHdr;
ost_pcap_send_queue sendQ;
qDebug("In %s", __FUNCTION__); qDebug("In %s", __FUNCTION__);
if (sendQueue) if (sendQueueList.size())
pcap_sendqueue_destroy(sendQueue); {
foreach(sendQ, sendQueueList)
pcap_sendqueue_destroy(sendQ.sendQueue);
}
// TODO(LOW): calculate sendqueue size // TODO(LOW): calculate sendqueue size
sendQueue = pcap_sendqueue_alloc(1*MB); sendQ.sendQueue = pcap_sendqueue_alloc(1*MB);
pcapExtra.sendQueueCumLen.clear(); sendQ.sendQueueCumLen.clear();
// First sort the streams by ordinalValue // First sort the streams by ordinalValue
qSort(streamList); qSort(streamList);
@ -590,32 +593,36 @@ void PortInfo::update()
{ {
if (streamList[i].d.core().is_enabled()) if (streamList[i].d.core().is_enabled())
{ {
int numPackets, numBursts; long numPackets, numBursts;
long ipg; long ibg, ipg;
switch (streamList[i].d.control().unit()) switch (streamList[i].d.control().unit())
{ {
case OstProto::StreamControl::e_su_bursts: case OstProto::StreamControl::e_su_bursts:
numBursts = streamList[i].d.control().num_bursts(); numBursts = streamList[i].d.control().num_bursts();
numPackets = streamList[i].d.control().packets_per_burst(); numPackets = streamList[i].d.control().packets_per_burst();
ibg = 1000000/streamList[i].d.control().bursts_per_sec();
ipg = 0;
break; break;
case OstProto::StreamControl::e_su_packets: case OstProto::StreamControl::e_su_packets:
numBursts = 1; numBursts = 1;
numPackets = streamList[i].d.control().num_packets(); numPackets = streamList[i].d.control().num_packets();
ibg = 0;
ipg = 1000000/streamList[i].d.control().packets_per_sec(); ipg = 1000000/streamList[i].d.control().packets_per_sec();
qDebug("ipg = %ld\n", ipg);
break; break;
default: default:
qWarning("Unhandled stream control unit %d", qWarning("Unhandled stream control unit %d",
streamList[i].d.control().unit()); streamList[i].d.control().unit());
continue; continue;
} }
qDebug("numBursts = %ld, numPackets = %ld\n",
numBursts, numPackets);
qDebug("ibg = %ld, ipg = %ld\n", ibg, ipg);
pktHdr.ts.tv_sec = 0; pktHdr.ts.tv_sec = 0;
pktHdr.ts.tv_usec = 0; pktHdr.ts.tv_usec = 0;
for (int j = 0; j < numBursts; j++) for (int j = 0; j < numBursts; j++)
{ {
// FIXME(HI): IBG rate (bursts_per_sec)
for (int k = 0; k < numPackets; k++) for (int k = 0; k < numPackets; k++)
{ {
int len; int len;
@ -632,20 +639,45 @@ void PortInfo::update()
pktHdr.ts.tv_usec -= 1000000; pktHdr.ts.tv_usec -= 1000000;
} }
if (-1 == pcap_sendqueue_queue(sendQueue, &pktHdr, // Not enough space? Alloc another one!
if ((sendQ.sendQueue->len + len + sizeof(pcap_pkthdr))
> sendQ.sendQueue->maxlen)
{
sendQueueList.append(sendQ);
// TODO(LOW): calculate sendqueue size
sendQ.sendQueue = pcap_sendqueue_alloc(1*MB);
sendQ.sendQueueCumLen.clear();
#if 0
pktHdr.ts.tv_sec = 0;
pktHdr.ts.tv_usec = 0;
#endif
}
if (-1 == pcap_sendqueue_queue(sendQ.sendQueue, &pktHdr,
(u_char*) pktBuf)) (u_char*) pktBuf))
{ {
qDebug("[port %d] sendqueue_queue() failed for " qDebug("[port %d] sendqueue_queue() failed for "
"streamidx %d\n", id(), i); "streamidx %d\n", id(), i);
} }
else else
pcapExtra.sendQueueCumLen.append(sendQueue->len); sendQ.sendQueueCumLen.append(sendQ.sendQueue->len);
} }
} }
pktHdr.ts.tv_usec += ibg;
if (pktHdr.ts.tv_usec > 1000000)
{
pktHdr.ts.tv_sec++;
pktHdr.ts.tv_usec -= 1000000;
}
} }
} }
} }
// The last alloc'ed sendQ appended here
sendQueueList.append(sendQ);
isSendQueueDirty = false; isSendQueueDirty = false;
} }
@ -656,6 +688,7 @@ void PortInfo::startTransmit()
void PortInfo::stopTransmit() void PortInfo::stopTransmit()
{ {
transmitter.stop();
} }
void PortInfo::resetStats() void PortInfo::resetStats()
@ -1012,53 +1045,33 @@ PortInfo::PortTransmitter::PortTransmitter(PortInfo *port)
void PortInfo::PortTransmitter::run() void PortInfo::PortTransmitter::run()
{ {
uint bytes, pkts;
// TODO(HI): Stream Mode - one pass/continuous // TODO(HI): Stream Mode - one pass/continuous
// NOTE: Transmit on the Rx Handle so that we can receive it back
// NOTE1: We can't use pcap_sendqueue_transmit() directly even on Win32
// 'coz of 2 reasons - there's no way of stopping it before all packets
// in the sendQueue are sent out and secondly, stats are available only
// when all packets have been sent - no periodic updates
//
// NOTE2: Transmit on the Rx Handle so that we can receive it back
// on the Tx Handle to do stats // on the Tx Handle to do stats
bytes = pcap_sendqueue_transmit(port->devHandleRx, port->sendQueue, true); //
if (bytes < port->sendQueue->len) // NOTE3: Update pcapExtra counters - port TxStats will be updated in the
{
qDebug("port %d: sent (%d/%d) error %s. TxStats may be inconsistent",
port->id(), bytes, port->sendQueue->len,
pcap_geterr(port->devHandleTx));
// parse sendqueue using 'bytes' to get actual pkts sent
#if 0
// FIXME(LOW): Get this working
pkts = qUpperBound(pcapExtra.sendQueueCumLen, bytes);
#else
for (int i = 0; i < port->pcapExtra.sendQueueCumLen.size(); i++)
{
if (port->pcapExtra.sendQueueCumLen.at(i) > bytes)
{
pkts = i;
break;
}
}
#endif
}
else
{
qDebug("port %d: sent (%d/%d) bytes\n", port->id(), bytes,
port->sendQueue->len);
pkts = port->pcapExtra.sendQueueCumLen.size();
}
// pcap_sendqueue_transmit() returned 'bytes' includes size of pcap_pkthdr
// - adjust for it
if (bytes)
bytes -= pkts * sizeof(pcap_pkthdr);
#ifdef Q_OS_WIN32
// Update pcapExtra counters - port TxStats will be updated in the
// 'stats callback' function so that both Rx and Tx stats are updated // 'stats callback' function so that both Rx and Tx stats are updated
// together // together
port->pcapExtra.txPkts += pkts;
port->pcapExtra.txBytes += bytes; m_stop = 0;
#endif ost_pcap_sendqueue_list_transmit(port->devHandleRx, port->sendQueueList,
true, &m_stop, &port->pcapExtra.txPkts, &port->pcapExtra.txBytes,
QThread::usleep);
m_stop = 0;
} }
void PortInfo::PortTransmitter::stop()
{
m_stop = 1;
}
/*--------------- MyService ---------------*/ /*--------------- MyService ---------------*/
int MyService::getStreamIndex(unsigned int portIdx, int MyService::getStreamIndex(unsigned int portIdx,
@ -1455,11 +1468,14 @@ const ::OstProto::PortIdList* request,
s = response->add_port_stats(); s = response->add_port_stats();
s->mutable_port_id()->set_id(request->port_id(i).id()); s->mutable_port_id()->set_id(request->port_id(i).id());
#if 0
if (portidx == 2) if (portidx == 2)
{ {
qDebug("<%llu", portInfo[portidx]->epochStats.rxPkts); qDebug("<%llu", portInfo[portidx]->epochStats.rxPkts);
qDebug(">%llu", portInfo[portidx]->stats.rxPkts); qDebug(">%llu", portInfo[portidx]->stats.rxPkts);
} }
#endif
s->set_rx_pkts(portInfo[portidx]->stats.rxPkts - s->set_rx_pkts(portInfo[portidx]->stats.rxPkts -
portInfo[portidx]->epochStats.rxPkts); portInfo[portidx]->epochStats.rxPkts);
s->set_rx_bytes(portInfo[portidx]->stats.rxBytes - s->set_rx_bytes(portInfo[portidx]->stats.rxBytes -

View File

@ -87,11 +87,13 @@ class PortInfo
{ {
friend class PortInfo; friend class PortInfo;
PortInfo *port; PortInfo *port;
int m_stop;
public: public:
PortTransmitter(PortInfo *port); PortTransmitter(PortInfo *port);
void run(); void run();
void stop();
}; };
OstProto::Port d; OstProto::Port d;
@ -114,25 +116,23 @@ class PortInfo
}; };
//! \todo Need lock for stats access/update //! \todo Need lock for stats access/update
//! Stuff we need to maintain since PCAP doesn't as of now. As and when //! Stuff we need to maintain since PCAP doesn't as of now. As and when
// PCAP supports it, we'll remove from here // PCAP supports it, we'll remove from here
struct PcapExtra struct PcapExtra
{ {
//! Used to track num of packets (and their sizes) in the
// send queue. Also used to find out actual num of pkts sent
// in case of partial send in pcap_sendqueue_transmit()
QList<uint> sendQueueCumLen;
//! PCAP doesn't do any tx stats //! PCAP doesn't do any tx stats
quint64 txPkts; quint64 txPkts;
quint64 txBytes; quint64 txBytes;
}; };
pcap_if_t *dev; pcap_if_t *dev;
pcap_t *devHandleRx; pcap_t *devHandleRx;
pcap_t *devHandleTx; pcap_t *devHandleTx;
pcap_send_queue* sendQueue; QList<ost_pcap_send_queue> sendQueueList;
bool isSendQueueDirty; bool isSendQueueDirty;
PcapExtra pcapExtra; PcapExtra pcapExtra;
PortMonitorRx monitorRx; PortMonitorRx monitorRx;

View File

@ -4,6 +4,7 @@
/* NOTE: All code borrowed from WinPcap */ /* NOTE: All code borrowed from WinPcap */
#ifndef Q_OS_WIN32
int pcap_setmode(pcap_t *p, int mode) int pcap_setmode(pcap_t *p, int mode)
{ {
// no STAT mode in libpcap, so just return 0 to indicate success // no STAT mode in libpcap, so just return 0 to indicate success
@ -58,13 +59,36 @@ int pcap_sendqueue_queue (pcap_send_queue *queue,
return 0; return 0;
} }
#endif
u_int pcap_sendqueue_transmit (pcap_t *p, pcap_send_queue *queue, int sync) u_int ost_pcap_sendqueue_list_transmit(pcap_t *p,
QList<ost_pcap_send_queue> sendQueueList, int sync,
int *p_stop, quint64* p_pkts, quint64* p_bytes,
void (*pf_usleep)(ulong))
{
uint ret = 0;
foreach(ost_pcap_send_queue sq, sendQueueList)
{
ret += ost_pcap_sendqueue_transmit(p, sq.sendQueue, sync,
p_stop, p_pkts, p_bytes, pf_usleep);
// TODO(HI): Timing between subsequent sendQueues
}
return ret;
}
u_int ost_pcap_sendqueue_transmit(pcap_t *p,
pcap_send_queue *queue, int sync,
int *p_stop, quint64* p_pkts, quint64* p_bytes,
void (*pf_usleep)(ulong))
{ {
char* PacketBuff = queue->buffer; char* PacketBuff = queue->buffer;
int Size = queue->len; int Size = queue->len;
struct pcap_pkthdr *winpcap_hdr; struct pcap_pkthdr *winpcap_hdr;
struct timeval ts;
char* EndOfUserBuff = (char *)PacketBuff + Size; char* EndOfUserBuff = (char *)PacketBuff + Size;
int ret; int ret;
@ -78,8 +102,14 @@ u_int pcap_sendqueue_transmit (pcap_t *p, pcap_send_queue *queue, int sync)
return 0; return 0;
} }
if (sync)
ts = winpcap_hdr->ts;
while( true ){ while( true ){
if (*p_stop)
return (char*)winpcap_hdr - (char*)PacketBuff;
if(winpcap_hdr->caplen ==0 || winpcap_hdr->caplen > 65536) if(winpcap_hdr->caplen ==0 || winpcap_hdr->caplen > 65536)
{ {
// Malformed header // Malformed header
@ -96,6 +126,9 @@ u_int pcap_sendqueue_transmit (pcap_t *p, pcap_send_queue *queue, int sync)
return (char*)winpcap_hdr - (char*)PacketBuff; return (char*)winpcap_hdr - (char*)PacketBuff;
} }
if (p_pkts) (*p_pkts)++;
if (p_bytes) (*p_bytes) += winpcap_hdr->caplen;
// Step to the next packet in the buffer // Step to the next packet in the buffer
//(char*)winpcap_hdr += winpcap_hdr->caplen + sizeof(struct pcap_pkthdr); //(char*)winpcap_hdr += winpcap_hdr->caplen + sizeof(struct pcap_pkthdr);
winpcap_hdr = (struct pcap_pkthdr*) ((char*)winpcap_hdr + winpcap_hdr = (struct pcap_pkthdr*) ((char*)winpcap_hdr +
@ -106,6 +139,18 @@ u_int pcap_sendqueue_transmit (pcap_t *p, pcap_send_queue *queue, int sync)
{ {
return (char*)winpcap_hdr - (char*)PacketBuff; return (char*)winpcap_hdr - (char*)PacketBuff;
} }
if (sync)
{
long usec = (winpcap_hdr->ts.tv_sec-ts.tv_sec)*1000000 +
(winpcap_hdr->ts.tv_usec - ts.tv_usec);
if (usec)
{
(*pf_usleep)(usec);
ts = winpcap_hdr->ts;
}
}
} }
} }

View File

@ -1,10 +1,35 @@
#ifndef _PCAP_EXTRA_H #ifndef _PCAP_EXTRA_H
#define _PCAP_EXTRA_H #define _PCAP_EXTRA_H
#ifndef Q_OS_WIN32 #include <Qt>
#include <QList>
#include "pcap.h" #include "pcap.h"
struct ost_pcap_send_queue
{
pcap_send_queue *sendQueue;
//! Used to track num of packets (and their sizes) in the
// send queue. Also used to find out actual num of pkts sent
// in case of partial send in pcap_sendqueue_transmit()
QList<uint> sendQueueCumLen;
};
// Common for all OS - *nix or Win32
u_int ost_pcap_sendqueue_list_transmit(pcap_t *p,
QList<ost_pcap_send_queue> sendQueueList, int sync,
int *p_stop, quint64* p_pkts, quint64* p_bytes,
void (*pf_usleep)(ulong));
u_int ost_pcap_sendqueue_transmit (pcap_t *p,
pcap_send_queue *queue, int sync,
int *p_stop, quint64* p_pkts, quint64* p_bytes,
void (*pf_usleep)(ulong));
#ifndef Q_OS_WIN32
// Only for non Win32
#define PCAP_OPENFLAG_PROMISCUOUS 1 #define PCAP_OPENFLAG_PROMISCUOUS 1
struct pcap_send_queue struct pcap_send_queue
@ -21,7 +46,6 @@ pcap_send_queue* pcap_sendqueue_alloc (u_int memsize);
void pcap_sendqueue_destroy (pcap_send_queue *queue); void pcap_sendqueue_destroy (pcap_send_queue *queue);
int pcap_sendqueue_queue (pcap_send_queue *queue, int pcap_sendqueue_queue (pcap_send_queue *queue,
const struct pcap_pkthdr *pkt_header, const u_char *pkt_data); const struct pcap_pkthdr *pkt_header, const u_char *pkt_data);
u_int pcap_sendqueue_transmit (pcap_t *p, pcap_send_queue *queue, int sync);
#endif #endif