Snapshot before refactoring the send queue data structure

This commit is contained in:
Srivats P. 2011-09-26 20:47:27 +05:30
parent ca7a264b36
commit 18e7b9b49c
10 changed files with 244 additions and 69 deletions

View File

@ -569,6 +569,23 @@ bool AbstractProtocol::isProtocolFrameSizeVariable() const
return false; return false;
} }
/*!
Returns the number of frames required for the protocol to vary its fields
This is the lowest common multiple (LCM) of the counts of all the varying
fields in the protocol. Use the AbstractProtocol::lcm() static utility
function to calculate the LCM.
The default implementation returns 1 implying that the protocol has no
varying fields. A subclass should reimplement if it has varying fields
e.g. an IP protocol that increments/decrements the IP address with
every packet
*/
int AbstractProtocol::protocolFrameVariableCount() const
{
return 1;
}
/*! /*!
Returns true if the payload content for a protocol varies at run-time, Returns true if the payload content for a protocol varies at run-time,
false otherwise false otherwise
@ -823,3 +840,51 @@ out:
this function. See the SampleProtocol for an example this function. See the SampleProtocol for an example
*/ */
// Stein's binary GCD algo - from wikipedia
quint64 AbstractProtocol::gcd(quint64 u, quint64 v)
{
int shift;
/* GCD(0,x) := x */
if (u == 0 || v == 0)
return u | v;
/* Let shift := lg K, where K is the greatest power of 2
dividing both u and v. */
for (shift = 0; ((u | v) & 1) == 0; ++shift) {
u >>= 1;
v >>= 1;
}
while ((u & 1) == 0)
u >>= 1;
/* From here on, u is always odd. */
do {
while ((v & 1) == 0) /* Loop X */
v >>= 1;
/* Now u and v are both odd, so diff(u, v) is even.
Let u = min(u, v), v = diff(u, v)/2. */
if (u < v) {
v -= u;
} else {
quint64 diff = u - v;
u = v;
v = diff;
}
v >>= 1;
} while (v != 0);
return u << shift;
}
quint64 AbstractProtocol::lcm(quint64 u, quint64 v)
{
/* FIXME: LCM(0,x) := x */
if (u == 0 || v == 0)
return u | v;
return (u * v)/gcd(u, v);
}

View File

@ -139,6 +139,7 @@ public:
virtual bool isProtocolFrameValueVariable() const; virtual bool isProtocolFrameValueVariable() const;
virtual bool isProtocolFrameSizeVariable() const; virtual bool isProtocolFrameSizeVariable() const;
virtual int protocolFrameVariableCount() const;
bool isProtocolFramePayloadValueVariable() const; bool isProtocolFramePayloadValueVariable() const;
bool isProtocolFramePayloadSizeVariable() const; bool isProtocolFramePayloadSizeVariable() const;
@ -156,6 +157,9 @@ public:
virtual QWidget* configWidget() = 0; virtual QWidget* configWidget() = 0;
virtual void loadConfigWidget() = 0; virtual void loadConfigWidget() = 0;
virtual void storeConfigWidget() = 0; virtual void storeConfigWidget() = 0;
static quint64 lcm(quint64 u, quint64 v);
static quint64 gcd(quint64 u, quint64 v);
}; };
Q_DECLARE_OPERATORS_FOR_FLAGS(AbstractProtocol::FieldFlags); Q_DECLARE_OPERATORS_FOR_FLAGS(AbstractProtocol::FieldFlags);

View File

@ -611,6 +611,19 @@ bool Ip4Protocol::isProtocolFrameValueVariable() const
return false; return false;
} }
int Ip4Protocol::protocolFrameVariableCount() const
{
int count = 1;
if (data.src_ip_mode() != OstProto::Ip4::e_im_fixed)
count = AbstractProtocol::lcm(count, data.src_ip_count());
if (data.dst_ip_mode() != OstProto::Ip4::e_im_fixed)
count = AbstractProtocol::lcm(count, data.dst_ip_count());
return count;
}
quint32 Ip4Protocol::protocolFrameCksum(int streamIndex, quint32 Ip4Protocol::protocolFrameCksum(int streamIndex,
CksumType cksumType) const CksumType cksumType) const
{ {

View File

@ -102,6 +102,7 @@ public:
FieldAttrib attrib = FieldValue); FieldAttrib attrib = FieldValue);
virtual bool isProtocolFrameValueVariable() const; virtual bool isProtocolFrameValueVariable() const;
virtual int protocolFrameVariableCount() const;
virtual quint32 protocolFrameCksum(int streamIndex = 0, virtual quint32 protocolFrameCksum(int streamIndex = 0,
CksumType cksumType = CksumIp) const; CksumType cksumType = CksumIp) const;

