Refactored send queue data structure and associated processing

This commit is contained in:
Srivats P. 2011-10-01 12:28:28 +05:30
parent 18e7b9b49c
commit 204e6efc2a
5 changed files with 109 additions and 81 deletions

View File

@ -570,7 +570,8 @@ bool AbstractProtocol::isProtocolFrameSizeVariable() const
}
/*!
Returns the number of frames required for the protocol to vary its fields
Returns the minimum number of frames required for the protocol to
vary its fields
This is the lowest common multiple (LCM) of the counts of all the varying
fields in the protocol. Use the AbstractProtocol::lcm() static utility

View File

@ -454,6 +454,10 @@ int StreamBase::frameVariableCount() const
proto = iter->next();
count = proto->protocolFrameVariableCount();
// correct count for mis-behaving protocols
if (count <= 0)
count = 1;
frameCount = AbstractProtocol::lcm(frameCount, count);
}
delete iter;

View File

@ -161,6 +161,9 @@ void AbstractPort::updatePacketListSequential()
long ipg1 = 0, ipg2 = 0;
long np1 = 0, np2 = 0;
// We derive n, x, y such that
// n * x + y = total number of packets to be sent
switch (streamList_[i]->sendUnit())
{
case OstProto::StreamControl::e_su_bursts:
@ -209,6 +212,8 @@ void AbstractPort::updatePacketListSequential()
if (n > 1)
loopNextPacketSet(x, n);
else if (n == 0)
x = 0;
for (int j = 0; j < (x+y); j++)
{

View File

@ -289,30 +289,28 @@ PcapPort::PortTransmitter::~PortTransmitter()
void PcapPort::PortTransmitter::clearPacketList()
{
Q_ASSERT(!isRunning());
// \todo lock for sendQueueList
while(sendQueueList_.size())
{
pcap_send_queue *sq = sendQueueList_.takeFirst();
pcap_sendqueue_destroy(sq);
}
sendQueueRepeat_.clear();
sendQueueGoto_.clear();
currentSendQueue_ = NULL;
currentPacketSetQIdx_ = -1;
currentPacketSetSize_ = -1;
currentPacketSetRepeat_ = 1;
currentPacketSetCount_ = 0;
// \todo lock for packetSequenceList
while(packetSequenceList_.size())
delete packetSequenceList_.takeFirst();
currentPacketSequence_ = NULL;
repeatSequenceStart_ = -1;
repeatSize_ = 0;
packetCount_ = 0;
setPacketListLoopMode(false, 0, 0);
}
void PcapPort::PortTransmitter::loopNextPacketSet(qint64 size, qint64 repeats)
{
// Trigger a new sendQ alloc on next packet
currentSendQueue_ = NULL;
currentPacketSetQIdx_ = sendQueueList_.size();
currentPacketSetSize_ = size;
currentPacketSetRepeat_ = repeats;
currentPacketSetCount_ = 0;
currentPacketSequence_ = new PacketSequence;
currentPacketSequence_->repeatCount_ = repeats;
repeatSequenceStart_ = packetSequenceList_.size();
repeatSize_ = size;
packetCount_ = 0;
packetSequenceList_.append(currentPacketSequence_);
}
bool PcapPort::PortTransmitter::appendToPacketList(long sec, long nsec,
@ -325,48 +323,51 @@ bool PcapPort::PortTransmitter::appendToPacketList(long sec, long nsec,
pktHdr.ts.tv_sec = sec;
pktHdr.ts.tv_usec = nsec/1000;
if ((currentSendQueue_ == NULL) ||
(currentSendQueue_->len + 2*sizeof(pcap_pkthdr) + length)
> currentSendQueue_->maxlen)
if (currentPacketSequence_ == NULL ||
!currentPacketSequence_->hasFreeSpace(2*sizeof(pcap_pkthdr)+length))
{
// Add a zero len packet at end of currentSendQueue_ for
// inter-sendQ timing
if (sendQueueList_.size())
if (packetSequenceList_.size())
{
pcap_pkthdr hdr = pktHdr;
hdr.caplen = 0;
pcap_sendqueue_queue(sendQueueList_.last(), &hdr, (u_char*)packet);
pcap_sendqueue_queue(packetSequenceList_.last()->sendQueue_,
&hdr, (u_char*)packet);
}
//! \todo (LOW): calculate sendqueue size
currentSendQueue_ = pcap_sendqueue_alloc(1*1024*1024);
currentPacketSequence_ = new PacketSequence;
sendQueueList_.append(currentSendQueue_);
sendQueueRepeat_.append(1);
sendQueueGoto_.append(-1);
packetSequenceList_.append(currentPacketSequence_);
// Validate that the pkt will fit inside the new currentSendQueue_
Q_ASSERT((length + sizeof(pcap_pkthdr)) < currentSendQueue_->maxlen);
Q_ASSERT(currentPacketSequence_->hasFreeSpace(
sizeof(pcap_pkthdr) + length));
}
if (pcap_sendqueue_queue(currentSendQueue_, &pktHdr, (u_char*) packet) < 0)
op = false;
currentPacketSetCount_++;
if (currentPacketSetCount_ == currentPacketSetSize_)
if (pcap_sendqueue_queue(currentPacketSequence_->sendQueue_, &pktHdr,
(u_char*) packet) < 0)
{
qDebug("i=%d, currentPacketSetQIdx_ = %d, currentPacketSetRepeat_ = %d",
sendQueueRepeat_.size(),
currentPacketSetQIdx_,
currentPacketSetRepeat_);
// End current sendQ
sendQueueGoto_.last() = currentPacketSetQIdx_;
sendQueueRepeat_.last() = currentPacketSetRepeat_;
currentPacketSetSize_ = -1;
// Trigger a new sendQ allocation
currentSendQueue_ = NULL;
op = false;
}
packetCount_++;
if (repeatSize_ > 0 && packetCount_ == repeatSize_)
{
qDebug("repeatSequenceStart_=%d, repeatSize_ = %llu",
repeatSequenceStart_, repeatSize_);
// Set the packetSequence repeatSize
Q_ASSERT(repeatSequenceStart_ >= 0);
Q_ASSERT(repeatSequenceStart_ < packetSequenceList_.size());
packetSequenceList_[repeatSequenceStart_]->repeatSize_ =
packetSequenceList_.size() - repeatSequenceStart_;
repeatSize_ = 0;
// End current pktSeq and trigger a new pktSeq allocation for next pkt
currentPacketSequence_ = NULL;
}
return op;
}
@ -407,44 +408,39 @@ void PcapPort::PortTransmitter::run()
int i;
long overHead = 0;
qDebug("sendQueueList_.size = %d", sendQueueList_.size());
if (sendQueueList_.size() <= 0)
qDebug("packetSequenceList_.size = %d", packetSequenceList_.size());
if (packetSequenceList_.size() <= 0)
return;
for(i = 0; i < sendQueueList_.size(); i++) {
qDebug("sendQ[%d]: rpt = %d, goto = %d", i,
sendQueueRepeat_.at(i), sendQueueGoto_.at(i));
for(i = 0; i < packetSequenceList_.size(); i++) {
qDebug("sendQ[%d]: rptCnt = %d, rptSz = %d", i,
packetSequenceList_.at(i)->repeatCount_,
packetSequenceList_.at(i)->repeatSize_);
}
for(i = 0; i < sendQueueList_.size(); i++)
for(i = 0; i < packetSequenceList_.size(); i++)
{
int ret;
_restart:
int idx = sendQueueGoto_.at(i);
int rpt = sendQueueRepeat_.at(i);
int sqi = i;
int rptSz = packetSequenceList_.at(i)->repeatSize_;
int rptCnt = packetSequenceList_.at(i)->repeatCount_;
for (int n = 0; n < rpt; n++)
for (int j = 0; j < rptCnt; j++)
{
Q_ASSERT(sqi <= i);
#if 0
if ((n & 0x3F) == 0)
qDebug("i = %d, n = %d/%d, sqi = %d\n", i, n, rpt, sqi);
#endif
ret = sendQueueTransmit(handle_, sendQueueList_.at(sqi), overHead,
kSyncTransmit);
if (ret < 0)
for (int k = 0; k < rptSz; k++)
{
qDebug("error %d in sendQueueTransmit()", ret);
qDebug("overHead = %ld", overHead);
stop_ = false;
return;
}
int ret = sendQueueTransmit(handle_,
packetSequenceList_.at(i+k)->sendQueue_,
overHead, kSyncTransmit);
sqi = (sqi == i) ? idx : sqi+1;
if (ret < 0)
{
qDebug("error %d in sendQueueTransmit()", ret);
qDebug("overHead = %ld", overHead);
stop_ = false;
return;
}
}
}
}

View File

@ -111,21 +111,43 @@ protected:
void run();
void stop();
private:
class PacketSequence
{
public:
PacketSequence() {
sendQueue_ = pcap_sendqueue_alloc(1*1024*1024);
repeatCount_ = 1;
repeatSize_ = 1;
}
~PacketSequence() {
pcap_sendqueue_destroy(sendQueue_);
}
bool hasFreeSpace(int size) {
if ((sendQueue_->len + size) <= sendQueue_->maxlen)
return true;
else
return false;
}
pcap_send_queue *sendQueue_;
int repeatCount_;
int repeatSize_;
};
void udelay(long usec);
int sendQueueTransmit(pcap_t *p, pcap_send_queue *queue, long &overHead,
int sync);
quint64 ticksFreq_;
int currentPacketSetQIdx_;
int currentPacketSetSize_;
int currentPacketSetRepeat_;
int currentPacketSetCount_;
pcap_send_queue* currentSendQueue_;
QList<pcap_send_queue*> sendQueueList_;
QList<int> sendQueueRepeat_;
QList<int> sendQueueGoto_;
QList<PacketSequence*> packetSequenceList_;
PacketSequence *currentPacketSequence_;
int repeatSequenceStart_;
quint64 repeatSize_;
quint64 packetCount_;
int returnToQIdx_;
long loopDelay_;
bool usingInternalStats_;
AbstractPort::PortStats *stats_;
bool usingInternalHandle_;