refactored sendQueue code

This commit is contained in:
Srivats P. 2011-10-08 15:55:57 +05:30
parent 204e6efc2a
commit 0223f39994
5 changed files with 110 additions and 45 deletions

View File

@ -882,9 +882,19 @@ quint64 AbstractProtocol::gcd(quint64 u, quint64 v)
quint64 AbstractProtocol::lcm(quint64 u, quint64 v) quint64 AbstractProtocol::lcm(quint64 u, quint64 v)
{ {
/* FIXME: LCM(0,x) := x */ #if 0
/* LCM(0,x) := x */
if (u == 0 || v == 0) if (u == 0 || v == 0)
return u | v; return u | v;
#else
/* For our use case, neither u nor v can ever be 0, the minimum
value is 1; we do this correction silently here */
if (u == 0) u = 1;
if (v == 0) v = 1;
if (u == 1 || v == 1)
return (u * v);
#endif
return (u * v)/gcd(u, v); return (u * v)/gcd(u, v);
} }

View File

@ -152,14 +152,18 @@ void AbstractPort::updatePacketListSequential()
{ {
if (streamList_[i]->isEnabled()) if (streamList_[i]->isEnabled())
{ {
long n, x, y; int len;
long burstSize; ulong n, x, y;
ulong burstSize;
double ibg = 0; double ibg = 0;
long ibg1 = 0, ibg2 = 0; quint64 ibg1 = 0, ibg2 = 0;
long nb1 = 0, nb2 = 0; quint64 nb1 = 0, nb2 = 0;
double ipg = 0; double ipg = 0;
long ipg1 = 0, ipg2 = 0; quint64 ipg1 = 0, ipg2 = 0;
long np1 = 0, np2 = 0; quint64 npx1 = 0, npx2 = 0;
quint64 npy1 = 0, npy2 = 0;
quint64 loopDelay;
ulong frameVariableCount = streamList_[i]->frameVariableCount();
// We derive n, x, y such that // We derive n, x, y such that
// n * x + y = total number of packets to be sent // n * x + y = total number of packets to be sent
@ -168,32 +172,37 @@ void AbstractPort::updatePacketListSequential()
{ {
case OstProto::StreamControl::e_su_bursts: case OstProto::StreamControl::e_su_bursts:
burstSize = streamList_[i]->burstSize(); burstSize = streamList_[i]->burstSize();
x = AbstractProtocol::lcm(streamList_[i]->frameVariableCount(), x = AbstractProtocol::lcm(frameVariableCount, burstSize);
burstSize); n = ulong(burstSize * streamList_[i]->burstRate()
n = ulong(burstSize * streamList_[i]->burstRate()) / x; * streamList_[i]->numBursts()) / x;
y = ulong(burstSize * streamList_[i]->burstRate()) % x; y = ulong(burstSize * streamList_[i]->burstRate()
* streamList_[i]->numBursts()) % x;
if (streamList_[i]->burstRate() > 0) if (streamList_[i]->burstRate() > 0)
{ {
ibg = 1e9/double(streamList_[i]->burstRate()); ibg = 1e9/double(streamList_[i]->burstRate());
ibg1 = long(ceil(ibg)); ibg1 = quint64(ceil(ibg));
ibg2 = long(floor(ibg)); ibg2 = quint64(floor(ibg));
nb1 = long((ibg - double(ibg2)) * double(x)); nb1 = quint64((ibg - double(ibg2)) * double(x));
nb2 = x - nb1; nb2 = x - nb1;
} }
loopDelay = ibg2;
break; break;
case OstProto::StreamControl::e_su_packets: case OstProto::StreamControl::e_su_packets:
x = streamList_[i]->frameVariableCount(); x = frameVariableCount;
n = streamList_[i]->numPackets() / x; n = streamList_[i]->numPackets() / x;
y = streamList_[i]->numPackets() % x; y = streamList_[i]->numPackets() % x;
burstSize = x + y; burstSize = x + y;
if (streamList_[i]->packetRate() > 0) if (streamList_[i]->packetRate() > 0)
{ {
ipg = 1e9/double(streamList_[i]->packetRate()); ipg = 1e9/double(streamList_[i]->packetRate());
ipg1 = long(ceil(ipg)); ipg1 = quint64(ceil(ipg));
ipg2 = long(floor(ipg)); ipg2 = quint64(floor(ipg));
np1 = long((ipg - double(ipg2)) * double(x)); npx1 = quint64((ipg - double(ipg2)) * double(x));
np2 = x - np1; npx2 = x - npx1;
npy1 = quint64((ipg - double(ipg2)) * double(y));
npy2 = y - npy1;
} }
loopDelay = ipg2;
break; break;
default: default:
qWarning("Unhandled stream control unit %d", qWarning("Unhandled stream control unit %d",
@ -201,25 +210,37 @@ void AbstractPort::updatePacketListSequential()
continue; continue;
} }
qDebug("\nframeVariableCount = %d", qDebug("\nframeVariableCount = %lu", frameVariableCount);
streamList_[i]->frameVariableCount()); qDebug("n = %lu, x = %lu, y = %lu, burstSize = %lu",
qDebug("n = %ld, x = %ld, y = %ld, burstSize = %ld",
n, x, y, burstSize); n, x, y, burstSize);
qDebug("ibg = %g, ibg1/nb1 = %ld/%ld ibg2/nb2 = %ld/%ld",
ibg, ibg1, nb1, ibg2, nb2); qDebug("ibg = %g", ibg);
qDebug("ipg = %g, ipg1/np1 = %ld/%ld ipg2/np2 = %ld/%ld\n", qDebug("ibg1 = %" PRIu64, ibg1);
ipg, ipg1, np1, ipg2, np2); qDebug("nb1 = %" PRIu64, nb1);
qDebug("ibg2 = %" PRIu64, ibg2);
qDebug("nb2 = %" PRIu64 "\n", nb2);
qDebug("ipg = %g", ipg);
qDebug("ipg1 = %" PRIu64, ipg1);
qDebug("npx1 = %" PRIu64, npx1);
qDebug("npy1 = %" PRIu64, npy1);
qDebug("ipg2 = %" PRIu64, ipg2);
qDebug("npx2 = %" PRIu64, npx2);
qDebug("npy2 = %" PRIu64 "\n", npy2);
if (n > 1) if (n > 1)
loopNextPacketSet(x, n); loopNextPacketSet(x, n, 0, loopDelay);
else if (n == 0) else if (n == 0)
x = 0; x = 0;
for (int j = 0; j < (x+y); j++) for (uint j = 0; j < (x+y); j++)
{ {
int len;
len = streamList_[i]->frameValue(pktBuf_, sizeof(pktBuf_), j); if (j == 0 || frameVariableCount > 1)
{
len = streamList_[i]->frameValue(
pktBuf_, sizeof(pktBuf_), j);
}
if (len <= 0) if (len <= 0)
continue; continue;
@ -228,10 +249,10 @@ void AbstractPort::updatePacketListSequential()
appendToPacketList(sec, nsec, pktBuf_, len); appendToPacketList(sec, nsec, pktBuf_, len);
if ((j % burstSize) == 0) if ((j > 0) && (((j+1) % burstSize) == 0))
{ {
nsec += (j < nb1) ? ibg1 : ibg2; nsec += (j < nb1) ? ibg1 : ibg2;
while (nsec >= 1e9) while (nsec >= long(1e9))
{ {
sec++; sec++;
nsec -= long(1e9); nsec -= long(1e9);
@ -239,8 +260,12 @@ void AbstractPort::updatePacketListSequential()
} }
else else
{ {
nsec += (j < np1) ? ipg1 : ipg2; if (j < x)
while (nsec >= 1e9) nsec += (j < npx1) ? ipg1 : ipg2;
else
nsec += ((j-x) < npy1) ? ipg1 : ipg2;
while (nsec >= long(1e9))
{ {
sec++; sec++;
nsec -= long(1e9); nsec -= long(1e9);

View File

@ -71,7 +71,8 @@ public:
void setDirty() { isSendQueueDirty_ = true; } void setDirty() { isSendQueueDirty_ = true; }
virtual void clearPacketList() = 0; virtual void clearPacketList() = 0;
virtual void loopNextPacketSet(qint64 size, qint64 repeats) = 0; virtual void loopNextPacketSet(qint64 size, qint64 repeats,
long repeatDelaySec, long repeatDelayNsec) = 0;
virtual bool appendToPacketList(long sec, long nsec, const uchar *packet, virtual bool appendToPacketList(long sec, long nsec, const uchar *packet,
int length) = 0; int length) = 0;
virtual void setPacketListLoopMode(bool loop, virtual void setPacketListLoopMode(bool loop,

View File

@ -301,10 +301,13 @@ void PcapPort::PortTransmitter::clearPacketList()
setPacketListLoopMode(false, 0, 0); setPacketListLoopMode(false, 0, 0);
} }
void PcapPort::PortTransmitter::loopNextPacketSet(qint64 size, qint64 repeats) void PcapPort::PortTransmitter::loopNextPacketSet(qint64 size, qint64 repeats,
long repeatDelaySec, long repeatDelayNsec)
{ {
currentPacketSequence_ = new PacketSequence; currentPacketSequence_ = new PacketSequence;
currentPacketSequence_->repeatCount_ = repeats; currentPacketSequence_->repeatCount_ = repeats;
currentPacketSequence_->usecDelay_ = repeatDelaySec * long(1e6)
+ repeatDelayNsec/1000;
repeatSequenceStart_ = packetSequenceList_.size(); repeatSequenceStart_ = packetSequenceList_.size();
repeatSize_ = size; repeatSize_ = size;
@ -328,7 +331,7 @@ bool PcapPort::PortTransmitter::appendToPacketList(long sec, long nsec,
{ {
// Add a zero len packet at end of currentSendQueue_ for // Add a zero len packet at end of currentSendQueue_ for
// inter-sendQ timing // inter-sendQ timing
if (packetSequenceList_.size()) if (currentPacketSequence_ != NULL)
{ {
pcap_pkthdr hdr = pktHdr; pcap_pkthdr hdr = pktHdr;
hdr.caplen = 0; hdr.caplen = 0;
@ -360,8 +363,16 @@ bool PcapPort::PortTransmitter::appendToPacketList(long sec, long nsec,
// Set the packetSequence repeatSize // Set the packetSequence repeatSize
Q_ASSERT(repeatSequenceStart_ >= 0); Q_ASSERT(repeatSequenceStart_ >= 0);
Q_ASSERT(repeatSequenceStart_ < packetSequenceList_.size()); Q_ASSERT(repeatSequenceStart_ < packetSequenceList_.size());
packetSequenceList_[repeatSequenceStart_]->repeatSize_ =
packetSequenceList_.size() - repeatSequenceStart_; if (currentPacketSequence_ != packetSequenceList_[repeatSequenceStart_])
{
PacketSequence *start = packetSequenceList_[repeatSequenceStart_];
currentPacketSequence_->usecDelay_ = start->usecDelay_;
start->usecDelay_ = 0;
start->repeatSize_ =
packetSequenceList_.size() - repeatSequenceStart_;
}
repeatSize_ = 0; repeatSize_ = 0;
@ -413,9 +424,10 @@ void PcapPort::PortTransmitter::run()
return; return;
for(i = 0; i < packetSequenceList_.size(); i++) { for(i = 0; i < packetSequenceList_.size(); i++) {
qDebug("sendQ[%d]: rptCnt = %d, rptSz = %d", i, qDebug("sendQ[%d]: rptCnt = %d, rptSz = %d usecDelay = %ld", i,
packetSequenceList_.at(i)->repeatCount_, packetSequenceList_.at(i)->repeatCount_,
packetSequenceList_.at(i)->repeatSize_); packetSequenceList_.at(i)->repeatSize_,
packetSequenceList_.at(i)->usecDelay_);
} }
for(i = 0; i < packetSequenceList_.size(); i++) for(i = 0; i < packetSequenceList_.size(); i++)
@ -433,7 +445,19 @@ _restart:
packetSequenceList_.at(i+k)->sendQueue_, packetSequenceList_.at(i+k)->sendQueue_,
overHead, kSyncTransmit); overHead, kSyncTransmit);
if (ret < 0) if (ret >= 0)
{
long usecs = packetSequenceList_.at(i+k)->usecDelay_
+ overHead;
if (usecs)
{
udelay(usecs);
overHead = 0;
}
else
overHead = usecs;
}
else
{ {
qDebug("error %d in sendQueueTransmit()", ret); qDebug("error %d in sendQueueTransmit()", ret);
qDebug("overHead = %ld", overHead); qDebug("overHead = %ld", overHead);

View File

@ -42,8 +42,10 @@ public:
transmitter_->clearPacketList(); transmitter_->clearPacketList();
setPacketListLoopMode(false, 0, 0); setPacketListLoopMode(false, 0, 0);
} }
virtual void loopNextPacketSet(qint64 size, qint64 repeats) { virtual void loopNextPacketSet(qint64 size, qint64 repeats,
transmitter_->loopNextPacketSet(size, repeats); long repeatDelaySec, long repeatDelayNsec) {
transmitter_->loopNextPacketSet(size, repeats,
repeatDelaySec, repeatDelayNsec);
} }
virtual bool appendToPacketList(long sec, long nsec, const uchar *packet, virtual bool appendToPacketList(long sec, long nsec, const uchar *packet,
int length) { int length) {
@ -99,7 +101,8 @@ protected:
PortTransmitter(const char *device); PortTransmitter(const char *device);
~PortTransmitter(); ~PortTransmitter();
void clearPacketList(); void clearPacketList();
void loopNextPacketSet(qint64 size, qint64 repeats); void loopNextPacketSet(qint64 size, qint64 repeats,
long repeatDelaySec, long repeatDelayNsec);
bool appendToPacketList(long sec, long usec, const uchar *packet, bool appendToPacketList(long sec, long usec, const uchar *packet,
int length); int length);
void setPacketListLoopMode(bool loop, long secDelay, long nsecDelay) { void setPacketListLoopMode(bool loop, long secDelay, long nsecDelay) {
@ -119,6 +122,7 @@ protected:
sendQueue_ = pcap_sendqueue_alloc(1*1024*1024); sendQueue_ = pcap_sendqueue_alloc(1*1024*1024);
repeatCount_ = 1; repeatCount_ = 1;
repeatSize_ = 1; repeatSize_ = 1;
usecDelay_ = 0;
} }
~PacketSequence() { ~PacketSequence() {
pcap_sendqueue_destroy(sendQueue_); pcap_sendqueue_destroy(sendQueue_);
@ -132,6 +136,7 @@ protected:
pcap_send_queue *sendQueue_; pcap_send_queue *sendQueue_;
int repeatCount_; int repeatCount_;
int repeatSize_; int repeatSize_;
long usecDelay_;
}; };
void udelay(long usec); void udelay(long usec);