View File

@ -440,6 +440,27 @@ _exit:
return true; return true;
} }
int StreamBase::frameVariableCount() const
{
ProtocolListIterator *iter;
quint64 frameCount = 1;
iter = createProtocolListIterator();
while (iter->hasNext())
{
AbstractProtocol *proto;
int count;
proto = iter->next();
count = proto->protocolFrameVariableCount();
frameCount = AbstractProtocol::lcm(frameCount, count);
}
delete iter;
return frameCount;
}
// frameProtocolLength() returns the sum of all the individual protocol sizes // frameProtocolLength() returns the sum of all the individual protocol sizes
// which may be different from frameLen() // which may be different from frameLen()
int StreamBase::frameProtocolLength(int frameIndex) const int StreamBase::frameProtocolLength(int frameIndex) const

View File

@ -138,6 +138,7 @@ public:
bool isFrameVariable() const; bool isFrameVariable() const;
bool isFrameSizeVariable() const; bool isFrameSizeVariable() const;
int frameVariableCount() const;
int frameProtocolLength(int frameIndex) const; int frameProtocolLength(int frameIndex) const;
int frameCount() const; int frameCount() const;
int frameValue(uchar *buf, int bufMaxSize, int frameIndex) const; int frameValue(uchar *buf, int bufMaxSize, int frameIndex) const;

View File

