From aebb609e3791073472899882d3344975b9262b40 Mon Sep 17 00:00:00 2001 From: Srivats P Date: Fri, 21 Apr 2023 18:32:17 +0530 Subject: [PATCH] Change algo to infer next Ttag pkt (interleaved mode) The algo works for the following cases of interleaved streams - * pktListDuration < ttagTimeInterval * pktListDuration > ttagTimeInterval * some streams have Ttag, some don't - first stream has Ttag - first stream does NOT have Ttag * no streams have Ttag Changes for sequential mode are pending --- server/abstractport.cpp | 34 +++++++++++++++++++++++--- server/abstractport.h | 4 +++ server/packetsequence.h | 2 -- server/pcapport.h | 6 +++++ server/pcaptransmitter.cpp | 7 ++++++ server/pcaptransmitter.h | 1 + server/pcaptxthread.cpp | 50 ++++++++++++++++++++++++++++++-------- server/pcaptxthread.h | 11 +++++++-- 8 files changed, 97 insertions(+), 18 deletions(-) diff --git a/server/abstractport.cpp b/server/abstractport.cpp index b6adc36..9fd2c18 100644 --- a/server/abstractport.cpp +++ b/server/abstractport.cpp @@ -436,6 +436,8 @@ int AbstractPort::updatePacketListInterleaved() int numStreams = 0; quint64 minGap = ULLONG_MAX; quint64 duration = quint64(1e3); // 1000ns (1us) + + // TODO: convert the below to a QList of struct aggregating all list vars QList streamId; QList ibg1, ibg2; QList nb1, nb2; @@ -445,6 +447,7 @@ int AbstractPort::updatePacketListInterleaved() QList pktCount, burstCount; QList burstSize; QList isVariable; + QList hasTtag; QList pktBuf; QList pktLen; int activeStreamCount = 0; @@ -594,6 +597,8 @@ int AbstractPort::updatePacketListInterleaved() packetListAttrib += attrib; } + hasTtag.append(streamList_[i]->hasProtocol( + OstProto::Protocol::kSignFieldNumber)); numStreams++; } // for i @@ -629,6 +634,8 @@ int AbstractPort::updatePacketListInterleaved() // FIXME: Turbo still thinks it has to create implicit packet set for // interleaved mode - Turbo code should be changed once this is validated qint64 totalPkts = 0; + QSet ttagMarkerStreams; + QList ttagMarkers; do { for (int i = 0; i < numStreams; i++) @@ -637,6 +644,12 @@ int AbstractPort::updatePacketListInterleaved() if ((schedSec.at(i) > sec) || (schedNsec.at(i) > nsec)) continue; + // One marker per stream + if (hasTtag.at(i) && !ttagMarkerStreams.contains(i)) { + ttagMarkerStreams.insert(i); + ttagMarkers.append(totalPkts); + } + for (uint j = 0; j < burstSize[i]; j++) { pktCount[i]++; @@ -680,8 +693,9 @@ int AbstractPort::updatePacketListInterleaved() + (durNsec - lastPktTxNsec)) /totalPkts; loopNextPacketSet(totalPkts, 1, 0, avgDelay); - qDebug("Interleaved Packet Set of size %lld, repeat 1 and delay %ldns", - totalPkts, avgDelay); + qDebug("Interleaved PacketSet of size %lld, duration %llu.%09llu " + "repeat 1 and avg delay %ldns", + totalPkts, durSec, durNsec, avgDelay); // Reset working sched/counts before building the packet list sec = nsec = 0; @@ -722,7 +736,7 @@ int AbstractPort::updatePacketListInterleaved() if (len <= 0) continue; - qDebug("q(%d) sec = %llu nsec = %llu", i, sec, nsec); + qDebug("q(%d) TS = %llu.%09llu", i, sec, nsec); if (!appendToPacketList(sec, nsec, buf, len)) { clearPacketList(); // don't leave it half baked/inconsitent packetListAttrib.errorFlags |= FrameValueAttrib::OutOfMemoryError; @@ -767,10 +781,22 @@ int AbstractPort::updatePacketListInterleaved() delayNsec += long(1e9); delaySec--; } - qDebug("loop Delay = %lld/%lld", delaySec, delayNsec); + qDebug("loop Delay = %lld.%09lld", delaySec, delayNsec); setPacketListLoopMode(true, delaySec, delayNsec); } + // XXX: TTag repeat interval calculation: + // CASE 1. pktListDuration < kTtagTimeInterval: + // e.g. if pktListDuration is 1sec and TtagTimerInterval is 5s, we + // skip 5 times total packets before we repeat the markers + // CASE 2. pktListDuration > kTtagTimeInterval: + // e.g. if pktListDuration is 7sec and TtagTimerInterval is 5s, we + // skip repeat markers every pktList iteration + setPacketListTtagMarkers(ttagMarkers, ttagMarkers.isEmpty() ? 0 : + qMax(uint(kTtagTimeInterval_*1e9 + /(durSec*1e9+durNsec)), + 1U) * totalPkts); + _out_of_memory: isSendQueueDirty_ = false; diff --git a/server/abstractport.h b/server/abstractport.h index 68c8883..8849719 100644 --- a/server/abstractport.h +++ b/server/abstractport.h @@ -107,6 +107,8 @@ public: int length) = 0; virtual void setPacketListLoopMode(bool loop, quint64 secDelay, quint64 nsecDelay) = 0; + virtual void setPacketListTtagMarkers(QList markers, + uint repeatInterval) = 0; int updatePacketList(); virtual void startTransmit() = 0; @@ -163,6 +165,8 @@ protected: StreamStats streamStats_; //! \todo Need lock for stats access/update + const uint kTtagTimeInterval_{5}; // in seconds + struct InterfaceInfo *interfaceInfo_; DeviceManager *deviceManager_; diff --git a/server/packetsequence.h b/server/packetsequence.h index d2da717..13a26bb 100644 --- a/server/packetsequence.h +++ b/server/packetsequence.h @@ -39,7 +39,6 @@ public: repeatSize_ = 1; usecDelay_ = 0; ttagL4CksumOffset_ = 0; - ttagPktInterval_ = 0; } ~PacketSequence() { pcap_sendqueue_destroy(sendQueue_); @@ -85,7 +84,6 @@ public: int repeatSize_; long usecDelay_; quint16 ttagL4CksumOffset_; // For ttag packets - qulonglong ttagPktInterval_; // ttag pkt once every X packets StreamStats streamStatsMeta_; private: diff --git a/server/pcapport.h b/server/pcapport.h index 591005f..89c452a 100644 --- a/server/pcapport.h +++ b/server/pcapport.h @@ -48,6 +48,7 @@ public: virtual void clearPacketList() { transmitter_->clearPacketList(); setPacketListLoopMode(false, 0, 0); + setPacketListTtagMarkers(QList(), 0); } virtual void loopNextPacketSet(qint64 size, qint64 repeats, long repeatDelaySec, long repeatDelayNsec) { @@ -62,6 +63,11 @@ public: { transmitter_->setPacketListLoopMode(loop, secDelay, nsecDelay); } + virtual void setPacketListTtagMarkers(QList markers, + uint repeatInterval) + { + transmitter_->setPacketListTtagMarkers(markers, repeatInterval); + } virtual void startTransmit() { Q_ASSERT(!isDirty()); diff --git a/server/pcaptransmitter.cpp b/server/pcaptransmitter.cpp index 29bfbd8..535e857 100644 --- a/server/pcaptransmitter.cpp +++ b/server/pcaptransmitter.cpp @@ -88,6 +88,13 @@ void PcapTransmitter::setPacketListLoopMode( txThread_.setPacketListLoopMode(loop, secDelay, nsecDelay); } +void PcapTransmitter::setPacketListTtagMarkers( + QList markers, + uint repeatInterval) +{ + txThread_.setPacketListTtagMarkers(markers, repeatInterval); +} + void PcapTransmitter::useExternalStats(AbstractPort::PortStats *stats) { txStats_.useExternalStats(stats); diff --git a/server/pcaptransmitter.h b/server/pcaptransmitter.h index ad4425d..9b7f710 100644 --- a/server/pcaptransmitter.h +++ b/server/pcaptransmitter.h @@ -42,6 +42,7 @@ public: bool appendToPacketList(long sec, long usec, const uchar *packet, int length); void setPacketListLoopMode(bool loop, quint64 secDelay, quint64 nsecDelay); + void setPacketListTtagMarkers(QList markers, uint repeatInterval); void setHandle(pcap_t *handle); void useExternalStats(AbstractPort::PortStats *stats); diff --git a/server/pcaptxthread.cpp b/server/pcaptxthread.cpp index e219385..3604e73 100644 --- a/server/pcaptxthread.cpp +++ b/server/pcaptxthread.cpp @@ -23,6 +23,8 @@ along with this program. If not, see #include "statstuple.h" #include "timestamp.h" +#include + PcapTxThread::PcapTxThread(const char *device) { char errbuf[PCAP_ERRBUF_SIZE] = ""; @@ -133,8 +135,6 @@ void PcapTxThread::loopNextPacketSet(qint64 size, qint64 repeats, currentPacketSequence_->repeatCount_ = repeats; currentPacketSequence_->usecDelay_ = repeatDelaySec * long(1e6) + repeatDelayNsec/1000; - currentPacketSequence_->ttagPktInterval_ = - kTtagPktInterval*1e6/currentPacketSequence_->usecDelay_; repeatSequenceStart_ = packetSequenceList_.size(); repeatSize_ = size; @@ -205,8 +205,6 @@ bool PcapTxThread::appendToPacketList(long sec, long nsec, start->usecDelay_ = 0; start->repeatSize_ = packetSequenceList_.size() - repeatSequenceStart_; - start->ttagPktInterval_ = kTtagPktInterval*1e6 - /currentPacketSequence_->usecDelay_; } repeatSize_ = 0; @@ -227,6 +225,27 @@ void PcapTxThread::setPacketListLoopMode( loopDelay_ = secDelay*long(1e6) + nsecDelay/1000; } +void PcapTxThread::setPacketListTtagMarkers( + QList markers, + uint repeatInterval) +{ + // XXX: Empty markers => no streams have Ttag + firstTtagPkt_ = markers.isEmpty() ? -1 : int(markers.first()); + + // Calculate delta markers + ttagDeltaMarkers_.clear(); + for (int i = 1; i < markers.size(); i++) + ttagDeltaMarkers_.append(markers.at(i) - markers.at(i-1)); + if (!markers.isEmpty()) { + ttagDeltaMarkers_.append(repeatInterval - markers.last() + + markers.first()); + qDebug() << "TtagRepeatInterval:" << repeatInterval; + qDebug() << "FirstTtagPkt:" << firstTtagPkt_; + qDebug() << "TtagMarkers:" << ttagDeltaMarkers_; + } + +} + void PcapTxThread::setHandle(pcap_t *handle) { if (usingInternalHandle_) @@ -282,15 +301,23 @@ void PcapTxThread::run() packetSequenceList_.at(i)->repeatCount_, packetSequenceList_.at(i)->repeatSize_, packetSequenceList_.at(i)->usecDelay_); - qDebug("sendQ[%d]: pkts = %ld, usecDuration = %ld, ttagL4CksumOfs = %hu ttagPktIntvl = %llu", i, + qDebug("sendQ[%d]: pkts = %ld, usecDuration = %ld, ttagL4CksumOfs = %hu", i, packetSequenceList_.at(i)->packets_, packetSequenceList_.at(i)->usecDuration_, - packetSequenceList_.at(i)->ttagL4CksumOffset_, - packetSequenceList_.at(i)->ttagPktInterval_); + packetSequenceList_.at(i)->ttagL4CksumOffset_); } + qDebug() << "First Ttag: " << firstTtagPkt_ + << "Ttag Markers:" << ttagDeltaMarkers_; + lastStats_ = *stats_; // used for stream stats + // Init Ttag related vars. If no packets need ttag, firstTtagPkt_ is -1, + // so nextTagPkt_ is set to practically unreachable value (due to + // 64 bit counter wraparound time!) + ttagMarkerIndex_ = 0; + nextTtagPkt_ = stats_->pkts + firstTtagPkt_; + getTimeStamp(&startTime); state_ = kRunning; i = 0; @@ -416,6 +443,7 @@ double PcapTxThread::lastTxDuration() return lastTxDuration_; } +// FIXME: do we need 'seq' as func param? int PcapTxThread::sendQueueTransmit(pcap_t *p, PacketSequence *seq, long &overHead, int sync) { @@ -434,9 +462,7 @@ int PcapTxThread::sendQueueTransmit(pcap_t *p, PacketSequence *seq, quint16 origCksum = 0; // Time for a T-Tag packet? - // FIXME: optimize per packet costly modulo operation - if (seq->ttagPktInterval_ - && ((stats_->pkts % seq->ttagPktInterval_) == 0)) { + if (stats_->pkts == nextTtagPkt_) { ttagPkt = true; // XXX: write 2xBytes instead of 1xHalf-word to avoid // potential alignment problem @@ -467,6 +493,10 @@ int PcapTxThread::sendQueueTransmit(pcap_t *p, PacketSequence *seq, *cksum = qToBigEndian(quint16(~newCksum)); } ttagId_++; + nextTtagPkt_ += ttagDeltaMarkers_.at(ttagMarkerIndex_); + ttagMarkerIndex_++; + if (ttagMarkerIndex_ >= ttagDeltaMarkers_.size()) + ttagMarkerIndex_ = 0; } if (sync) { diff --git a/server/pcaptxthread.h b/server/pcaptxthread.h index e941c71..7596211 100644 --- a/server/pcaptxthread.h +++ b/server/pcaptxthread.h @@ -42,6 +42,7 @@ public: bool appendToPacketList(long sec, long usec, const uchar *packet, int length); void setPacketListLoopMode(bool loop, quint64 secDelay, quint64 nsecDelay); + void setPacketListTtagMarkers(QList markers, uint repeatInterval); void setHandle(pcap_t *handle); @@ -80,7 +81,7 @@ private: quint64 packetListSize_; // count of pkts in packet List including repeats int returnToQIdx_; - quint64 loopDelay_; + quint64 loopDelay_; // in nanosecs void (*udelayFn_)(unsigned long); @@ -97,7 +98,13 @@ private: double lastTxDuration_{0.0}; // in secs - const ulong kTtagPktInterval{5}; // T-Tag pkt once every X sec + // XXX: Ttag Marker config derived; not updated during Tx + int firstTtagPkt_; + QList ttagDeltaMarkers_; + + // XXX: Ttag related; updated during Tx + int ttagMarkerIndex_; + quint64 nextTtagPkt_{0}; }; #endif