From 18e7b9b49c93176f8a893cc185859f0faca455a8 Mon Sep 17 00:00:00 2001 From: "Srivats P." Date: Mon, 26 Sep 2011 20:47:27 +0530 Subject: [PATCH] Snapshot before refactoring the send queue data structure --- common/abstractprotocol.cpp | 65 ++++++++++++++++++++++++ common/abstractprotocol.h | 4 ++ common/ip4.cpp | 13 +++++ common/ip4.h | 1 + common/streambase.cpp | 21 ++++++++ common/streambase.h | 1 + server/abstractport.cpp | 99 +++++++++++++++++++------------------ server/abstractport.h | 1 + server/pcapport.cpp | 97 ++++++++++++++++++++++++++++-------- server/pcapport.h | 11 +++++ 10 files changed, 244 insertions(+), 69 deletions(-) diff --git a/common/abstractprotocol.cpp b/common/abstractprotocol.cpp index ef80783..7aa64ac 100644 --- a/common/abstractprotocol.cpp +++ b/common/abstractprotocol.cpp @@ -569,6 +569,23 @@ bool AbstractProtocol::isProtocolFrameSizeVariable() const return false; } +/*! + Returns the number of frames required for the protocol to vary its fields + + This is the lowest common multiple (LCM) of the counts of all the varying + fields in the protocol. Use the AbstractProtocol::lcm() static utility + function to calculate the LCM. + + The default implementation returns 1 implying that the protocol has no + varying fields. A subclass should reimplement if it has varying fields + e.g. an IP protocol that increments/decrements the IP address with + every packet +*/ +int AbstractProtocol::protocolFrameVariableCount() const +{ + return 1; +} + /*! Returns true if the payload content for a protocol varies at run-time, false otherwise @@ -823,3 +840,51 @@ out: this function. See the SampleProtocol for an example */ +// Stein's binary GCD algo - from wikipedia +quint64 AbstractProtocol::gcd(quint64 u, quint64 v) +{ + int shift; + + /* GCD(0,x) := x */ + if (u == 0 || v == 0) + return u | v; + + /* Let shift := lg K, where K is the greatest power of 2 + dividing both u and v. */ + for (shift = 0; ((u | v) & 1) == 0; ++shift) { + u >>= 1; + v >>= 1; + } + + while ((u & 1) == 0) + u >>= 1; + + /* From here on, u is always odd. */ + do { + while ((v & 1) == 0) /* Loop X */ + v >>= 1; + + /* Now u and v are both odd, so diff(u, v) is even. + Let u = min(u, v), v = diff(u, v)/2. */ + if (u < v) { + v -= u; + } else { + quint64 diff = u - v; + u = v; + v = diff; + } + v >>= 1; + } while (v != 0); + + return u << shift; +} + +quint64 AbstractProtocol::lcm(quint64 u, quint64 v) +{ + /* FIXME: LCM(0,x) := x */ + if (u == 0 || v == 0) + return u | v; + + return (u * v)/gcd(u, v); +} + diff --git a/common/abstractprotocol.h b/common/abstractprotocol.h index 07350cc..eb6dd81 100644 --- a/common/abstractprotocol.h +++ b/common/abstractprotocol.h @@ -139,6 +139,7 @@ public: virtual bool isProtocolFrameValueVariable() const; virtual bool isProtocolFrameSizeVariable() const; + virtual int protocolFrameVariableCount() const; bool isProtocolFramePayloadValueVariable() const; bool isProtocolFramePayloadSizeVariable() const; @@ -156,6 +157,9 @@ public: virtual QWidget* configWidget() = 0; virtual void loadConfigWidget() = 0; virtual void storeConfigWidget() = 0; + + static quint64 lcm(quint64 u, quint64 v); + static quint64 gcd(quint64 u, quint64 v); }; Q_DECLARE_OPERATORS_FOR_FLAGS(AbstractProtocol::FieldFlags); diff --git a/common/ip4.cpp b/common/ip4.cpp index 197cc8e..a7d6ef7 100644 --- a/common/ip4.cpp +++ b/common/ip4.cpp @@ -611,6 +611,19 @@ bool Ip4Protocol::isProtocolFrameValueVariable() const return false; } +int Ip4Protocol::protocolFrameVariableCount() const +{ + int count = 1; + + if (data.src_ip_mode() != OstProto::Ip4::e_im_fixed) + count = AbstractProtocol::lcm(count, data.src_ip_count()); + + if (data.dst_ip_mode() != OstProto::Ip4::e_im_fixed) + count = AbstractProtocol::lcm(count, data.dst_ip_count()); + + return count; +} + quint32 Ip4Protocol::protocolFrameCksum(int streamIndex, CksumType cksumType) const { diff --git a/common/ip4.h b/common/ip4.h index 006d9ca..6f89b09 100644 --- a/common/ip4.h +++ b/common/ip4.h @@ -102,6 +102,7 @@ public: FieldAttrib attrib = FieldValue); virtual bool isProtocolFrameValueVariable() const; + virtual int protocolFrameVariableCount() const; virtual quint32 protocolFrameCksum(int streamIndex = 0, CksumType cksumType = CksumIp) const; diff --git a/common/streambase.cpp b/common/streambase.cpp index 93d82d1..8c049e2 100644 --- a/common/streambase.cpp +++ b/common/streambase.cpp @@ -440,6 +440,27 @@ _exit: return true; } +int StreamBase::frameVariableCount() const +{ + ProtocolListIterator *iter; + quint64 frameCount = 1; + + iter = createProtocolListIterator(); + while (iter->hasNext()) + { + AbstractProtocol *proto; + int count; + + proto = iter->next(); + count = proto->protocolFrameVariableCount(); + + frameCount = AbstractProtocol::lcm(frameCount, count); + } + delete iter; + + return frameCount; +} + // frameProtocolLength() returns the sum of all the individual protocol sizes // which may be different from frameLen() int StreamBase::frameProtocolLength(int frameIndex) const diff --git a/common/streambase.h b/common/streambase.h index 1d6c5f5..9ef3ba1 100644 --- a/common/streambase.h +++ b/common/streambase.h @@ -138,6 +138,7 @@ public: bool isFrameVariable() const; bool isFrameSizeVariable() const; + int frameVariableCount() const; int frameProtocolLength(int frameIndex) const; int frameCount() const; int frameValue(uchar *buf, int bufMaxSize, int frameIndex) const; diff --git a/server/abstractport.cpp b/server/abstractport.cpp index f375cad..8d1fa00 100644 --- a/server/abstractport.cpp +++ b/server/abstractport.cpp @@ -21,10 +21,12 @@ along with this program. If not, see #include "abstractport.h" +#include "../common/streambase.h" +#include "../common/abstractprotocol.h" + #include #include -#include "../common/streambase.h" #include #include #include @@ -136,8 +138,6 @@ void AbstractPort::updatePacketList() void AbstractPort::updatePacketListSequential() { - int len; - bool isVariable; long sec = 0; long nsec = 0; @@ -152,7 +152,8 @@ void AbstractPort::updatePacketListSequential() { if (streamList_[i]->isEnabled()) { - long numPackets, numBursts; + long n, x, y; + long burstSize; double ibg = 0; long ibg1 = 0, ibg2 = 0; long nb1 = 0, nb2 = 0; @@ -163,27 +164,32 @@ void AbstractPort::updatePacketListSequential() switch (streamList_[i]->sendUnit()) { case OstProto::StreamControl::e_su_bursts: - numBursts = streamList_[i]->numBursts(); - numPackets = streamList_[i]->burstSize(); + burstSize = streamList_[i]->burstSize(); + x = AbstractProtocol::lcm(streamList_[i]->frameVariableCount(), + burstSize); + n = ulong(burstSize * streamList_[i]->burstRate()) / x; + y = ulong(burstSize * streamList_[i]->burstRate()) % x; if (streamList_[i]->burstRate() > 0) { ibg = 1e9/double(streamList_[i]->burstRate()); ibg1 = long(ceil(ibg)); ibg2 = long(floor(ibg)); - nb1 = long((ibg - double(ibg2)) * double(numBursts)); - nb2= numBursts - nb1; + nb1 = long((ibg - double(ibg2)) * double(x)); + nb2 = x - nb1; } break; case OstProto::StreamControl::e_su_packets: - numBursts = 1; - numPackets = streamList_[i]->numPackets(); + x = streamList_[i]->frameVariableCount(); + n = streamList_[i]->numPackets() / x; + y = streamList_[i]->numPackets() % x; + burstSize = x + y; if (streamList_[i]->packetRate() > 0) { ipg = 1e9/double(streamList_[i]->packetRate()); ipg1 = long(ceil(ipg)); ipg2 = long(floor(ipg)); - np1 = long((ipg - double(ipg2)) * double(numPackets)); - np2= numPackets - np1; + np1 = long((ipg - double(ipg2)) * double(x)); + np2 = x - np1; } break; default: @@ -191,56 +197,51 @@ void AbstractPort::updatePacketListSequential() streamList_[i]->sendUnit()); continue; } - qDebug("numBursts = %ld, numPackets = %ld\n", - numBursts, numPackets); - qDebug("ibg = %g, ibg1/nb1 = %ld/%ld ibg2/nb2 = %ld/%ld\n", + + qDebug("\nframeVariableCount = %d", + streamList_[i]->frameVariableCount()); + qDebug("n = %ld, x = %ld, y = %ld, burstSize = %ld", + n, x, y, burstSize); + qDebug("ibg = %g, ibg1/nb1 = %ld/%ld ibg2/nb2 = %ld/%ld", ibg, ibg1, nb1, ibg2, nb2); qDebug("ipg = %g, ipg1/np1 = %ld/%ld ipg2/np2 = %ld/%ld\n", ipg, ipg1, np1, ipg2, np2); - if (streamList_[i]->isFrameVariable()) - { - isVariable = true; - len = 0; // avoid compiler warning; get len value for each pkt - } - else - { - isVariable = false; - len = streamList_[i]->frameValue(pktBuf_, sizeof(pktBuf_), 0); - } + if (n > 1) + loopNextPacketSet(x, n); - for (int j = 0; j < numBursts; j++) + for (int j = 0; j < (x+y); j++) { - for (int k = 0; k < numPackets; k++) + int len; + + len = streamList_[i]->frameValue(pktBuf_, sizeof(pktBuf_), j); + if (len <= 0) + continue; + + qDebug("q(%d, %d) sec = %lu nsec = %lu", + i, j, sec, nsec); + + appendToPacketList(sec, nsec, pktBuf_, len); + + if ((j % burstSize) == 0) { - if (isVariable) - { - len = streamList_[i]->frameValue(pktBuf_, - sizeof(pktBuf_), j * numPackets + k); - } - if (len <= 0) - continue; - - qDebug("q(%d, %d, %d) sec = %lu nsec = %lu", - i, j, k, sec, nsec); - - appendToPacketList(sec, nsec, pktBuf_, len); - - nsec += (k < np1) ? ipg1 : ipg2; + nsec += (j < nb1) ? ibg1 : ibg2; while (nsec >= 1e9) { sec++; nsec -= long(1e9); } - } // for (numPackets) - - nsec += (j < nb1) ? ibg1 : ibg2; - while (nsec >= 1e9) - { - sec++; - nsec -= long(1e9); } - } // for (numBursts) + else + { + nsec += (j < np1) ? ipg1 : ipg2; + while (nsec >= 1e9) + { + sec++; + nsec -= long(1e9); + } + } + } switch(streamList_[i]->nextWhat()) { diff --git a/server/abstractport.h b/server/abstractport.h index c04affe..3d0f3ce 100644 --- a/server/abstractport.h +++ b/server/abstractport.h @@ -71,6 +71,7 @@ public: void setDirty() { isSendQueueDirty_ = true; } virtual void clearPacketList() = 0; + virtual void loopNextPacketSet(qint64 size, qint64 repeats) = 0; virtual bool appendToPacketList(long sec, long nsec, const uchar *packet, int length) = 0; virtual void setPacketListLoopMode(bool loop, diff --git a/server/pcapport.cpp b/server/pcapport.cpp index 3f1434b..da7360c 100644 --- a/server/pcapport.cpp +++ b/server/pcapport.cpp @@ -295,43 +295,79 @@ void PcapPort::PortTransmitter::clearPacketList() pcap_send_queue *sq = sendQueueList_.takeFirst(); pcap_sendqueue_destroy(sq); } + sendQueueRepeat_.clear(); + sendQueueGoto_.clear(); + currentSendQueue_ = NULL; + currentPacketSetQIdx_ = -1; + currentPacketSetSize_ = -1; + currentPacketSetRepeat_ = 1; + currentPacketSetCount_ = 0; setPacketListLoopMode(false, 0, 0); } +void PcapPort::PortTransmitter::loopNextPacketSet(qint64 size, qint64 repeats) +{ + // Trigger a new sendQ alloc on next packet + currentSendQueue_ = NULL; + currentPacketSetQIdx_ = sendQueueList_.size(); + currentPacketSetSize_ = size; + currentPacketSetRepeat_ = repeats; + currentPacketSetCount_ = 0; +} + bool PcapPort::PortTransmitter::appendToPacketList(long sec, long nsec, const uchar *packet, int length) { bool op = true; pcap_pkthdr pktHdr; - pcap_send_queue *sendQ; pktHdr.caplen = pktHdr.len = length; pktHdr.ts.tv_sec = sec; pktHdr.ts.tv_usec = nsec/1000; - sendQ = sendQueueList_.isEmpty() ? NULL : sendQueueList_.last(); - - if ((sendQ == NULL) || - (sendQ->len + 2*sizeof(pcap_pkthdr) + length) > sendQ->maxlen) + if ((currentSendQueue_ == NULL) || + (currentSendQueue_->len + 2*sizeof(pcap_pkthdr) + length) + > currentSendQueue_->maxlen) { - // Add a zero len packet at end of sendQ for inter-sendQ timing - if (sendQ) + // Add a zero len packet at end of currentSendQueue_ for + // inter-sendQ timing + if (sendQueueList_.size()) { pcap_pkthdr hdr = pktHdr; hdr.caplen = 0; - pcap_sendqueue_queue(sendQ, &hdr, (u_char*)packet); + pcap_sendqueue_queue(sendQueueList_.last(), &hdr, (u_char*)packet); } //! \todo (LOW): calculate sendqueue size - sendQ = pcap_sendqueue_alloc(1*1024*1024); - sendQueueList_.append(sendQ); + currentSendQueue_ = pcap_sendqueue_alloc(1*1024*1024); - // Validate that the pkt will fit inside the new sendQ - Q_ASSERT((length + sizeof(pcap_pkthdr)) < sendQ->maxlen); + sendQueueList_.append(currentSendQueue_); + sendQueueRepeat_.append(1); + sendQueueGoto_.append(-1); + + // Validate that the pkt will fit inside the new currentSendQueue_ + Q_ASSERT((length + sizeof(pcap_pkthdr)) < currentSendQueue_->maxlen); } - if (pcap_sendqueue_queue(sendQ, &pktHdr, (u_char*) packet) < 0) + if (pcap_sendqueue_queue(currentSendQueue_, &pktHdr, (u_char*) packet) < 0) op = false; + currentPacketSetCount_++; + if (currentPacketSetCount_ == currentPacketSetSize_) + { + qDebug("i=%d, currentPacketSetQIdx_ = %d, currentPacketSetRepeat_ = %d", + sendQueueRepeat_.size(), + currentPacketSetQIdx_, + currentPacketSetRepeat_); + // End current sendQ + sendQueueGoto_.last() = currentPacketSetQIdx_; + sendQueueRepeat_.last() = currentPacketSetRepeat_; + currentPacketSetSize_ = -1; + + // Trigger a new sendQ allocation + currentSendQueue_ = NULL; + } + + return op; } @@ -375,19 +411,40 @@ void PcapPort::PortTransmitter::run() if (sendQueueList_.size() <= 0) return; + for(i = 0; i < sendQueueList_.size(); i++) { + qDebug("sendQ[%d]: rpt = %d, goto = %d", i, + sendQueueRepeat_.at(i), sendQueueGoto_.at(i)); + } + for(i = 0; i < sendQueueList_.size(); i++) { int ret; + _restart: - ret = sendQueueTransmit(handle_, sendQueueList_.at(i), overHead, + int idx = sendQueueGoto_.at(i); + int rpt = sendQueueRepeat_.at(i); + int sqi = i; + + for (int n = 0; n < rpt; n++) + { + Q_ASSERT(sqi <= i); + +#if 0 + if ((n & 0x3F) == 0) + qDebug("i = %d, n = %d/%d, sqi = %d\n", i, n, rpt, sqi); +#endif + ret = sendQueueTransmit(handle_, sendQueueList_.at(sqi), overHead, kSyncTransmit); - if (ret < 0) - { - qDebug("error %d in sendQueueTransmit()", ret); - qDebug("overHead = %ld", overHead); - stop_ = false; - return; + if (ret < 0) + { + qDebug("error %d in sendQueueTransmit()", ret); + qDebug("overHead = %ld", overHead); + stop_ = false; + return; + } + + sqi = (sqi == i) ? idx : sqi+1; } } diff --git a/server/pcapport.h b/server/pcapport.h index cd96f6b..daf1507 100644 --- a/server/pcapport.h +++ b/server/pcapport.h @@ -42,6 +42,9 @@ public: transmitter_->clearPacketList(); setPacketListLoopMode(false, 0, 0); } + virtual void loopNextPacketSet(qint64 size, qint64 repeats) { + transmitter_->loopNextPacketSet(size, repeats); + } virtual bool appendToPacketList(long sec, long nsec, const uchar *packet, int length) { return transmitter_->appendToPacketList(sec, nsec, packet, length); @@ -96,6 +99,7 @@ protected: PortTransmitter(const char *device); ~PortTransmitter(); void clearPacketList(); + void loopNextPacketSet(qint64 size, qint64 repeats); bool appendToPacketList(long sec, long usec, const uchar *packet, int length); void setPacketListLoopMode(bool loop, long secDelay, long nsecDelay) { @@ -112,7 +116,14 @@ protected: int sync); quint64 ticksFreq_; + int currentPacketSetQIdx_; + int currentPacketSetSize_; + int currentPacketSetRepeat_; + int currentPacketSetCount_; + pcap_send_queue* currentSendQueue_; QList sendQueueList_; + QList sendQueueRepeat_; + QList sendQueueGoto_; int returnToQIdx_; long loopDelay_; bool usingInternalStats_;