@ -21,10 +21,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "abstractport.h" #include "abstractport.h"
#include "../common/streambase.h"
#include "../common/abstractprotocol.h"
#include <QString> #include <QString>
#include <QIODevice> #include <QIODevice>
#include "../common/streambase.h"
#include <inttypes.h> #include <inttypes.h>
#include <limits.h> #include <limits.h>
#include <math.h> #include <math.h>
@ -136,8 +138,6 @@ void AbstractPort::updatePacketList()
void AbstractPort::updatePacketListSequential() void AbstractPort::updatePacketListSequential()
{ {
int len;
bool isVariable;
long sec = 0; long sec = 0;
long nsec = 0; long nsec = 0;
@ -152,7 +152,8 @@ void AbstractPort::updatePacketListSequential()
{ {
if (streamList_[i]->isEnabled()) if (streamList_[i]->isEnabled())
{ {
long numPackets, numBursts; long n, x, y;
long burstSize;
double ibg = 0; double ibg = 0;
long ibg1 = 0, ibg2 = 0; long ibg1 = 0, ibg2 = 0;
long nb1 = 0, nb2 = 0; long nb1 = 0, nb2 = 0;
@ -163,27 +164,32 @@ void AbstractPort::updatePacketListSequential()
switch (streamList_[i]->sendUnit()) switch (streamList_[i]->sendUnit())
{ {
case OstProto::StreamControl::e_su_bursts: case OstProto::StreamControl::e_su_bursts:
numBursts = streamList_[i]->numBursts(); burstSize = streamList_[i]->burstSize();
numPackets = streamList_[i]->burstSize(); x = AbstractProtocol::lcm(streamList_[i]->frameVariableCount(),
burstSize);
n = ulong(burstSize * streamList_[i]->burstRate()) / x;
y = ulong(burstSize * streamList_[i]->burstRate()) % x;
if (streamList_[i]->burstRate() > 0) if (streamList_[i]->burstRate() > 0)
{ {
ibg = 1e9/double(streamList_[i]->burstRate()); ibg = 1e9/double(streamList_[i]->burstRate());
ibg1 = long(ceil(ibg)); ibg1 = long(ceil(ibg));
ibg2 = long(floor(ibg)); ibg2 = long(floor(ibg));
nb1 = long((ibg - double(ibg2)) * double(numBursts)); nb1 = long((ibg - double(ibg2)) * double(x));
nb2= numBursts - nb1; nb2 = x - nb1;
} }
break; break;
case OstProto::StreamControl::e_su_packets: case OstProto::StreamControl::e_su_packets:
numBursts = 1; x = streamList_[i]->frameVariableCount();
numPackets = streamList_[i]->numPackets(); n = streamList_[i]->numPackets() / x;
y = streamList_[i]->numPackets() % x;
burstSize = x + y;
if (streamList_[i]->packetRate() > 0) if (streamList_[i]->packetRate() > 0)
{ {
ipg = 1e9/double(streamList_[i]->packetRate()); ipg = 1e9/double(streamList_[i]->packetRate());
ipg1 = long(ceil(ipg)); ipg1 = long(ceil(ipg));
ipg2 = long(floor(ipg)); ipg2 = long(floor(ipg));
np1 = long((ipg - double(ipg2)) * double(numPackets)); np1 = long((ipg - double(ipg2)) * double(x));
np2= numPackets - np1; np2 = x - np1;
} }
break; break;
default: default:
@ -191,56 +197,51 @@ void AbstractPort::updatePacketListSequential()
streamList_[i]->sendUnit()); streamList_[i]->sendUnit());
continue; continue;
} }
qDebug("numBursts = %ld, numPackets = %ld\n",
numBursts, numPackets); qDebug("\nframeVariableCount = %d",
qDebug("ibg = %g, ibg1/nb1 = %ld/%ld ibg2/nb2 = %ld/%ld\n", streamList_[i]->frameVariableCount());
qDebug("n = %ld, x = %ld, y = %ld, burstSize = %ld",
n, x, y, burstSize);
qDebug("ibg = %g, ibg1/nb1 = %ld/%ld ibg2/nb2 = %ld/%ld",
ibg, ibg1, nb1, ibg2, nb2); ibg, ibg1, nb1, ibg2, nb2);
qDebug("ipg = %g, ipg1/np1 = %ld/%ld ipg2/np2 = %ld/%ld\n", qDebug("ipg = %g, ipg1/np1 = %ld/%ld ipg2/np2 = %ld/%ld\n",
ipg, ipg1, np1, ipg2, np2); ipg, ipg1, np1, ipg2, np2);
if (streamList_[i]->isFrameVariable()) if (n > 1)
{ loopNextPacketSet(x, n);
isVariable = true;
len = 0; // avoid compiler warning; get len value for each pkt
}
else
{
isVariable = false;
len = streamList_[i]->frameValue(pktBuf_, sizeof(pktBuf_), 0);
}
for (int j = 0; j < numBursts; j++) for (int j = 0; j < (x+y); j++)
{ {
for (int k = 0; k < numPackets; k++) int len;
{
if (isVariable) len = streamList_[i]->frameValue(pktBuf_, sizeof(pktBuf_), j);
{
len = streamList_[i]->frameValue(pktBuf_,
sizeof(pktBuf_), j * numPackets + k);
}
if (len <= 0) if (len <= 0)
continue; continue;
qDebug("q(%d, %d, %d) sec = %lu nsec = %lu", qDebug("q(%d, %d) sec = %lu nsec = %lu",
i, j, k, sec, nsec); i, j, sec, nsec);
appendToPacketList(sec, nsec, pktBuf_, len); appendToPacketList(sec, nsec, pktBuf_, len);
nsec += (k < np1) ? ipg1 : ipg2; if ((j % burstSize) == 0)
while (nsec >= 1e9)
{ {
sec++;
nsec -= long(1e9);
}
} // for (numPackets)
nsec += (j < nb1) ? ibg1 : ibg2; nsec += (j < nb1) ? ibg1 : ibg2;
while (nsec >= 1e9) while (nsec >= 1e9)
{ {
sec++; sec++;
nsec -= long(1e9); nsec -= long(1e9);
} }
} // for (numBursts) }
else
{
nsec += (j < np1) ? ipg1 : ipg2;
while (nsec >= 1e9)
{
sec++;
nsec -= long(1e9);
}
}
}
switch(streamList_[i]->nextWhat()) switch(streamList_[i]->nextWhat())
{ {

View File

@ -71,6 +71,7 @@ public:
void setDirty() { isSendQueueDirty_ = true; } void setDirty() { isSendQueueDirty_ = true; }
virtual void clearPacketList() = 0; virtual void clearPacketList() = 0;
virtual void loopNextPacketSet(qint64 size, qint64 repeats) = 0;
virtual bool appendToPacketList(long sec, long nsec, const uchar *packet, virtual bool appendToPacketList(long sec, long nsec, const uchar *packet,
int length) = 0; int length) = 0;
virtual void setPacketListLoopMode(bool loop, virtual void setPacketListLoopMode(bool loop,

View File

@ -295,43 +295,79 @@ void PcapPort::PortTransmitter::clearPacketList()
pcap_send_queue *sq = sendQueueList_.takeFirst(); pcap_send_queue *sq = sendQueueList_.takeFirst();
pcap_sendqueue_destroy(sq); pcap_sendqueue_destroy(sq);
} }
sendQueueRepeat_.clear();
sendQueueGoto_.clear();
currentSendQueue_ = NULL;
currentPacketSetQIdx_ = -1;
currentPacketSetSize_ = -1;
currentPacketSetRepeat_ = 1;
currentPacketSetCount_ = 0;
setPacketListLoopMode(false, 0, 0); setPacketListLoopMode(false, 0, 0);
} }
void PcapPort::PortTransmitter::loopNextPacketSet(qint64 size, qint64 repeats)
{
// Trigger a new sendQ alloc on next packet
currentSendQueue_ = NULL;
currentPacketSetQIdx_ = sendQueueList_.size();
currentPacketSetSize_ = size;
currentPacketSetRepeat_ = repeats;
currentPacketSetCount_ = 0;
}
bool PcapPort::PortTransmitter::appendToPacketList(long sec, long nsec, bool PcapPort::PortTransmitter::appendToPacketList(long sec, long nsec,
const uchar *packet, int length) const uchar *packet, int length)
{ {
bool op = true; bool op = true;
pcap_pkthdr pktHdr; pcap_pkthdr pktHdr;
pcap_send_queue *sendQ;
pktHdr.caplen = pktHdr.len = length; pktHdr.caplen = pktHdr.len = length;
pktHdr.ts.tv_sec = sec; pktHdr.ts.tv_sec = sec;
pktHdr.ts.tv_usec = nsec/1000; pktHdr.ts.tv_usec = nsec/1000;
sendQ = sendQueueList_.isEmpty() ? NULL : sendQueueList_.last(); if ((currentSendQueue_ == NULL) ||
(currentSendQueue_->len + 2*sizeof(pcap_pkthdr) + length)
if ((sendQ == NULL) || > currentSendQueue_->maxlen)
(sendQ->len + 2*sizeof(pcap_pkthdr) + length) > sendQ->maxlen)
{ {
// Add a zero len packet at end of sendQ for inter-sendQ timing // Add a zero len packet at end of currentSendQueue_ for
if (sendQ) // inter-sendQ timing
if (sendQueueList_.size())
{ {
pcap_pkthdr hdr = pktHdr; pcap_pkthdr hdr = pktHdr;
hdr.caplen = 0; hdr.caplen = 0;
pcap_sendqueue_queue(sendQ, &hdr, (u_char*)packet); pcap_sendqueue_queue(sendQueueList_.last(), &hdr, (u_char*)packet);
} }
//! \todo (LOW): calculate sendqueue size //! \todo (LOW): calculate sendqueue size
sendQ = pcap_sendqueue_alloc(1*1024*1024); currentSendQueue_ = pcap_sendqueue_alloc(1*1024*1024);
sendQueueList_.append(sendQ);
// Validate that the pkt will fit inside the new sendQ sendQueueList_.append(currentSendQueue_);
Q_ASSERT((length + sizeof(pcap_pkthdr)) < sendQ->maxlen); sendQueueRepeat_.append(1);
sendQueueGoto_.append(-1);
// Validate that the pkt will fit inside the new currentSendQueue_
Q_ASSERT((length + sizeof(pcap_pkthdr)) < currentSendQueue_->maxlen);
} }
if (pcap_sendqueue_queue(sendQ, &pktHdr, (u_char*) packet) < 0) if (pcap_sendqueue_queue(currentSendQueue_, &pktHdr, (u_char*) packet) < 0)
op = false; op = false;
currentPacketSetCount_++;
if (currentPacketSetCount_ == currentPacketSetSize_)
{
qDebug("i=%d, currentPacketSetQIdx_ = %d, currentPacketSetRepeat_ = %d",
sendQueueRepeat_.size(),
currentPacketSetQIdx_,
currentPacketSetRepeat_);
// End current sendQ
sendQueueGoto_.last() = currentPacketSetQIdx_;
sendQueueRepeat_.last() = currentPacketSetRepeat_;
currentPacketSetSize_ = -1;
// Trigger a new sendQ allocation
currentSendQueue_ = NULL;
}
return op; return op;
} }
@ -375,11 +411,29 @@ void PcapPort::PortTransmitter::run()
if (sendQueueList_.size() <= 0) if (sendQueueList_.size() <= 0)
return; return;
for(i = 0; i < sendQueueList_.size(); i++) {
qDebug("sendQ[%d]: rpt = %d, goto = %d", i,
sendQueueRepeat_.at(i), sendQueueGoto_.at(i));
}
for(i = 0; i < sendQueueList_.size(); i++) for(i = 0; i < sendQueueList_.size(); i++)
{ {
int ret; int ret;
_restart: _restart:
ret = sendQueueTransmit(handle_, sendQueueList_.at(i), overHead, int idx = sendQueueGoto_.at(i);
int rpt = sendQueueRepeat_.at(i);
int sqi = i;
for (int n = 0; n < rpt; n++)
{
Q_ASSERT(sqi <= i);
#if 0
if ((n & 0x3F) == 0)
qDebug("i = %d, n = %d/%d, sqi = %d\n", i, n, rpt, sqi);
#endif
ret = sendQueueTransmit(handle_, sendQueueList_.at(sqi), overHead,
kSyncTransmit); kSyncTransmit);
if (ret < 0) if (ret < 0)
@ -389,6 +443,9 @@ _restart:
stop_ = false; stop_ = false;
return; return;
} }
sqi = (sqi == i) ? idx : sqi+1;
}
} }
if (returnToQIdx_ >= 0) if (returnToQIdx_ >= 0)

View File

@ -42,6 +42,9 @@ public:
transmitter_->clearPacketList(); transmitter_->clearPacketList();
setPacketListLoopMode(false, 0, 0); setPacketListLoopMode(false, 0, 0);
} }
virtual void loopNextPacketSet(qint64 size, qint64 repeats) {
transmitter_->loopNextPacketSet(size, repeats);
}
virtual bool appendToPacketList(long sec, long nsec, const uchar *packet, virtual bool appendToPacketList(long sec, long nsec, const uchar *packet,
int length) { int length) {
return transmitter_->appendToPacketList(sec, nsec, packet, length); return transmitter_->appendToPacketList(sec, nsec, packet, length);
@ -96,6 +99,7 @@ protected:
PortTransmitter(const char *device); PortTransmitter(const char *device);
~PortTransmitter(); ~PortTransmitter();
void clearPacketList(); void clearPacketList();
void loopNextPacketSet(qint64 size, qint64 repeats);
bool appendToPacketList(long sec, long usec, const uchar *packet, bool appendToPacketList(long sec, long usec, const uchar *packet,
int length); int length);
void setPacketListLoopMode(bool loop, long secDelay, long nsecDelay) { void setPacketListLoopMode(bool loop, long secDelay, long nsecDelay) {
@ -112,7 +116,14 @@ protected:
int sync); int sync);
quint64 ticksFreq_; quint64 ticksFreq_;
int currentPacketSetQIdx_;
int currentPacketSetSize_;
int currentPacketSetRepeat_;
int currentPacketSetCount_;
pcap_send_queue* currentSendQueue_;
QList<pcap_send_queue*> sendQueueList_; QList<pcap_send_queue*> sendQueueList_;
QList<int> sendQueueRepeat_;
QList<int> sendQueueGoto_;
int returnToQIdx_; int returnToQIdx_;
long loopDelay_; long loopDelay_;
bool usingInternalStats_; bool usingInternalStats_;