From 204e6efc2a9b4cd4a6983f2bd25c73f952431d90 Mon Sep 17 00:00:00 2001 From: "Srivats P." Date: Sat, 1 Oct 2011 12:28:28 +0530 Subject: [PATCH] Refactored send queue data structure and associated processing --- common/abstractprotocol.cpp | 3 +- common/streambase.cpp | 4 ++ server/abstractport.cpp | 5 ++ server/pcapport.cpp | 140 ++++++++++++++++++------------------ server/pcapport.h | 38 +++++++--- 5 files changed, 109 insertions(+), 81 deletions(-) diff --git a/common/abstractprotocol.cpp b/common/abstractprotocol.cpp index 7aa64ac..8ff9529 100644 --- a/common/abstractprotocol.cpp +++ b/common/abstractprotocol.cpp @@ -570,7 +570,8 @@ bool AbstractProtocol::isProtocolFrameSizeVariable() const } /*! - Returns the number of frames required for the protocol to vary its fields + Returns the minimum number of frames required for the protocol to + vary its fields This is the lowest common multiple (LCM) of the counts of all the varying fields in the protocol. Use the AbstractProtocol::lcm() static utility diff --git a/common/streambase.cpp b/common/streambase.cpp index 8c049e2..ab43117 100644 --- a/common/streambase.cpp +++ b/common/streambase.cpp @@ -454,6 +454,10 @@ int StreamBase::frameVariableCount() const proto = iter->next(); count = proto->protocolFrameVariableCount(); + // correct count for mis-behaving protocols + if (count <= 0) + count = 1; + frameCount = AbstractProtocol::lcm(frameCount, count); } delete iter; diff --git a/server/abstractport.cpp b/server/abstractport.cpp index 8d1fa00..3bf9148 100644 --- a/server/abstractport.cpp +++ b/server/abstractport.cpp @@ -161,6 +161,9 @@ void AbstractPort::updatePacketListSequential() long ipg1 = 0, ipg2 = 0; long np1 = 0, np2 = 0; + // We derive n, x, y such that + // n * x + y = total number of packets to be sent + switch (streamList_[i]->sendUnit()) { case OstProto::StreamControl::e_su_bursts: @@ -209,6 +212,8 @@ void AbstractPort::updatePacketListSequential() if (n > 1) loopNextPacketSet(x, n); + else if (n == 0) + x = 0; for (int j = 0; j < (x+y); j++) { diff --git a/server/pcapport.cpp b/server/pcapport.cpp index da7360c..8a82b1c 100644 --- a/server/pcapport.cpp +++ b/server/pcapport.cpp @@ -289,30 +289,28 @@ PcapPort::PortTransmitter::~PortTransmitter() void PcapPort::PortTransmitter::clearPacketList() { Q_ASSERT(!isRunning()); - // \todo lock for sendQueueList - while(sendQueueList_.size()) - { - pcap_send_queue *sq = sendQueueList_.takeFirst(); - pcap_sendqueue_destroy(sq); - } - sendQueueRepeat_.clear(); - sendQueueGoto_.clear(); - currentSendQueue_ = NULL; - currentPacketSetQIdx_ = -1; - currentPacketSetSize_ = -1; - currentPacketSetRepeat_ = 1; - currentPacketSetCount_ = 0; + // \todo lock for packetSequenceList + while(packetSequenceList_.size()) + delete packetSequenceList_.takeFirst(); + + currentPacketSequence_ = NULL; + repeatSequenceStart_ = -1; + repeatSize_ = 0; + packetCount_ = 0; + setPacketListLoopMode(false, 0, 0); } void PcapPort::PortTransmitter::loopNextPacketSet(qint64 size, qint64 repeats) { - // Trigger a new sendQ alloc on next packet - currentSendQueue_ = NULL; - currentPacketSetQIdx_ = sendQueueList_.size(); - currentPacketSetSize_ = size; - currentPacketSetRepeat_ = repeats; - currentPacketSetCount_ = 0; + currentPacketSequence_ = new PacketSequence; + currentPacketSequence_->repeatCount_ = repeats; + + repeatSequenceStart_ = packetSequenceList_.size(); + repeatSize_ = size; + packetCount_ = 0; + + packetSequenceList_.append(currentPacketSequence_); } bool PcapPort::PortTransmitter::appendToPacketList(long sec, long nsec, @@ -325,48 +323,51 @@ bool PcapPort::PortTransmitter::appendToPacketList(long sec, long nsec, pktHdr.ts.tv_sec = sec; pktHdr.ts.tv_usec = nsec/1000; - if ((currentSendQueue_ == NULL) || - (currentSendQueue_->len + 2*sizeof(pcap_pkthdr) + length) - > currentSendQueue_->maxlen) + if (currentPacketSequence_ == NULL || + !currentPacketSequence_->hasFreeSpace(2*sizeof(pcap_pkthdr)+length)) { // Add a zero len packet at end of currentSendQueue_ for // inter-sendQ timing - if (sendQueueList_.size()) + if (packetSequenceList_.size()) { pcap_pkthdr hdr = pktHdr; hdr.caplen = 0; - pcap_sendqueue_queue(sendQueueList_.last(), &hdr, (u_char*)packet); + pcap_sendqueue_queue(packetSequenceList_.last()->sendQueue_, + &hdr, (u_char*)packet); } //! \todo (LOW): calculate sendqueue size - currentSendQueue_ = pcap_sendqueue_alloc(1*1024*1024); + currentPacketSequence_ = new PacketSequence; - sendQueueList_.append(currentSendQueue_); - sendQueueRepeat_.append(1); - sendQueueGoto_.append(-1); + packetSequenceList_.append(currentPacketSequence_); // Validate that the pkt will fit inside the new currentSendQueue_ - Q_ASSERT((length + sizeof(pcap_pkthdr)) < currentSendQueue_->maxlen); + Q_ASSERT(currentPacketSequence_->hasFreeSpace( + sizeof(pcap_pkthdr) + length)); } - if (pcap_sendqueue_queue(currentSendQueue_, &pktHdr, (u_char*) packet) < 0) - op = false; - - currentPacketSetCount_++; - if (currentPacketSetCount_ == currentPacketSetSize_) + if (pcap_sendqueue_queue(currentPacketSequence_->sendQueue_, &pktHdr, + (u_char*) packet) < 0) { - qDebug("i=%d, currentPacketSetQIdx_ = %d, currentPacketSetRepeat_ = %d", - sendQueueRepeat_.size(), - currentPacketSetQIdx_, - currentPacketSetRepeat_); - // End current sendQ - sendQueueGoto_.last() = currentPacketSetQIdx_; - sendQueueRepeat_.last() = currentPacketSetRepeat_; - currentPacketSetSize_ = -1; - - // Trigger a new sendQ allocation - currentSendQueue_ = NULL; + op = false; } + packetCount_++; + if (repeatSize_ > 0 && packetCount_ == repeatSize_) + { + qDebug("repeatSequenceStart_=%d, repeatSize_ = %llu", + repeatSequenceStart_, repeatSize_); + + // Set the packetSequence repeatSize + Q_ASSERT(repeatSequenceStart_ >= 0); + Q_ASSERT(repeatSequenceStart_ < packetSequenceList_.size()); + packetSequenceList_[repeatSequenceStart_]->repeatSize_ = + packetSequenceList_.size() - repeatSequenceStart_; + + repeatSize_ = 0; + + // End current pktSeq and trigger a new pktSeq allocation for next pkt + currentPacketSequence_ = NULL; + } return op; } @@ -407,44 +408,39 @@ void PcapPort::PortTransmitter::run() int i; long overHead = 0; - qDebug("sendQueueList_.size = %d", sendQueueList_.size()); - if (sendQueueList_.size() <= 0) + qDebug("packetSequenceList_.size = %d", packetSequenceList_.size()); + if (packetSequenceList_.size() <= 0) return; - for(i = 0; i < sendQueueList_.size(); i++) { - qDebug("sendQ[%d]: rpt = %d, goto = %d", i, - sendQueueRepeat_.at(i), sendQueueGoto_.at(i)); + for(i = 0; i < packetSequenceList_.size(); i++) { + qDebug("sendQ[%d]: rptCnt = %d, rptSz = %d", i, + packetSequenceList_.at(i)->repeatCount_, + packetSequenceList_.at(i)->repeatSize_); } - for(i = 0; i < sendQueueList_.size(); i++) + for(i = 0; i < packetSequenceList_.size(); i++) { - int ret; _restart: - int idx = sendQueueGoto_.at(i); - int rpt = sendQueueRepeat_.at(i); - int sqi = i; + int rptSz = packetSequenceList_.at(i)->repeatSize_; + int rptCnt = packetSequenceList_.at(i)->repeatCount_; - for (int n = 0; n < rpt; n++) + for (int j = 0; j < rptCnt; j++) { - Q_ASSERT(sqi <= i); - -#if 0 - if ((n & 0x3F) == 0) - qDebug("i = %d, n = %d/%d, sqi = %d\n", i, n, rpt, sqi); -#endif - ret = sendQueueTransmit(handle_, sendQueueList_.at(sqi), overHead, - kSyncTransmit); - - if (ret < 0) + for (int k = 0; k < rptSz; k++) { - qDebug("error %d in sendQueueTransmit()", ret); - qDebug("overHead = %ld", overHead); - stop_ = false; - return; - } + int ret = sendQueueTransmit(handle_, + packetSequenceList_.at(i+k)->sendQueue_, + overHead, kSyncTransmit); - sqi = (sqi == i) ? idx : sqi+1; + if (ret < 0) + { + qDebug("error %d in sendQueueTransmit()", ret); + qDebug("overHead = %ld", overHead); + stop_ = false; + return; + } + } } } diff --git a/server/pcapport.h b/server/pcapport.h index daf1507..f046670 100644 --- a/server/pcapport.h +++ b/server/pcapport.h @@ -111,21 +111,43 @@ protected: void run(); void stop(); private: + + class PacketSequence + { + public: + PacketSequence() { + sendQueue_ = pcap_sendqueue_alloc(1*1024*1024); + repeatCount_ = 1; + repeatSize_ = 1; + } + ~PacketSequence() { + pcap_sendqueue_destroy(sendQueue_); + } + bool hasFreeSpace(int size) { + if ((sendQueue_->len + size) <= sendQueue_->maxlen) + return true; + else + return false; + } + pcap_send_queue *sendQueue_; + int repeatCount_; + int repeatSize_; + }; + void udelay(long usec); int sendQueueTransmit(pcap_t *p, pcap_send_queue *queue, long &overHead, int sync); quint64 ticksFreq_; - int currentPacketSetQIdx_; - int currentPacketSetSize_; - int currentPacketSetRepeat_; - int currentPacketSetCount_; - pcap_send_queue* currentSendQueue_; - QList sendQueueList_; - QList sendQueueRepeat_; - QList sendQueueGoto_; + QList packetSequenceList_; + PacketSequence *currentPacketSequence_; + int repeatSequenceStart_; + quint64 repeatSize_; + quint64 packetCount_; + int returnToQIdx_; long loopDelay_; + bool usingInternalStats_; AbstractPort::PortStats *stats_; bool usingInternalHandle_;