Compare commits

...

50 Commits

Author SHA1 Message Date
Srivats P
ca956c3c18 Fix extra whitespace in latency feature added code 2023-05-26 11:07:27 +05:30
Srivats P
f98c8af594 Comment out L4Cksum code from pcap tx thread
We are not rewriting L4Cksum for ttag packets at the moment. See
comment in packetsequence.h

Commenting out this code doesn't seem to observably improve tx
performance though.

The latency code seems to reduce stream stats max tx performance
by around 3 to 5%. Recovering this may be done separately as part
of overall performance optimzation of Tx code.
2023-05-08 15:31:20 +05:30
Srivats P
accc47fa34 Don't use avgDelay in interleaved mode
The interleaved mode's single packet set MUST always have 0 delay
for accurate rate.

Before latency code, the interleaved packet set was added implicitly
and had 0 delay. Latency code added explicit packet set and used
avgDelay for the set. The avgDelay was needed for the original algo
used to determine when to send ttag packets. That algo is no longer
used (ttag markers are now explicitly configured by AbstractPort).

Turbo still needs avgDelay for other use, but changes will be made in
Turbo code for that.
2023-05-08 12:01:56 +05:30
Srivats P
6f71844f7c Make win32 specific changes for per-stream latency
There are 2 changes -
1. Encode txPort in ttag packets and use it at TxTtagStats and
RxPcapStats to identify Tx and Rx packets respectively
2. Don't use pcap_sendqueue_transmit() if stream timing is in use -
since we can't modify TTAG packets inside that API
2023-05-06 13:15:37 +05:30
Srivats P
bd2a4715dc Fix windows build issues 2023-05-01 15:31:30 +05:30
Srivats P
4886739da6 Compile out timing debug prints
They can be compiled in if required
2023-04-30 11:50:54 +05:30
Srivats P
80a4578847 Fix latency timers not getting started/stopped
In a previous commit, we start/stop these timers based on number of
ports tracking stream stats triggered by RPCs. However, timers cannot
be triggered across threads (RPC thread and main thread in this case).

This fix uses a queued connection to post it to the other queue.
2023-04-30 10:40:49 +05:30
Srivats P
390ffae2b4 Fix tx ttag pcap filter capture filter syntax
The capture filter was not getting compiled earlier
2023-04-29 12:33:34 +05:30
Srivats P
d17ab7ab42 Use TtagTimeInterval to determine ttag markers
This is for interlaved mode; sequential mode was already using it
2023-04-29 12:08:26 +05:30
Srivats P
5abd6fb962 Don't use udiffTimeStamp() with struct timeval
It will fail to build for non-Linux platforms where TimeStamp is NOT timeval
2023-04-27 12:50:22 +05:30
Srivats P
3c0bc067fa Retain seq as param for sendQueueTransmit for now
If and when we remove PacketSequence::ttagL4ChecksumOffset we will take a call
if we should revert back to passing seq->sendQueue instead of seq at that time
2023-04-27 12:48:37 +05:30
Srivats P
223e44a6e3 Run stream latency timers only when required
Required => At least one port is tracking stream stats

Also changed some optimization FIXMEs as TODOs
2023-04-27 12:30:36 +05:30
Srivats P
6108de9b4f Remove comment about trying read-write lock
The stream timingHash is read by getStreamStats() while it is read/write
for processRecords(), the latter is a more frequent operation so there's
no real benefit of using a read-write lock instead of simple mutex.
2023-04-27 11:15:26 +05:30
Srivats P
e761bfa5c4 Make app QObject parent of StreamTiming singleton 2023-04-27 11:09:31 +05:30
Srivats P
b9345463c4 Add thread name for PcapRxStats
PcapTxTtagStats thread name has also been shortened since names beyond 16
chars are truncated.
2023-04-26 12:22:26 +05:30
Srivats P
dc6c4963a2 Change stream timing GC timer to CLOCK_REALTIME 2023-04-26 11:38:00 +05:30
Srivats P
cded62246e Don't fix incorrect checksum for Ttag packets
See comments in code
2023-04-26 10:01:24 +05:30
Srivats P
10befe0a66 Remove PcapTxTtagStats::handle_ member
PcaptxTtagStats inherits from PcapSession which already includes a protected
handle_ member.

This removal was likely left off when PcapTxTtagStats started inheriting from
PcapSession.
2023-04-24 17:26:50 +05:30
Srivats P
4394c7ffee Rework sequential mode build for new ttag algo
The previous commit changed the algo to determine which packets were Ttag'd,
but changes were done only for interleaved mode.

This commit adds the changes required for sequential mode.
2023-04-24 17:05:40 +05:30
Srivats P
ef1c166e7f Change algo to infer next Ttag pkt (interleaved mode)
The algo works for the following cases of interleaved streams -
 * pktListDuration < ttagTimeInterval
 * pktListDuration > ttagTimeInterval
 * some streams have Ttag, some don't
    - first stream has Ttag
    - first stream does NOT have Ttag
 * no streams have Ttag

Changes for sequential mode are pending
2023-04-21 18:32:17 +05:30
Srivats P
d1d2a5c1b5 Fix infinite loop in building interleaved streams
Incorrect timestamp comparison was leading to infinite loop
2023-04-21 17:11:43 +05:30
Srivats P
f56ce2e2ec Add explicit packet set for interleaved streams
Interleaved mode used an implicitly added packet set in both base and Turbo
code. This has been chaned to use an explicit mode to keep things consistent.

Turbo code still has the implicit packet set related code - that needs to be
removed, once the explicit packet set code is validated and tested.
2023-04-17 11:59:00 +05:30
Srivats P
c91475d416 Use qFrom/ToBigEndian instead of ntohs/htons
For consistency with rest of the code
2023-04-13 18:10:43 +05:30
Srivats P
b2ad3c5d08 Recompute L4 checksum on-the-fly for TTag packets 2023-04-12 15:47:55 +05:30
Srivats P
896371b987 Calculate L4 checksum offset for TTag packets 2023-04-11 18:09:14 +05:30
Srivats P
f5bb2e5d80 Rename delay as latency in Protobuf/RPC
The GUI uses the term 'latency', so it's better if the API alsos use the same
term instead of 'delay'
2023-04-04 18:34:20 +05:30
Srivats P
7cfccd686e Fix StreamTiming TxRxKey
makeKey was incorrect by mistake
2023-04-04 13:22:18 +05:30
Srivats P
2e502434db GUI changes to display avg latency
At this time we only show per-guid latency aggregated across all ports
2023-04-04 13:19:53 +05:30
Srivats P
aa140cd32a Process pending before fetching streamTiming delay 2023-04-03 12:58:58 +05:30
Srivats P
3b499263ec Fix MacOS build break by removing unused member
PcapTxTtagStats::lastPcapStats_ was unused because debugStats() was moved to
PcapSession, but removing this member var was left out
2023-04-03 12:54:13 +05:30
Srivats P
c378600baf Fix streamTiming garbage collection infinite loop 2023-04-03 12:47:30 +05:30
Srivats P
68734c44ca Delete PcapTxTtagSession::debugStats
debugStats() was moved to base class PcapSession earlier, but removing it
from here was missed out
2023-03-31 18:21:36 +05:30
Srivats P
05a9dd5743 Rename PcapRxStats::id_ as PcapRxStats::portId_
Better code clarity
2023-03-31 16:58:52 +05:30
Srivats P
05335b31d5 Integrate StreamTiming with the code
Bugs found during integration were fixed and minor code improvements were made
such as using consts, const params, renaming members etc.
2023-03-31 16:55:03 +05:30
Srivats P
f4c21e1ae4 Change StreamTiming::timing_ from QList to QHash
Using QList meant we need to know the port count in the constructor - which is
difficult to know because StreamTiming is designed as a singleton
2023-03-29 17:13:12 +05:30
Srivats P
757d3f1b24 Add initial cut of StreamTiming class
This singleton class will keep track of Ttag timing across all ports and GUIDs.

A bunch of FIXMEs/TODOs are pending for this class implementation; also this
class has not been hooked up to the rest of the code yet.
2023-03-28 16:06:28 +05:30
Srivats P
7e30ef5541 Fix another MacOS build break
Break was due to following warnings (being promoted to errors) -
 * id_ private member was unused
 * tv_usec is int on MacOS (but long on Linux)
2023-03-23 15:55:35 +05:30
Srivats P
f7b6b46a5d Move debugStats() from PcapRx to base PcapSession
With this change, other classes that use PcapSession as base can also
use debugStats(), if required
2023-03-23 15:42:41 +05:30
Srivats P
823f01557b Ignore failures when stopping stream stats tracking
Stop everything irrespective of any failures
2023-03-22 16:42:54 +05:30
Srivats P
682d0cc5c9 Create a Tx Ttag tracker thread
For now we are just debug printing timestamp with T-TagId and GUID. We
need to store this tuple and compare when we Rx the same - this will be
in a upcoming commit.
2023-03-22 16:32:41 +05:30
Srivats P
f1cfaa6e89 Fix MacOS build break
For some reason udiffTimeStamp is not defined for MacOS. To be investigated
later.
2023-03-22 16:29:51 +05:30
Srivats P
072dfcdc3b Add an incrementing tag id to T-Tag packets 2023-03-18 16:34:47 +05:30
Srivats P
90a3731a90 Add infra to update L4 checksum for T-Tag packets
Pending
 * Calculate L4 checksum offset instead of hardcoded value
 * Recalculate packet L4 checksum instead of using 0 value
2023-03-18 15:01:11 +05:30
Srivats P
5d4a19174e Change T-Tag on the fly 2023-03-17 12:45:59 +05:30
Srivats P
2104936b69 Don't create implicit packet sets for Tx
As part of Turbo changes, we made changes to create explicit packet
sets, but for the base code we continued creating implicit packet
sets for some cases. With this change we don't create any implicit
packet set.

This change needs to be tested thoroughly for multiple cases.
2023-03-17 12:36:29 +05:30
Srivats P
5dc1b851cc Reduce vertical whitespace in sendQueueTransmit() 2023-03-16 12:10:42 +05:30
Srivats P
8e25669a0e Add T-Tag placeholder in sign protocol 2023-03-16 11:30:02 +05:30
Srivats P
ebccc44cdf Reformat TxThread/run {} to use less vertical space 2023-03-08 18:14:45 +05:30
Srivats P
f3a9b507b0 Update comments about implict packetset 2023-03-08 18:10:09 +05:30
Srivats P
620004d46b Fix werror warning about 0 being signed by default 2023-03-08 18:09:34 +05:30
33 changed files with 1549 additions and 170 deletions

View File

@ -209,7 +209,7 @@ void DeviceGroupDialog::updateTotalDeviceCount()
void DeviceGroupDialog::updateIp4Gateway()
{
quint32 net = ip4Address->value() & (~0 << (32 - ip4PrefixLength->value()));
quint32 net = ip4Address->value() & (~0UL << (32 - ip4PrefixLength->value()));
ip4Gateway->setValue(net | 0x01);
}

View File

@ -51,6 +51,7 @@ enum {
kAvgRxFrameRate,
kAvgTxBitRate,
kAvgRxBitRate,
kAvgLatency,
kMaxAggrStreamStats
};
static QStringList aggrStatTitles = QStringList()
@ -61,7 +62,8 @@ static QStringList aggrStatTitles = QStringList()
<< "Avg\nTx PktRate"
<< "Avg\nRx PktRate"
<< "Avg\nTx BitRate"
<< "Avg\nRx BitRate";
<< "Avg\nRx BitRate"
<< "Avg\nLatency";
static const uint kAggrGuid = 0xffffffff;
@ -184,6 +186,12 @@ QVariant StreamStatsModel::data(const QModelIndex &index, int role) const
(aggrGuidStats_.value(guid).rxBytes
+ 24 * aggrGuidStats_.value(guid).rxPkts) * 8
/ aggrGuidStats_.value(guid).txDuration);
case kAvgLatency:
return aggrGuidStats_.value(guid).latencyCount <= 0
|| aggrGuidStats_.value(guid).latencySum <= 0 ? QString("-") :
XLocale().toTimeIntervalString(
aggrGuidStats_.value(guid).latencySum
/ aggrGuidStats_.value(guid).latencyCount);
default:
break;
};
@ -258,6 +266,7 @@ void StreamStatsModel::appendStreamStatsList(
ss.txPkts = s.tx_pkts();
ss.rxBytes = s.rx_bytes();
ss.txBytes = s.tx_bytes();
ss.rxLatency = s.latency();
aggrPort.rxPkts += ss.rxPkts;
aggrPort.txPkts += ss.txPkts;
@ -271,6 +280,10 @@ void StreamStatsModel::appendStreamStatsList(
aggrGuid.txBytes += ss.txBytes;
if (s.tx_duration() > aggrGuid.txDuration)
aggrGuid.txDuration = s.tx_duration(); // XXX: use largest or avg?
if (ss.rxLatency) {
aggrGuid.latencySum += ss.rxLatency;
aggrGuid.latencyCount++;
}
aggrAggr.rxPkts += ss.rxPkts;
aggrAggr.txPkts += ss.txPkts;
@ -279,6 +292,10 @@ void StreamStatsModel::appendStreamStatsList(
aggrAggr.txBytes += ss.txBytes;
if (aggrGuid.txDuration > aggrAggr.txDuration)
aggrAggr.txDuration = aggrGuid.txDuration;
if (ss.rxLatency) {
aggrAggr.latencySum += ss.rxLatency;
aggrAggr.latencyCount++;
}
if (!portList_.contains(pgp))
portList_.append(pgp);

View File

@ -57,6 +57,7 @@ private:
quint64 txPkts;
quint64 rxBytes;
quint64 txBytes;
quint64 rxLatency;
};
struct AggrGuidStats {
quint64 rxPkts;
@ -65,6 +66,8 @@ private:
quint64 txBytes;
qint64 pktLoss;
double txDuration;
quint64 latencySum;
uint latencyCount;
};
QList<Guid> guidList_;
QList<PortGroupPort> portList_;

View File

@ -100,6 +100,22 @@ public:
return QObject::tr("%L1 bps").arg(bps, 0, 'f', 4);
}
QString toTimeIntervalString(qint64 nanosecs) const
{
QString text;
if (nanosecs >= 1e9)
return QObject::tr("%L1 s").arg(nanosecs/1e9, 0, 'f', 3);
if (nanosecs >= 1e6)
return QObject::tr("%L1 ms").arg(nanosecs/1e6, 0, 'f', 3);
if (nanosecs >= 1e3)
return QObject::tr("%L1 us").arg(nanosecs/1e3, 0, 'f', 3);
return QObject::tr("%L1 ns").arg(nanosecs);
}
};
#endif

29
common/debugdefs.h Normal file
View File

@ -0,0 +1,29 @@
/*
Copyright (C) 2023 Srivats P.
This file is part of "Ostinato"
This is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
#ifndef _DEBUG_DEFS_H
#define _DEBUG_DEFS_H
#if 0
#define timingDebug(fmt, ...) qDebug("TIMING:" fmt, __VA_ARGS__)
#else
#define timingDebug(...)
#endif
#endif

View File

@ -113,6 +113,7 @@ SOURCES += \
udp.cpp \
textproto.cpp \
hexdump.cpp \
packet.cpp \
payload.cpp \
sample.cpp \
sign.cpp \

117
common/packet.cpp Normal file
View File

@ -0,0 +1,117 @@
/*
Copyright (C) 2023 Srivats P.
This file is part of "Ostinato"
This is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
#include "packet.h"
using namespace Packet;
quint16 Packet::l4ChecksumOffset(const uchar *pktData, int pktLen)
{
Parser parser(pktData, pktLen);
quint16 offset = kEthTypeOffset;
// Skip VLANs, if any
quint16 ethType = parser.field16(offset);
if (!parser.ok()) return 0;
// TODO: support 802.3 frames
if (ethType <= 1500)
return 0;
while (kVlanEthTypes.contains(ethType)) {
offset += kVlanTagSize;
ethType = parser.field16(offset);
if (!parser.ok()) return 0;
}
offset += kEthTypeSize;
// XXX: offset now points to Eth payload
// Skip MPLS tags, if any
if (ethType == kMplsEthType) {
while (1) {
quint32 mplsTag = parser.field32(offset);
if (!parser.ok()) return 0;
offset += kMplsTagSize;
if (mplsTag & 0x100) { // BOS bit
quint32 nextWord = parser.field32(offset);
if (!parser.ok()) return 0;
if (nextWord == 0) { // PW Control Word
offset += kMplsTagSize;
ethType = 0;
break;
}
quint8 firstPayloadNibble = nextWord >> 28;
if (firstPayloadNibble == 0x4)
ethType = kIp4EthType;
else if (firstPayloadNibble == 0x6)
ethType = kIp6EthType;
else
ethType = 0;
break;
}
}
}
quint8 ipProto = 0;
if (ethType == kIp4EthType) {
ipProto = parser.field8(offset + kIp4ProtocolOffset);
if (!parser.ok()) return 0;
quint8 ipHdrLen = parser.field8(offset) & 0x0F;
if (!parser.ok()) return 0;
offset += 4*ipHdrLen;
} else if (ethType == kIp6EthType) {
ipProto = parser.field8(offset + kIp6NextHeaderOffset);
if (!parser.ok()) return 0;
offset += kIp6HeaderSize;
// XXX: offset now points to IPv6 payload
// Skip IPv6 extension headers, if any
while (kIp6ExtensionHeaders.contains(ipProto)) {
ipProto = parser.field8(offset + kIp6ExtNextHeaderOffset);
if (!parser.ok()) return 0;
quint16 extHdrLen = parser.field8(offset + kIp6ExtLengthOffset);
offset += 8 + 8*extHdrLen;
}
} else {
// Non-IP
// TODO: support MPLS PW with Eth payload
return 0;
}
// XXX: offset now points to IP payload
if (ipProto == kIpProtoTcp) {
parser.field16(offset + kTcpChecksumOffset);
if (!parser.ok()) return 0;
return offset + kTcpChecksumOffset;
} else if (ipProto == kIpProtoUdp) {
parser.field16(offset + kUdpChecksumOffset);
if (!parser.ok()) return 0;
return offset + kUdpChecksumOffset;
}
// No L4
return 0;
}

112
common/packet.h Normal file
View File

@ -0,0 +1,112 @@
/*
Copyright (C) 2023 Srivats P.
This file is part of "Ostinato"
This is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
#ifndef _PACKET_H
#define _PACKET_H
#include <QSet>
#include <QtGlobal>
namespace Packet {
class Parser {
public:
Parser(const uchar *data, int length)
: pktData_(data), pktLen_(length) {}
quint8 field8(int offset) {
if (offset >= pktLen_) {
ok_ = false;
return 0;
}
ok_ = true;
return pktData_[offset];
}
quint16 field16(int offset) {
if (offset + 1 >= pktLen_) {
ok_ = false;
return 0;
}
ok_ = true;
return pktData_[offset] << 8
| pktData_[offset+1];
}
quint32 field32(int offset) {
if (offset + 3 >= pktLen_) {
ok_ = false;
return 0;
}
ok_ = true;
return pktData_[offset] << 24
| pktData_[offset+1] << 16
| pktData_[offset+2] << 8
| pktData_[offset+3];
}
bool ok() {
return ok_;
}
private:
const uchar *pktData_;
int pktLen_;
bool ok_{false};
};
quint16 l4ChecksumOffset(const uchar *pktData, int pktLen);
//
// Constants
//
// Ethernet
const quint16 kEthTypeOffset = 12;
const quint16 kEthTypeSize = 2;
const quint16 kIp4EthType = 0x0800;
const quint16 kIp6EthType = 0x86dd;
const quint16 kMplsEthType = 0x8847;
const QSet<quint16> kVlanEthTypes = {0x8100, 0x9100, 0x88a8};
// VLAN
const quint16 kVlanTagSize = 4;
// MPLS
const quint16 kMplsTagSize = 4;
// IPv4
const quint16 kIp4ProtocolOffset = 9;
// IPv6
const quint16 kIp6HeaderSize = 40;
const quint16 kIp6NextHeaderOffset = 6;
// IPv6 Extension Header
const quint16 kIp6ExtNextHeaderOffset = 0;
const quint16 kIp6ExtLengthOffset = 1;
// IPv4/IPv6 Proto/NextHeader values
const quint8 kIpProtoTcp = 6;
const quint8 kIpProtoUdp = 17;
const QSet<quint8> kIp6ExtensionHeaders = {0, 60, 43, 44, 51, 50, 60, 135}; // FIXME: use names
// TCP
const quint16 kTcpChecksumOffset = 16;
// UDP
const quint16 kUdpChecksumOffset = 6;
};
#endif

View File

@ -292,6 +292,7 @@ message StreamStats {
required StreamGuid stream_guid = 2;
optional double tx_duration = 3; // in seconds
optional uint64 latency = 4; // in nanoseconds
optional uint64 rx_pkts = 11;
optional uint64 rx_bytes = 12;

View File

@ -19,6 +19,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "sign.h"
#include "../common/streambase.h"
SignProtocol::SignProtocol(StreamBase *stream, AbstractProtocol *parent)
: AbstractProtocol(stream, parent)
{
@ -76,7 +78,9 @@ AbstractProtocol::FieldFlags SignProtocol::fieldFlags(int index) const
switch (index)
{
case sign_magic:
case sign_tlv_tx_port:
case sign_tlv_guid:
case sign_tlv_ttag:
case sign_tlv_end:
break;
@ -116,6 +120,52 @@ QVariant SignProtocol::fieldData(int index, FieldAttrib attrib,
}
break;
}
case sign_tlv_ttag:
{
switch(attrib)
{
case FieldName:
return QString("T-Tag");
case FieldValue:
return 0;
case FieldTextValue:
return QString("%1").arg(0);
case FieldFrameValue:
{
QByteArray fv;
fv.resize(2);
fv[0] = 0;
fv[1] = kTypeLenTtagPlaceholder;
return fv;
}
default:
break;
}
break;
}
case sign_tlv_tx_port:
{
switch(attrib)
{
case FieldName:
return QString("TxPort");
case FieldValue:
return mpStream->portId();
case FieldTextValue:
return QString("%1").arg(mpStream->portId());
case FieldFrameValue:
{
QByteArray fv;
fv.resize(2);
fv[0] = mpStream->portId() & 0xFF;
fv[1] = kTypeLenTxPort;
return fv;
}
default:
break;
}
break;
}
case sign_tlv_guid:
{
quint32 guid = data.stream_guid() & 0xFFFFFF;
@ -217,3 +267,27 @@ bool SignProtocol::packetGuid(const uchar *pkt, int pktLen, uint *guid)
}
return false;
}
bool SignProtocol::packetTtagId(const uchar *pkt, int pktLen, uint *ttagId, uint *guid)
{
bool ret = false;
const uchar *p = pkt + pktLen - sizeof(kSignMagic);
quint32 magic = qFromBigEndian<quint32>(p);
if (magic != kSignMagic)
return ret;
*guid = kInvalidGuid;
p--;
while (*p != kTypeLenEnd) {
if (*p == kTypeLenTtag) {
*ttagId = *(p - 1);
ret = true;
} else if (*p == kTypeLenGuid) {
*guid = qFromBigEndian<quint32>(p - 3) >> 8;
} else if (*p == kTypeLenTxPort) {
*ttagId |= uint(*(p - 1)) << 8;
}
p -= 1 + (*p >> 5); // move to next TLV
}
return ret;
}

View File

@ -23,6 +23,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "abstractprotocol.h"
#include "sign.pb.h"
#include <limits.h>
/*
Sign Protocol is expected at the end of the frame (just before the Eth FCS)
---+--------+-------+
@ -42,6 +44,9 @@ TLVs are encoded as
Defined TLVs
Type = 0, Len = 0 (0x00): End of TLVs
Type = 1, Len = 3 (0x61): Stream GUID
Type = 2, Len = 1 (0x22): T-Tag Placeholder (0 value)
Type = 3, Len = 1 (0x23): T-Tag with actual value
Type = 4, Len = 1 (0x24): Tx Port Id
*/
class SignProtocol : public AbstractProtocol
@ -51,7 +56,9 @@ public:
{
// Frame Fields
sign_tlv_end = 0,
sign_tlv_tx_port,
sign_tlv_guid,
sign_tlv_ttag,
sign_magic,
// Meta Fields
@ -83,11 +90,19 @@ public:
static quint32 magic();
static bool packetGuid(const uchar *pkt, int pktLen, uint *guid);
static bool packetTtagId(const uchar *pkt, int pktLen, uint *ttagId, uint *guid);
// XXX: Any change in kTypeLenXXX or magic value should also be done in
// TxThread/Ttag code as well where hardcoded values are used
static const quint32 kMaxGuid = 0x00ffffff;
static const quint32 kInvalidGuid = UINT_MAX;
static const quint8 kTypeLenTtagPlaceholder = 0x22;
static const quint8 kTypeLenTtag = 0x23;
private:
static const quint32 kSignMagic = 0x1d10c0da; // coda! (unicode - 0x1d10c)
static const quint8 kTypeLenEnd = 0x00;
static const quint8 kTypeLenGuid = 0x61;
static const quint8 kTypeLenTxPort = 0x24;
OstProto::Sign data;
};

View File

@ -75,9 +75,8 @@ public:
quint32 id() const;
bool setId(quint32 id);
quint32 portId() { return portId_;}
#if 0 // FIXME(HI): needed?
quint32 portId()
{ return mCore->port_id();}
bool setPortId(quint32 id)
{ mCore->set_port_id(id); return true;}
#endif

View File

@ -25,6 +25,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "devicemanager.h"
#include "interfaceinfo.h"
#include "packetbuffer.h"
#include "streamtiming.h"
#include <QString>
#include <QIODevice>
@ -54,6 +55,8 @@ AbstractPort::AbstractPort(int id, const char *device)
maxStatsValue_ = ULLONG_MAX; // assume 64-bit stats
memset((void*) &stats_, 0, sizeof(stats_));
resetStats();
streamTiming_ = StreamTiming::instance();
}
AbstractPort::~AbstractPort()
@ -197,6 +200,13 @@ void AbstractPort::addNote(QString note)
bool AbstractPort::setTrackStreamStats(bool enable)
{
// XXX: This function is called by modify() in context of the RPC
// thread (1 thread per connected client), but the StreamTiming
// singleton resides in the main thread and its' start/stop methods
// start/stop timers which cannot be done across Qt Threads. Hence
// this slightly hacky way of invoking those methods
QMetaObject::invokeMethod(streamTiming_, enable ? "start" : "stop",
Qt::QueuedConnection, Q_ARG(uint, id()));
data_.set_is_tracking_stream_stats(enable);
return true;
@ -232,6 +242,9 @@ int AbstractPort::updatePacketList()
int AbstractPort::updatePacketListSequential()
{
quint64 duration = 0; // in nanosec
quint64 totalPkts = 0;
QList<uint> ttagMarkers;
FrameValueAttrib packetListAttrib;
long sec = 0;
long nsec = 0;
@ -259,6 +272,8 @@ int AbstractPort::updatePacketListSequential()
quint64 npy1 = 0, npy2 = 0;
quint64 loopDelay;
ulong frameVariableCount = streamList_[i]->frameVariableCount();
bool hasTtag = streamList_[i]->hasProtocol(
OstProto::Protocol::kSignFieldNumber);
// We derive n, x, y such that
// n * x + y = total number of packets to be sent
@ -331,6 +346,7 @@ int AbstractPort::updatePacketListSequential()
else if (n == 0)
x = 0;
quint64 pktCount = n*x + y;
for (uint j = 0; j < (x+y); j++)
{
@ -383,6 +399,15 @@ int AbstractPort::updatePacketListSequential()
}
}
// Add a Ttag marker after every kTtagTimeInterval_ worth of pkts
if (hasTtag) {
uint ttagPktInterval = kTtagTimeInterval_*1e9/loopDelay;
for (uint k = 0; k < pktCount; k += ttagPktInterval)
ttagMarkers.append(totalPkts + k);
}
totalPkts += pktCount;
duration += pktCount*loopDelay; // in nanosecs
switch(streamList_[i]->nextWhat())
{
case StreamBase::e_nw_stop:
@ -420,6 +445,10 @@ int AbstractPort::updatePacketListSequential()
_out_of_memory:
_stop_no_more_pkts:
// See comments in updatePacketListInterleaved() for calc explanation
setPacketListTtagMarkers(ttagMarkers, ttagMarkers.isEmpty() ? 0 :
qMax(uint(kTtagTimeInterval_*1e9/(duration)),
1U) * totalPkts);
isSendQueueDirty_ = false;
qDebug("PacketListAttrib = %x",
@ -432,7 +461,9 @@ int AbstractPort::updatePacketListInterleaved()
FrameValueAttrib packetListAttrib;
int numStreams = 0;
quint64 minGap = ULLONG_MAX;
quint64 duration = quint64(1e3);
quint64 duration = quint64(1e3); // 1000ns (1us)
// TODO: convert the below to a QVector of struct aggregating all list vars
QList<int> streamId;
QList<quint64> ibg1, ibg2;
QList<quint64> nb1, nb2;
@ -442,6 +473,7 @@ int AbstractPort::updatePacketListInterleaved()
QList<ulong> pktCount, burstCount;
QList<ulong> burstSize;
QList<bool> isVariable;
QList<bool> hasTtag;
QList<QByteArray> pktBuf;
QList<ulong> pktLen;
int activeStreamCount = 0;
@ -465,6 +497,9 @@ int AbstractPort::updatePacketListInterleaved()
// First sort the streams by ordinalValue
std::sort(streamList_.begin(), streamList_.end(), StreamBase::StreamLessThan);
// FIXME: we are calculating n[bp][12], i[bp]g[12] for a duration of 1sec;
// this was fine when the actual packet list duration was also 1sec. But
// in the current code (post Turbo changes), the latter can be different!
for (int i = 0; i < streamList_.size(); i++)
{
if (!streamList_[i]->isEnabled())
@ -588,6 +623,8 @@ int AbstractPort::updatePacketListInterleaved()
packetListAttrib += attrib;
}
hasTtag.append(streamList_[i]->hasProtocol(
OstProto::Protocol::kSignFieldNumber));
numStreams++;
} // for i
@ -610,12 +647,93 @@ int AbstractPort::updatePacketListInterleaved()
uchar* buf;
int len;
quint64 durSec = duration/ulong(1e9);
quint64 durNsec = duration % ulong(1e9);
const quint64 durSec = duration/ulong(1e9);
const quint64 durNsec = duration % ulong(1e9);
quint64 sec = 0;
quint64 nsec = 0;
quint64 lastPktTxSec = 0;
quint64 lastPktTxNsec = 0;
// Count total packets we are going to add, so that we can create
// an explicit packet set first
// TODO: Find less expensive way to do this counting
// FIXME: Turbo still thinks it has to create implicit packet set for
// interleaved mode - Turbo code should be changed once this is validated
quint64 totalPkts = 0;
QVector<ulong> ttagSchedSec(numStreams, 0);
QVector<ulong> ttagSchedNsec(numStreams, 0);
QList<uint> ttagMarkers;
do
{
for (int i = 0; i < numStreams; i++)
{
// If a packet is not scheduled yet, look at the next stream
if ((schedSec.at(i) > sec) || (schedNsec.at(i) > nsec))
continue;
// Ttag marker every TtagTimeInterval for each stream
if (hasTtag.at(i)
&& ((schedSec.at(i) > ttagSchedSec.at(i))
|| ((schedSec.at(i) == ttagSchedSec.at(i))
&& (schedNsec.at(i) >= ttagSchedNsec.at(i))))) {
ttagMarkers.append(totalPkts);
ttagSchedSec[i] = schedSec.at(i) + kTtagTimeInterval_;
ttagSchedNsec[i] = schedNsec.at(i);
}
for (uint j = 0; j < burstSize[i]; j++)
{
pktCount[i]++;
schedNsec[i] += (pktCount.at(i) < np1.at(i)) ?
ipg1.at(i) : ipg2.at(i);
while (schedNsec.at(i) >= 1e9)
{
schedSec[i]++;
schedNsec[i] -= long(1e9);
}
lastPktTxSec = sec;
lastPktTxNsec = nsec;
totalPkts++;
}
burstCount[i]++;
schedNsec[i] += (burstCount.at(i) < nb1.at(i)) ?
ibg1.at(i) : ibg2.at(i);
while (schedNsec.at(i) >= 1e9)
{
schedSec[i]++;
schedNsec[i] -= long(1e9);
}
}
nsec += minGap;
while (nsec >= 1e9)
{
sec++;
nsec -= long(1e9);
}
} while ((sec < durSec) || ((sec == durSec) && (nsec < durNsec)));
// XXX: For interleaved mode, we ALWAYS have a single packet set with
// one repeat and 0n set loop delay
loopNextPacketSet(totalPkts, 1, 0, 0);
qDebug("Interleaved single PacketSet of size %lld, duration %llu.%09llu "
"repeat 1 and delay 0",
totalPkts, durSec, durNsec);
// Reset working sched/counts before building the packet list
sec = nsec = 0;
lastPktTxSec = lastPktTxNsec = 0;
for (int i = 0; i < numStreams; i++)
{
schedSec[i] = 0;
schedNsec[i] = 0;
pktCount[i] = 0;
burstCount[i] = 0;
}
// Now build the packet list
do
{
for (int i = 0; i < numStreams; i++)
@ -643,7 +761,7 @@ int AbstractPort::updatePacketListInterleaved()
if (len <= 0)
continue;
qDebug("q(%d) sec = %llu nsec = %llu", i, sec, nsec);
qDebug("q(%d) TS = %llu.%09llu", i, sec, nsec);
if (!appendToPacketList(sec, nsec, buf, len)) {
clearPacketList(); // don't leave it half baked/inconsitent
packetListAttrib.errorFlags |= FrameValueAttrib::OutOfMemoryError;
@ -678,7 +796,7 @@ int AbstractPort::updatePacketListInterleaved()
sec++;
nsec -= long(1e9);
}
} while ((sec < durSec) || (nsec < durNsec));
} while ((sec < durSec) || ((sec == durSec) && (nsec < durNsec)));
{
qint64 delaySec = durSec - lastPktTxSec;
@ -688,10 +806,22 @@ int AbstractPort::updatePacketListInterleaved()
delayNsec += long(1e9);
delaySec--;
}
qDebug("loop Delay = %lld/%lld", delaySec, delayNsec);
qDebug("loop Delay = %lld.%09lld", delaySec, delayNsec);
setPacketListLoopMode(true, delaySec, delayNsec);
}
// XXX: TTag repeat interval calculation:
// CASE 1. pktListDuration < kTtagTimeInterval:
// e.g. if pktListDuration is 1sec and TtagTimerInterval is 5s, we
// skip 5 times total packets before we repeat the markers
// CASE 2. pktListDuration > kTtagTimeInterval:
// e.g. if pktListDuration is 7sec and TtagTimerInterval is 5s, we
// skip repeat markers every pktList iteration
setPacketListTtagMarkers(ttagMarkers, ttagMarkers.isEmpty() ? 0 :
qMax(uint(kTtagTimeInterval_*1e9
/(durSec*1e9+durNsec)),
1U) * totalPkts);
_out_of_memory:
isSendQueueDirty_ = false;
@ -734,6 +864,16 @@ void AbstractPort::stats(PortStats *stats)
stats_.rxFrameErrors + (maxStatsValue_ - epochStats_.rxFrameErrors);
}
quint64 AbstractPort::streamTimingDelay(uint guid)
{
return streamTiming_->delay(id(), guid);
}
void AbstractPort::clearStreamTiming(uint guid)
{
streamTiming_->clear(id(), guid);
}
void AbstractPort::streamStats(uint guid, OstProto::StreamStatsList *stats)
{
// In case stats are being maintained elsewhere
@ -748,6 +888,7 @@ void AbstractPort::streamStats(uint guid, OstProto::StreamStatsList *stats)
s->mutable_port_id()->set_id(id());
s->set_tx_duration(lastTransmitDuration());
s->set_latency(streamTimingDelay(guid));
s->set_tx_pkts(sst.tx_pkts);
s->set_tx_bytes(sst.tx_bytes);
@ -775,6 +916,7 @@ void AbstractPort::streamStatsAll(OstProto::StreamStatsList *stats)
s->mutable_port_id()->set_id(id());
s->set_tx_duration(txDur);
s->set_latency(streamTimingDelay(i.key()));
s->set_tx_pkts(sst.tx_pkts);
s->set_tx_bytes(sst.tx_bytes);
@ -786,11 +928,13 @@ void AbstractPort::streamStatsAll(OstProto::StreamStatsList *stats)
void AbstractPort::resetStreamStats(uint guid)
{
streamStats_.remove(guid);
clearStreamTiming(guid);
}
void AbstractPort::resetStreamStatsAll()
{
streamStats_.clear();
clearStreamTiming();
}
void AbstractPort::clearDeviceNeighbors()

View File

@ -20,18 +20,20 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#ifndef _SERVER_ABSTRACT_PORT_H
#define _SERVER_ABSTRACT_PORT_H
#include "../common/protocol.pb.h"
#include "streamstats.h"
#include <QList>
#include <QtGlobal>
#include "../common/protocol.pb.h"
#include <limits.h>
class DeviceManager;
struct InterfaceInfo;
class StreamBase;
class PacketBuffer;
class QIODevice;
class StreamBase;
class StreamTiming;
// TODO: send notification back to client(s)
#define Xnotify qWarning
@ -105,6 +107,8 @@ public:
int length) = 0;
virtual void setPacketListLoopMode(bool loop,
quint64 secDelay, quint64 nsecDelay) = 0;
virtual void setPacketListTtagMarkers(QList<uint> markers,
uint repeatInterval) = 0;
int updatePacketList();
virtual void startTransmit() = 0;
@ -120,6 +124,9 @@ public:
void stats(PortStats *stats);
void resetStats() { epochStats_ = stats_; }
quint64 streamTimingDelay(uint guid);
void clearStreamTiming(uint guid = UINT_MAX);
// FIXME: combine single and All calls?
void streamStats(uint guid, OstProto::StreamStatsList *stats);
void streamStatsAll(OstProto::StreamStatsList *stats);
@ -158,6 +165,8 @@ protected:
StreamStats streamStats_;
//! \todo Need lock for stats access/update
const uint kTtagTimeInterval_{5}; // in seconds
struct InterfaceInfo *interfaceInfo_;
DeviceManager *deviceManager_;
@ -177,6 +186,8 @@ private:
QList<StreamBase*> streamList_;
struct PortStats epochStats_;
StreamTiming *streamTiming_{nullptr};
};
#endif

View File

@ -41,7 +41,8 @@ LIBS += -lm
LIBS += -lprotobuf
HEADERS += drone.h \
pcaptransmitter.h \
myservice.h
myservice.h \
streamtiming.h
SOURCES += \
devicemanager.cpp \
device.cpp \
@ -56,12 +57,14 @@ SOURCES += \
pcaprxstats.cpp \
pcaptxstats.cpp \
pcaptxthread.cpp \
pcaptxttagstats.cpp \
bsdhostdevice.cpp \
bsdport.cpp \
linuxhostdevice.cpp \
linuxport.cpp \
linuxutils.cpp \
params.cpp \
streamtiming.cpp \
turbo.cpp \
winhostdevice.cpp \
winpcapport.cpp

View File

@ -20,8 +20,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#ifndef _PACKET_SEQUENCE_H
#define _PACKET_SEQUENCE_H
#include "pcapextra.h"
#include "../common/packet.h"
#include "../common/sign.h"
#include "pcapextra.h"
#include "streamstats.h"
class PacketSequence
@ -37,6 +38,7 @@ public:
repeatCount_ = 1;
repeatSize_ = 1;
usecDelay_ = 0;
ttagL4CksumOffset_ = 0;
}
~PacketSequence() {
pcap_sendqueue_destroy(sendQueue_);
@ -69,6 +71,16 @@ public:
streamStatsMeta_[guid].tx_bytes += pktHeader->caplen;
}
}
// TODO: A PacketSequence belongs to a unique stream only in case of
// sequential streams; for interleaved streams, we have only a single
// packet set (with one or more sequences) containing packets from
// multiple streams. To support this, we need to make l4cksum a packet
// property not a sequence property
// Till the above is fixed, Ttag packets will have wrong checksum
#if 0
if (trackGuidStats_ && (packets_ == 1)) // first packet of seq
ttagL4CksumOffset_ = Packet::l4ChecksumOffset(pktData, pktHeader->caplen);
#endif
return ret;
}
pcap_send_queue *sendQueue_;
@ -79,6 +91,7 @@ public:
int repeatCount_;
int repeatSize_;
long usecDelay_;
quint16 ttagL4CksumOffset_; // For ttag packets
StreamStats streamStatsMeta_;
private:

View File

@ -34,6 +34,7 @@ PcapPort::PcapPort(int id, const char *device)
transmitter_ = new PcapTransmitter(device, streamStats_);
capturer_ = new PortCapturer(device);
emulXcvr_ = new EmulationTransceiver(device, deviceManager_);
txTtagStatsPoller_ = new PcapTxTtagStats(device, id);
rxStatsPoller_ = new PcapRxStats(device, streamStats_, id);
if (!monitorRx_->handle() || !monitorTx_->handle())
@ -85,6 +86,9 @@ PcapPort::~PcapPort()
if (monitorTx_)
monitorTx_->stop();
txTtagStatsPoller_->stop();
delete txTtagStatsPoller_;
rxStatsPoller_->stop();
delete rxStatsPoller_;
@ -145,8 +149,10 @@ bool PcapPort::setRateAccuracy(AbstractPort::Accuracy accuracy)
void PcapPort::updateStreamStats()
{
// XXX: PcapTxThread already does this at the end of transmit; we
// just dump rx stats poller debug stats here
// just dump tx/rx stats poller debug stats here
qDebug("port %d txTtagStatsPoller: %s",
id(), qUtf8Printable(txTtagStatsPoller_->debugStats()));
qDebug("port %d rxStatsPoller: %s",
id(), qUtf8Printable(rxStatsPoller_->debugStats()));
}
@ -170,6 +176,8 @@ bool PcapPort::startStreamStatsTracking()
{
if (!transmitter_->setStreamStatsTracking(true))
goto _tx_fail;
if (!txTtagStatsPoller_->start())
goto _tx_ttag_fail;
if (!rxStatsPoller_->start())
goto _rx_fail;
/*
@ -183,6 +191,8 @@ bool PcapPort::startStreamStatsTracking()
return true;
_rx_fail:
txTtagStatsPoller_->stop();
_tx_ttag_fail:
transmitter_->setStreamStatsTracking(false);
_tx_fail:
qWarning("failed to start stream stats tracking");
@ -191,17 +201,22 @@ _tx_fail:
bool PcapPort::stopStreamStatsTracking()
{
if (!transmitter_->setStreamStatsTracking(false))
goto _tx_fail;
if (!rxStatsPoller_->stop())
goto _rx_fail;
return true;
bool ret = true;
_rx_fail:
transmitter_->setStreamStatsTracking(true);
_tx_fail:
qWarning("failed to stop stream stats tracking");
return false;
if (!transmitter_->setStreamStatsTracking(false)) {
qWarning("failed to stop Transmitter stream stats tracking");
ret = false;
}
if (!txTtagStatsPoller_->stop()) {
qWarning("failed to stop TxTtag stream stats thread");
ret = false;
}
if (!rxStatsPoller_->stop()) {
qWarning("failed to stop Rx stream stats thread");
ret = false;
}
return ret;
}
/*
@ -489,7 +504,7 @@ void PcapPort::PortCapturer::stop()
{
if (state_ == kRunning) {
stop_ = true;
PcapSession::stop(handle_);
PcapSession::stop();
while (state_ == kRunning)
QThread::msleep(10);
}
@ -713,7 +728,7 @@ void PcapPort::EmulationTransceiver::stop()
{
if (state_ == kRunning) {
stop_ = true;
PcapSession::stop(handle_);
PcapSession::stop();
while (state_ == kRunning)
QThread::msleep(10);
}

View File

@ -27,6 +27,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "abstractport.h"
#include "pcapextra.h"
#include "pcaprxstats.h"
#include "pcaptxttagstats.h"
#include "pcapsession.h"
#include "pcaptransmitter.h"
@ -47,6 +48,7 @@ public:
virtual void clearPacketList() {
transmitter_->clearPacketList();
setPacketListLoopMode(false, 0, 0);
setPacketListTtagMarkers(QList<uint>(), 0);
}
virtual void loopNextPacketSet(qint64 size, qint64 repeats,
long repeatDelaySec, long repeatDelayNsec) {
@ -61,6 +63,11 @@ public:
{
transmitter_->setPacketListLoopMode(loop, secDelay, nsecDelay);
}
virtual void setPacketListTtagMarkers(QList<uint> markers,
uint repeatInterval)
{
transmitter_->setPacketListTtagMarkers(markers, repeatInterval);
}
virtual void startTransmit() {
Q_ASSERT(!isDirty());
@ -134,7 +141,6 @@ protected:
QString device_;
volatile bool stop_;
QTemporaryFile capFile_;
pcap_t *handle_;
pcap_dumper_t *dumpHandle_;
volatile State state_;
};
@ -161,7 +167,6 @@ protected:
QString device_;
DeviceManager *deviceManager_;
volatile bool stop_;
pcap_t *handle_;
volatile State state_;
};
@ -177,6 +182,7 @@ private:
PcapTransmitter *transmitter_;
PortCapturer *capturer_;
EmulationTransceiver *emulXcvr_;
PcapTxTtagStats *txTtagStatsPoller_;
PcapRxStats *rxStatsPoller_;
static pcap_if_t *deviceList_;

View File

@ -20,13 +20,16 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "pcaprxstats.h"
#include "pcapextra.h"
#include "../common/debugdefs.h"
#include "../common/sign.h"
#include "streamtiming.h"
#define Xnotify qWarning // FIXME
PcapRxStats::PcapRxStats(const char *device, StreamStats &portStreamStats, int id)
: streamStats_(portStreamStats)
{
setObjectName(QString("Rx$:%1").arg(device));
device_ = QString::fromLatin1(device);
stop_ = false;
state_ = kNotStarted;
@ -34,7 +37,9 @@ PcapRxStats::PcapRxStats(const char *device, StreamStats &portStreamStats, int i
handle_ = NULL;
id_ = id;
portId_ = id;
timing_ = StreamTiming::instance();
}
pcap_t* PcapRxStats::handle()
@ -106,7 +111,7 @@ void PcapRxStats::run()
}
_skip_filter:
memset(&lastPcapStats_, 0, sizeof(lastPcapStats_));
clearDebugStats();
PcapSession::preRun();
state_ = kRunning;
while (1) {
@ -117,8 +122,29 @@ _skip_filter:
ret = pcap_next_ex(handle_, &hdr, &data);
switch (ret) {
case 1: {
uint guid;
if (SignProtocol::packetGuid(data, hdr->caplen, &guid)) {
uint ttagId, guid;
#ifdef Q_OS_WIN32
// Npcap (Windows) doesn't support direction, so packets
// Tx by PcapTxThread are received back by us here - use
// TxPort to filter out. TxPort is returned as byte 1 of
// ttagId (byte 0 is ttagId).
// If TxPort is us ==> Tx Packet, so skip
// FIXME: remove once npcap supports pcap direction
if (SignProtocol::packetTtagId(data, hdr->caplen, &ttagId, &guid)
&& (ttagId >> 8 != uint(portId_))) {
ttagId &= 0xFF;
timing_->recordRxTime(portId_, guid, ttagId, hdr->ts);
timingDebug("[%d RX] %ld:%ld ttag %u guid %u", portId_,
hdr->ts.tv_sec, long(hdr->ts.tv_usec), ttagId, guid);
}
#else
if (SignProtocol::packetTtagId(data, hdr->caplen, &ttagId, &guid)) {
timing_->recordRxTime(portId_, guid, ttagId, hdr->ts);
timingDebug("[%d RX] %ld:%ld ttag %u guid %u", portId_,
hdr->ts.tv_sec, long(hdr->ts.tv_usec), ttagId, guid);
}
#endif
if (guid != SignProtocol::kInvalidGuid) {
streamStats_[guid].rx_pkts++;
streamStats_[guid].rx_bytes += hdr->caplen;
}
@ -174,7 +200,7 @@ bool PcapRxStats::stop()
{
if (state_ == kRunning) {
stop_ = true;
PcapSession::stop(handle_);
PcapSession::stop();
while (state_ == kRunning)
QThread::msleep(10);
}
@ -193,57 +219,3 @@ bool PcapRxStats::isDirectional()
{
return isDirectional_;
}
// XXX: Implemented as reset on read
QString PcapRxStats::debugStats()
{
QString dbgStats;
#ifdef Q_OS_WIN32
static_assert(sizeof(struct pcap_stat) == 6*sizeof(uint),
"pcap_stat has less or more than 6 values");
int size;
struct pcap_stat incPcapStats;
struct pcap_stat *pcapStats = pcap_stats_ex(handle_, &size);
if (pcapStats && (uint(size) >= 6*sizeof(uint))) {
incPcapStats.ps_recv = pcapStats->ps_recv - lastPcapStats_.ps_recv;
incPcapStats.ps_drop = pcapStats->ps_drop - lastPcapStats_.ps_drop;
incPcapStats.ps_ifdrop = pcapStats->ps_ifdrop - lastPcapStats_.ps_ifdrop;
incPcapStats.ps_capt = pcapStats->ps_capt - lastPcapStats_.ps_capt;
incPcapStats.ps_sent = pcapStats->ps_sent - lastPcapStats_.ps_sent;
incPcapStats.ps_netdrop = pcapStats->ps_netdrop - lastPcapStats_.ps_netdrop;
dbgStats = QString("recv: %1 drop: %2 ifdrop: %3 "
"capt: %4 sent: %5 netdrop: %6")
.arg(incPcapStats.ps_recv)
.arg(incPcapStats.ps_drop)
.arg(incPcapStats.ps_ifdrop)
.arg(incPcapStats.ps_capt)
.arg(incPcapStats.ps_sent)
.arg(incPcapStats.ps_netdrop);
lastPcapStats_ = *pcapStats;
} else {
dbgStats = QString("error reading pcap stats: %1")
.arg(pcap_geterr(handle_));
}
#else
struct pcap_stat pcapStats;
struct pcap_stat incPcapStats;
int ret = pcap_stats(handle_, &pcapStats);
if (ret == 0) {
incPcapStats.ps_recv = pcapStats.ps_recv - lastPcapStats_.ps_recv;
incPcapStats.ps_drop = pcapStats.ps_drop - lastPcapStats_.ps_drop;
incPcapStats.ps_ifdrop = pcapStats.ps_ifdrop - lastPcapStats_.ps_ifdrop;
dbgStats = QString("recv: %1 drop: %2 ifdrop: %3")
.arg(incPcapStats.ps_recv)
.arg(incPcapStats.ps_drop)
.arg(incPcapStats.ps_ifdrop);
lastPcapStats_ = pcapStats;
} else {
dbgStats = QString("error reading pcap stats: %1")
.arg(pcap_geterr(handle_));
}
#endif
return dbgStats;
}

View File

@ -24,6 +24,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "pcapsession.h"
class StreamTiming;
class PcapRxStats: public PcapSession
{
public:
@ -35,8 +37,6 @@ public:
bool isRunning();
bool isDirectional();
QString debugStats();
private:
enum State {
kNotStarted,
@ -47,12 +47,12 @@ private:
QString device_;
StreamStats &streamStats_;
volatile bool stop_;
pcap_t *handle_;
volatile State state_;
bool isDirectional_;
int id_;
struct pcap_stat lastPcapStats_;
int portId_;
StreamTiming *timing_{nullptr};
};
#endif

View File

@ -19,6 +19,69 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "pcapsession.h"
// XXX: Implemented as reset on read
QString PcapSession::debugStats()
{
QString dbgStats;
if (!handle_)
return QString();
#ifdef Q_OS_WIN32
static_assert(sizeof(struct pcap_stat) == 6*sizeof(uint),
"pcap_stat has less or more than 6 values");
int size;
struct pcap_stat incPcapStats;
struct pcap_stat *pcapStats = pcap_stats_ex(handle_, &size);
if (pcapStats && (uint(size) >= 6*sizeof(uint))) {
incPcapStats.ps_recv = pcapStats->ps_recv - lastPcapStats_.ps_recv;
incPcapStats.ps_drop = pcapStats->ps_drop - lastPcapStats_.ps_drop;
incPcapStats.ps_ifdrop = pcapStats->ps_ifdrop - lastPcapStats_.ps_ifdrop;
incPcapStats.ps_capt = pcapStats->ps_capt - lastPcapStats_.ps_capt;
incPcapStats.ps_sent = pcapStats->ps_sent - lastPcapStats_.ps_sent;
incPcapStats.ps_netdrop = pcapStats->ps_netdrop - lastPcapStats_.ps_netdrop;
dbgStats = QString("recv: %1 drop: %2 ifdrop: %3 "
"capt: %4 sent: %5 netdrop: %6")
.arg(incPcapStats.ps_recv)
.arg(incPcapStats.ps_drop)
.arg(incPcapStats.ps_ifdrop)
.arg(incPcapStats.ps_capt)
.arg(incPcapStats.ps_sent)
.arg(incPcapStats.ps_netdrop);
lastPcapStats_ = *pcapStats;
} else {
dbgStats = QString("error reading pcap stats: %1")
.arg(pcap_geterr(handle_));
}
#else
struct pcap_stat pcapStats;
struct pcap_stat incPcapStats;
int ret = pcap_stats(handle_, &pcapStats);
if (ret == 0) {
incPcapStats.ps_recv = pcapStats.ps_recv - lastPcapStats_.ps_recv;
incPcapStats.ps_drop = pcapStats.ps_drop - lastPcapStats_.ps_drop;
incPcapStats.ps_ifdrop = pcapStats.ps_ifdrop - lastPcapStats_.ps_ifdrop;
dbgStats = QString("recv: %1 drop: %2 ifdrop: %3")
.arg(incPcapStats.ps_recv)
.arg(incPcapStats.ps_drop)
.arg(incPcapStats.ps_ifdrop);
lastPcapStats_ = pcapStats;
} else {
dbgStats = QString("error reading pcap stats: %1")
.arg(pcap_geterr(handle_));
}
#endif
return dbgStats;
}
bool PcapSession::clearDebugStats()
{
memset(&lastPcapStats_, 0, sizeof(lastPcapStats_));
return true;
}
#ifdef Q_OS_UNIX
#include <signal.h>
#include <typeinfo>
@ -66,7 +129,7 @@ void PcapSession::postRun()
qDebug("Signal seen and handled");
}
void PcapSession::stop(pcap_t *handle)
void PcapSession::stop()
{
// Should be called OUTSIDE the thread's context
// XXX: As per the man page for pcap_breakloop, we need both
@ -74,7 +137,7 @@ void PcapSession::stop(pcap_t *handle)
// we use a signal for the latter
// TODO: If the signal mechanism doesn't work, we could try
// pthread_cancel(thread_);
pcap_breakloop(handle);
pcap_breakloop(handle_);
pthread_kill(thread_.nativeId(), MY_BREAK_SIGNAL);
}

View File

@ -55,27 +55,46 @@ inline uint qHash(const ThreadId &key)
class PcapSession: public QThread
{
public:
QString debugStats();
protected:
bool clearDebugStats();
void preRun();
void postRun();
void stop(pcap_t *handle);
void stop();
pcap_t *handle_{nullptr};
private:
static void signalBreakHandler(int /*signum*/);
ThreadId thread_;
static QHash<ThreadId, bool> signalSeen_;
struct pcap_stat lastPcapStats_;
};
#else
class PcapSession: public QThread
{
public:
QString debugStats();
protected:
bool clearDebugStats();
void preRun() {};
void postRun() {};
void stop(pcap_t *handle) {
qDebug("calling breakloop with handle %p", handle);
pcap_breakloop(handle);
void stop() {
qDebug("calling breakloop with handle %p", handle_);
pcap_breakloop(handle_);
}
pcap_t *handle_{nullptr};
private:
struct pcap_stat lastPcapStats_;
};
#endif

View File

@ -88,6 +88,13 @@ void PcapTransmitter::setPacketListLoopMode(
txThread_.setPacketListLoopMode(loop, secDelay, nsecDelay);
}
void PcapTransmitter::setPacketListTtagMarkers(
QList<uint> markers,
uint repeatInterval)
{
txThread_.setPacketListTtagMarkers(markers, repeatInterval);
}
void PcapTransmitter::useExternalStats(AbstractPort::PortStats *stats)
{
txStats_.useExternalStats(stats);

View File

@ -42,6 +42,7 @@ public:
bool appendToPacketList(long sec, long usec, const uchar *packet,
int length);
void setPacketListLoopMode(bool loop, quint64 secDelay, quint64 nsecDelay);
void setPacketListTtagMarkers(QList<uint> markers, uint repeatInterval);
void setHandle(pcap_t *handle);
void useExternalStats(AbstractPort::PortStats *stats);

View File

@ -19,9 +19,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "pcaptxthread.h"
#include "sign.h"
#include "statstuple.h"
#include "timestamp.h"
#include <QtDebug>
PcapTxThread::PcapTxThread(const char *device)
{
char errbuf[PCAP_ERRBUF_SIZE] = "";
@ -109,10 +112,24 @@ void PcapTxThread::clearPacketList()
void PcapTxThread::loopNextPacketSet(qint64 size, qint64 repeats,
long repeatDelaySec, long repeatDelayNsec)
{
// Since we create implicit packetset for this case, skip
// This case => Packet set for y when x = 0 or n==1 in n*x+y
#if 0 // Don't let implicit packet sets be created
// XXX: The below change was done as part of Turbo code
// implementation alongwith calls to this function from
// AbstractPort::updatePacketListSequential(). Turbo to
// have clean code requires explicit packet sets for all
// cases (except interleaved streams). The below change
// was done so that the base code should not be affected
// after the explict packet set creation calls.
// XXX: Since we create implicit packetset for this case, skip
// This case =>
// 1. Packet set for y when x = 0
// 2. n==1 in n*x+y
// These two cases were the result of the changes in
// updatePacketListSequential() as part of Turbo changes
// mentioned above
if (repeats == 1)
return;
#endif
currentPacketSequence_ = new PacketSequence(trackStreamStats_);
currentPacketSequence_->repeatCount_ = repeats;
@ -136,24 +153,20 @@ bool PcapTxThread::appendToPacketList(long sec, long nsec,
pktHdr.ts.tv_sec = sec;
pktHdr.ts.tv_usec = nsec/1000;
if (currentPacketSequence_ == NULL ||
!currentPacketSequence_->hasFreeSpace(2*sizeof(pcap_pkthdr)+length))
{
if (currentPacketSequence_ != NULL)
{
long usecs;
// loopNextPacketSet should have created a seq
Q_ASSERT(currentPacketSequence_ != NULL);
usecs = (pktHdr.ts.tv_sec
- currentPacketSequence_->lastPacket_->ts.tv_sec)
* long(1e6);
usecs += (pktHdr.ts.tv_usec
- currentPacketSequence_->lastPacket_->ts.tv_usec);
currentPacketSequence_->usecDelay_ = usecs;
}
// If not enough space, update usecDelay and alloc a new seq
if (!currentPacketSequence_->hasFreeSpace(2*sizeof(pcap_pkthdr)+length))
{
struct timeval diff;
timersub(&pktHdr.ts, &currentPacketSequence_->lastPacket_->ts, &diff);
currentPacketSequence_->usecDelay_ = diff.tv_usec;
if (diff.tv_sec)
currentPacketSequence_->usecDelay_ += diff.tv_sec*1e6;
//! \todo (LOW): calculate sendqueue size
currentPacketSequence_ = new PacketSequence(trackStreamStats_);
packetSequenceList_.append(currentPacketSequence_);
// Validate that the pkt will fit inside the new currentSendQueue_
@ -169,6 +182,8 @@ bool PcapTxThread::appendToPacketList(long sec, long nsec,
packetCount_++;
packetListSize_ += repeatSize_ ?
currentPacketSequence_->repeatCount_ : 1;
// Last packet of packet-set?
if (repeatSize_ > 0 && packetCount_ == repeatSize_)
{
qDebug("repeatSequenceStart_=%d, repeatSize_ = %llu",
@ -190,7 +205,7 @@ bool PcapTxThread::appendToPacketList(long sec, long nsec,
repeatSize_ = 0;
// End current pktSeq and trigger a new pktSeq allocation for next pkt
// End current pktSeq
currentPacketSequence_ = NULL;
}
@ -206,6 +221,27 @@ void PcapTxThread::setPacketListLoopMode(
loopDelay_ = secDelay*long(1e6) + nsecDelay/1000;
}
void PcapTxThread::setPacketListTtagMarkers(
QList<uint> markers,
uint repeatInterval)
{
// XXX: Empty markers => no streams have Ttag
firstTtagPkt_ = markers.isEmpty() ? -1 : int(markers.first());
// Calculate delta markers
ttagDeltaMarkers_.clear();
for (int i = 1; i < markers.size(); i++)
ttagDeltaMarkers_.append(markers.at(i) - markers.at(i-1));
if (!markers.isEmpty()) {
ttagDeltaMarkers_.append(repeatInterval - markers.last()
+ markers.first());
qDebug() << "TtagRepeatInterval:" << repeatInterval;
qDebug() << "FirstTtagPkt:" << firstTtagPkt_;
qDebug() << "TtagMarkers:" << ttagDeltaMarkers_;
}
}
void PcapTxThread::setHandle(pcap_t *handle)
{
if (usingInternalHandle_)
@ -261,39 +297,45 @@ void PcapTxThread::run()
packetSequenceList_.at(i)->repeatCount_,
packetSequenceList_.at(i)->repeatSize_,
packetSequenceList_.at(i)->usecDelay_);
qDebug("sendQ[%d]: pkts = %ld, usecDuration = %ld", i,
qDebug("sendQ[%d]: pkts = %ld, usecDuration = %ld, ttagL4CksumOfs = %hu", i,
packetSequenceList_.at(i)->packets_,
packetSequenceList_.at(i)->usecDuration_);
packetSequenceList_.at(i)->usecDuration_,
packetSequenceList_.at(i)->ttagL4CksumOffset_);
}
qDebug() << "First Ttag: " << firstTtagPkt_
<< "Ttag Markers:" << ttagDeltaMarkers_;
lastStats_ = *stats_; // used for stream stats
// Init Ttag related vars. If no packets need ttag, firstTtagPkt_ is -1,
// so nextTagPkt_ is set to practically unreachable value (due to
// 64 bit counter wraparound time!)
ttagMarkerIndex_ = 0;
nextTtagPkt_ = stats_->pkts + firstTtagPkt_;
getTimeStamp(&startTime);
state_ = kRunning;
i = 0;
while (i < packetSequenceList_.size())
{
while (i < packetSequenceList_.size()) {
_restart:
int rptSz = packetSequenceList_.at(i)->repeatSize_;
int rptCnt = packetSequenceList_.at(i)->repeatCount_;
for (int j = 0; j < rptCnt; j++)
{
for (int k = 0; k < rptSz; k++)
{
for (int j = 0; j < rptCnt; j++) {
for (int k = 0; k < rptSz; k++) {
int ret;
PacketSequence *seq = packetSequenceList_.at(i+k);
#ifdef Q_OS_WIN32
TimeStamp ovrStart, ovrEnd;
if (seq->usecDuration_ <= long(1e6)) // 1s
{
// Use Windows-only pcap_sendqueue_transmit() if duration < 1s
// and no stream timing is configured
if (seq->usecDuration_ <= long(1e6) && firstTtagPkt_ < 0) {
getTimeStamp(&ovrStart);
ret = pcap_sendqueue_transmit(handle_,
seq->sendQueue_, kSyncTransmit);
if (ret >= 0)
{
if (ret >= 0) {
stats_->pkts += seq->packets_;
stats_->bytes += seq->bytes_;
@ -304,52 +346,42 @@ _restart:
}
if (stop_)
ret = -2;
}
else
{
ret = sendQueueTransmit(handle_, seq->sendQueue_,
} else {
ret = sendQueueTransmit(handle_, seq,
overHead, kSyncTransmit);
}
#else
ret = sendQueueTransmit(handle_, seq->sendQueue_,
ret = sendQueueTransmit(handle_, seq,
overHead, kSyncTransmit);
#endif
if (ret >= 0)
{
if (ret >= 0) {
long usecs = seq->usecDelay_ + overHead;
if (usecs > 0)
{
if (usecs > 0) {
(*udelayFn_)(usecs);
overHead = 0;
}
else
} else
overHead = usecs;
}
else
{
} else {
qDebug("error %d in sendQueueTransmit()", ret);
qDebug("overHead = %ld", overHead);
stop_ = false;
goto _exit;
}
}
}
} // rptSz
} // rptCnt
// Move to the next Packet Set
i += rptSz;
}
if (returnToQIdx_ >= 0)
{
if (returnToQIdx_ >= 0) {
long usecs = loopDelay_ + overHead;
if (usecs > 0)
{
if (usecs > 0) {
(*udelayFn_)(usecs);
overHead = 0;
}
else
} else
overHead = usecs;
i = returnToQIdx_;
@ -409,38 +441,77 @@ double PcapTxThread::lastTxDuration()
return lastTxDuration_;
}
int PcapTxThread::sendQueueTransmit(pcap_t *p,
pcap_send_queue *queue, long &overHead, int sync)
int PcapTxThread::sendQueueTransmit(pcap_t *p, PacketSequence *seq,
long &overHead, int sync)
{
TimeStamp ovrStart, ovrEnd;
struct timeval ts;
pcap_send_queue *queue = seq->sendQueue_;
struct pcap_pkthdr *hdr = (struct pcap_pkthdr*) queue->buffer;
char *end = queue->buffer + queue->len;
ts = hdr->ts;
getTimeStamp(&ovrStart);
while((char*) hdr < end)
{
while((char*) hdr < end) {
uchar *pkt = (uchar*)hdr + sizeof(*hdr);
int pktLen = hdr->caplen;
bool ttagPkt = false;
#if 0
quint16 origCksum = 0;
#endif
if (sync)
{
// Time for a T-Tag packet?
if (stats_->pkts == nextTtagPkt_) {
ttagPkt = true;
// XXX: write 2xBytes instead of 1xHalf-word to avoid
// potential alignment problem
*(pkt+pktLen-5) = SignProtocol::kTypeLenTtag;
*(pkt+pktLen-6) = ttagId_;
#if 0
// Recalc L4 checksum; use incremental checksum as per RFC 1624
// HC' = ~(~HC + ~m + m')
if (seq->ttagL4CksumOffset_) {
quint16 *cksum = reinterpret_cast<quint16*>(
pkt + seq->ttagL4CksumOffset_);
origCksum = qFromBigEndian<quint16>(*cksum);
// XXX: SignProtocol trailer
// ... | <guid> | 0x61 | 0x00 | 0x22 | 0x1d10c0da
// ... | <guid> | 0x61 | <TtagId> | 0x23 | 0x1d10c0da
// For odd pkt Length, Ttag spans across 2 half-words
// XXX: Hardcoded values instead of sign protocol constants
// used below for readability
quint32 newCksum = pktLen & 1 ?
quint16(~origCksum) + quint16(~0x221d) + 0x231d
+ quint16(~0x6100) + (0x6100 | ttagId_) :
quint16(~origCksum) + quint16(~0x0022) + (ttagId_ << 8 | 0x23);
while (newCksum > 0xffff)
newCksum = (newCksum & 0xffff) + (newCksum >> 16);
// XXX: For IPv4/UDP, if ~newcksum is 0x0000 we are supposed to
// set the checksum as 0xffff since 0x0000 indicates no cksum
// is present - we choose not to do this to avoid extra cost
*cksum = qToBigEndian(quint16(~newCksum));
}
#endif
ttagId_++;
nextTtagPkt_ += ttagDeltaMarkers_.at(ttagMarkerIndex_);
ttagMarkerIndex_++;
if (ttagMarkerIndex_ >= ttagDeltaMarkers_.size())
ttagMarkerIndex_ = 0;
}
if (sync) {
long usec = (hdr->ts.tv_sec - ts.tv_sec) * 1000000 +
(hdr->ts.tv_usec - ts.tv_usec);
getTimeStamp(&ovrEnd);
overHead -= udiffTimeStamp(&ovrStart, &ovrEnd);
Q_ASSERT(overHead <= 0);
usec += overHead;
if (usec > 0)
{
if (usec > 0) {
(*udelayFn_)(usec);
overHead = 0;
}
else
} else
overHead = usec;
ts = hdr->ts;
@ -453,16 +524,27 @@ int PcapTxThread::sendQueueTransmit(pcap_t *p,
stats_->pkts++;
stats_->bytes += pktLen;
// Revert T-Tag packet changes
if (ttagPkt) {
*(pkt+pktLen-5) = SignProtocol::kTypeLenTtagPlaceholder;
*(pkt+pktLen-6) = 0;
#if 0
if (seq->ttagL4CksumOffset_) {
quint16 *cksum = reinterpret_cast<quint16*>(
pkt + seq->ttagL4CksumOffset_);
*cksum = qToBigEndian(origCksum);
}
#endif
}
// Step to the next packet in the buffer
hdr = (struct pcap_pkthdr*) (pkt + pktLen);
pkt = (uchar*) ((uchar*)hdr + sizeof(*hdr)); // FIXME: superfluous?
if (stop_)
{
if (stop_) {
return -2;
}
}
return 0;
}
@ -613,4 +695,3 @@ void PcapTxThread::udelay(unsigned long usec)
QThread::usleep(usec);
#endif
}

View File

@ -42,6 +42,7 @@ public:
bool appendToPacketList(long sec, long usec, const uchar *packet,
int length);
void setPacketListLoopMode(bool loop, quint64 secDelay, quint64 nsecDelay);
void setPacketListTtagMarkers(QList<uint> markers, uint repeatInterval);
void setHandle(pcap_t *handle);
@ -66,8 +67,8 @@ private:
};
static void udelay(unsigned long usec);
int sendQueueTransmit(pcap_t *p, pcap_send_queue *queue, long &overHead,
int sync);
int sendQueueTransmit(pcap_t *p, PacketSequence *seq,
long &overHead, int sync);
void updateTxStreamStats();
// Intermediate state variables used while building the packet list
@ -80,7 +81,7 @@ private:
quint64 packetListSize_; // count of pkts in packet List including repeats
int returnToQIdx_;
quint64 loopDelay_;
quint64 loopDelay_; // in nanosecs
void (*udelayFn_)(unsigned long);
@ -93,8 +94,17 @@ private:
StatsTuple *stats_;
StatsTuple lastStats_;
StreamStats streamStats_;
quint8 ttagId_{0};
double lastTxDuration_{0.0}; // in secs
// XXX: Ttag Marker config derived; not updated during Tx
int firstTtagPkt_;
QList<uint> ttagDeltaMarkers_;
// XXX: Ttag related; updated during Tx
int ttagMarkerIndex_;
quint64 nextTtagPkt_{0};
};
#endif

192
server/pcaptxttagstats.cpp Normal file
View File

@ -0,0 +1,192 @@
/*
Copyright (C) 2023 Srivats P.
This file is part of "Ostinato"
This is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
#include "pcaptxttagstats.h"
#include "pcapextra.h"
#include "../common/debugdefs.h"
#include "../common/sign.h"
#include "streamtiming.h"
#define Xnotify qWarning // FIXME
PcapTxTtagStats::PcapTxTtagStats(const char *device, int id)
: portId_(id)
{
setObjectName(QString("TxT$:%1").arg(device));
device_ = QString::fromLatin1(device);
timing_ = StreamTiming::instance();
}
void PcapTxTtagStats::run()
{
int flags = PCAP_OPENFLAG_PROMISCUOUS;
char errbuf[PCAP_ERRBUF_SIZE] = "";
struct bpf_program bpf;
const int optimize = 1;
QString capture_filter = QString(
"(ether[len - 4:4] == 0x%1) and (ether[len - 5:1] == 0x%2)")
.arg(SignProtocol::magic(), 0, BASE_HEX)
.arg(SignProtocol::kTypeLenTtag, 0, BASE_HEX);
qDebug("In %s", __PRETTY_FUNCTION__);
qDebug("pcap-filter: %s", qPrintable(capture_filter));
handle_ = pcap_open_live(qPrintable(device_), 65535,
flags, 100 /* ms */, errbuf);
if (!handle_) {
if (flags && QString(errbuf).contains("promiscuous")) {
Xnotify("Unable to set promiscuous mode on <%s> - "
"stream stats time tracking will not work", qPrintable(device_));
goto _exit;
}
else {
Xnotify("Unable to open <%s> [%s] - stream stats rx will not work",
qPrintable(device_), errbuf);
goto _exit;
}
}
#ifdef Q_OS_WIN32
// pcap_setdirection() API is not supported in Windows.
// NOTE: WinPcap 4.1.1 and above exports a dummy API that returns -1
// but since we would like to work with previous versions of WinPcap
// also, we assume the API does not exist
isDirectional_ = false;
#else
if (pcap_setdirection(handle_, PCAP_D_OUT) < 0) {
qDebug("TxTtagStats: Error setting OUT direction %s: %s\n",
qPrintable(device_), pcap_geterr(handle_));
isDirectional_ = false;
}
#endif
if (pcap_compile(handle_, &bpf, qPrintable(capture_filter),
optimize, 0) < 0) {
qWarning("%s: error compiling filter: %s", qPrintable(device_),
pcap_geterr(handle_));
goto _skip_filter;
}
if (pcap_setfilter(handle_, &bpf) < 0) {
qWarning("%s: error setting filter: %s", qPrintable(device_),
pcap_geterr(handle_));
goto _skip_filter;
}
_skip_filter:
clearDebugStats();
PcapSession::preRun();
state_ = kRunning;
while (1) {
int ret;
struct pcap_pkthdr *hdr;
const uchar *data;
ret = pcap_next_ex(handle_, &hdr, &data);
switch (ret) {
case 1: {
uint ttagId;
uint guid;
if (SignProtocol::packetTtagId(data, hdr->caplen,
&ttagId, &guid)) {
#ifdef Q_OS_WIN32
// TxPort is NOT us ==> Rx Packet, so skip
// See similar check in PcapRxStats for details
if (ttagId >> 8 != uint(portId_))
break;
ttagId &= 0xFF;
#endif
timing_->recordTxTime(portId_, guid, ttagId, hdr->ts);
timingDebug("[%d TX] %ld:%ld ttag %u guid %u", portId_,
hdr->ts.tv_sec, long(hdr->ts.tv_usec), ttagId, guid);
}
break;
}
case 0:
// timeout: just go back to the loop
break;
case -1:
qWarning("%s: error reading packet (%d): %s",
__PRETTY_FUNCTION__, ret, pcap_geterr(handle_));
break;
case -2:
qDebug("%s: Loop/signal break or some other error",
__PRETTY_FUNCTION__);
break;
default:
qWarning("%s: Unexpected return value %d",
__PRETTY_FUNCTION__, ret);
stop_ = true;
}
if (stop_) {
qDebug("user requested txTtagStats stop");
break;
}
}
PcapSession::postRun();
pcap_close(handle_);
handle_ = NULL;
stop_ = false;
_exit:
state_ = kFinished;
}
bool PcapTxTtagStats::start()
{
if (state_ == kRunning) {
qWarning("TxTtagStats start requested but is already running!");
goto _exit;
}
state_ = kNotStarted;
PcapSession::start();
while (state_ == kNotStarted)
QThread::msleep(10);
_exit:
return true;
}
bool PcapTxTtagStats::stop()
{
if (state_ == kRunning) {
stop_ = true;
PcapSession::stop();
while (state_ == kRunning)
QThread::msleep(10);
}
else
qWarning("TxTtagStats stop requested but is not running!");
return true;
}
bool PcapTxTtagStats::isRunning()
{
return (state_ == kRunning);
}
bool PcapTxTtagStats::isDirectional()
{
return isDirectional_;
}

55
server/pcaptxttagstats.h Normal file
View File

@ -0,0 +1,55 @@
/*
Copyright (C) 2023 Srivats P.
This file is part of "Ostinato"
This is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
#ifndef _PCAP_TX_TTAG_H
#define _PCAP_TX_TTAG_H
#include "pcapsession.h"
class StreamTiming;
class PcapTxTtagStats: public PcapSession
{
public:
PcapTxTtagStats(const char *device, int id);
void run();
bool start();
bool stop();
bool isRunning();
bool isDirectional();
private:
enum State {
kNotStarted,
kRunning,
kFinished
};
QString device_;
bool isDirectional_{true};
volatile State state_{kNotStarted};
volatile bool stop_{false};
int portId_;
StreamTiming *timing_{nullptr};
};
#endif

View File

@ -30,6 +30,7 @@ struct StreamStatsTuple
quint64 tx_bytes;
};
// Key(uint) is GUID
typedef QHash<uint, StreamStatsTuple> StreamStats;
typedef QHashIterator<uint, StreamStatsTuple> StreamStatsIterator;

246
server/streamtiming.cpp Normal file
View File

@ -0,0 +1,246 @@
/*
Copyright (C) 2023 Srivats P.
This file is part of "Ostinato"
This is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
#include "streamtiming.h"
#include "../common/debugdefs.h"
#include "timestamp.h"
#include <QCoreApplication>
StreamTiming::StreamTiming(QObject *parent)
: QObject(parent)
{
// This class MUST be part of the main thread so that timers can work
Q_ASSERT(this->thread() == QCoreApplication::instance()->thread());
timer_ = new QTimer(this);
connect(timer_, &QTimer::timeout, this, &StreamTiming::processRecords);
timer_->setInterval(3000);
gcTimer_ = new QTimer(this);
connect(gcTimer_, &QTimer::timeout, this, &StreamTiming::deleteStaleRecords);
gcTimer_->setInterval(30000);
}
void StreamTiming::start(uint portId)
{
if (activePortSet_.isEmpty()) { // First port?
timer_->start();
gcTimer_->start();
qDebug("Stream Latency tracking started");
}
activePortSet_.insert(portId);
qDebug("Stream Latency tracking started for port %u", portId);
}
void StreamTiming::stop(uint portId)
{
activePortSet_.remove(portId);
qDebug("Stream Latency tracking stopped for port %u", portId);
if (activePortSet_.isEmpty()) { // Last port?
processRecords();
deleteStaleRecords();
timer_->stop();
gcTimer_->stop();
qDebug("Stream Latency tracking stopped");
}
}
bool StreamTiming::recordTxTime(uint portId, uint guid, uint ttagId,
const struct timespec &timestamp)
{
TxRxKey key = makeKey(guid, ttagId);
TtagData value = { .timeStamp = timestamp, .portId = portId};
QMutexLocker locker(&txHashLock_);
txHash_.insert(key, value);
return true;
}
bool StreamTiming::recordRxTime(uint portId, uint guid, uint ttagId,
const struct timespec &timestamp)
{
TxRxKey key = makeKey(guid, ttagId);
TtagData value = { .timeStamp = timestamp, .portId = portId};
QMutexLocker locker(&rxHashLock_);
rxHash_.insert(key, value);
return true;
}
bool StreamTiming::recordTxTime(uint portId, uint guid, uint ttagId,
const struct timeval &timestamp)
{
struct timespec ts;
ts.tv_sec = timestamp.tv_sec;
ts.tv_nsec = timestamp.tv_usec*1000;
return recordTxTime(portId, guid, ttagId, ts);
}
bool StreamTiming::recordRxTime(uint portId, uint guid, uint ttagId,
const struct timeval &timestamp)
{
struct timespec ts;
ts.tv_sec = timestamp.tv_sec;
ts.tv_nsec = timestamp.tv_usec*1000;
return recordRxTime(portId, guid, ttagId, ts);
}
quint64 StreamTiming::delay(uint portId, uint guid)
{
Q_ASSERT(guid <= SignProtocol::kMaxGuid);
// Process anything pending first
processRecords();
QMutexLocker locker(&timingLock_);
if (!timing_.contains(portId))
return 0;
Timing t = timing_.value(portId)->value(guid);
if (t.countDelays == 0)
return 0;
return timespecToNsecs(t.sumDelays)/t.countDelays;
}
void StreamTiming::clear(uint portId, uint guid)
{
// XXX: We need to clear only the final timing hash; rx/tx hashes
// are cleared by StreamTiming itself as part of processRecords and
// deleteStaleRecords respectively
QMutexLocker locker(&timingLock_);
if (!timing_.contains(portId))
return;
PortTiming *portTiming = timing_.value(portId);
if (!portTiming)
return;
if (guid == SignProtocol::kInvalidGuid)
portTiming->clear(); // remove ALL guids
else
portTiming->remove(guid);
}
int StreamTiming::processRecords()
{
// TODO: yield after a certain count of records or time when called in
// timer context; when called from delay(), process ALL
int count = 0;
QMutexLocker txLocker(&txHashLock_);
QMutexLocker rxLocker(&rxHashLock_);
QMutexLocker timingLocker(&timingLock_);
auto i = rxHash_.begin();
while (i != rxHash_.end()) {
if (txHash_.contains(i.key())) {
struct timespec txTime = txHash_.take(i.key()).timeStamp;
struct timespec rxTime = i.value().timeStamp;
struct timespec diff;
timespecsub(&rxTime, &txTime, &diff);
uint guid = i.key() >> 8;
uint portId = i.value().portId;
if (!timing_.contains(portId))
timing_.insert(portId, new PortTiming);
PortTiming *portTiming = timing_.value(portId);
Timing &guidTiming = (*portTiming)[guid];
timespecadd(&guidTiming.sumDelays, &diff, &guidTiming.sumDelays);
guidTiming.countDelays++;
count++;
timingDebug("[%u/%u/%u] diff %ld.%09ld (%ld.%09ld - %ld.%09ld)",
i.value().portId, guid, i.key() & 0xFF,
diff.tv_sec, diff.tv_nsec,
rxTime.tv_sec, rxTime.tv_nsec,
txTime.tv_sec, txTime.tv_nsec);
timingDebug("%d:[%u/%u] total %ld.%09ld count %u",
count, i.value().portId, guid,
guidTiming.sumDelays.tv_sec, guidTiming.sumDelays.tv_nsec,
guidTiming.countDelays);
}
i = rxHash_.erase(i);
}
Q_ASSERT(rxHash_.isEmpty());
return count;
}
int StreamTiming::deleteStaleRecords()
{
// TODO: yield after a certain count of records or time unless we are
// idle when we process all; how do we determine we are "idle"?
// XXX: We assume the Tx packet timestamps are based on CLOCK_REALTIME
// (or a similar and comparable source). Since garbage collection timer
// is not a short interval, it need not be the exact same source as long
// as the values are comparable
int count = 0;
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
// XXX: processRecords() iterates and deletes all rx records irrespective
// of whether it found a matching tx record. So for garbage collection we
// only need to look at (and delete) tx records
QMutexLocker locker(&txHashLock_);
auto i = txHash_.begin();
while (i != txHash_.end()) {
struct timespec txTime = i.value().timeStamp;
struct timespec diff;
timespecsub(&now, &txTime, &diff);
timingDebug("gc diff %ld", diff.tv_sec);
if (diff.tv_sec > 30) {
i = txHash_.erase(i);
count++;
} else {
i++;
}
}
if (count)
qDebug("Latency garbage collected %d stale tx timing records", count);
return count;
}
StreamTiming* StreamTiming::instance()
{
static StreamTiming *instance{nullptr};
// XXX: As of this writing, AbstractPort constructor is the first one
// to call this - hence this singleton is created when the first port
// is created
if (!instance)
instance = new StreamTiming(QCoreApplication::instance());
return instance;
}

100
server/streamtiming.h Normal file
View File

@ -0,0 +1,100 @@
/*
Copyright (C) 2023 Srivats P.
This file is part of "Ostinato"
This is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
#ifndef _STREAM_TIMING
#define _STREAM_TIMING
#include "../common/sign.h"
#include <QHash>
#include <QMutex>
#include <QSet>
#include <QTimer>
#include <time.h>
class StreamTiming : public QObject
{
Q_OBJECT
public:
bool recordTxTime(uint portId, uint guid, uint ttagId,
const struct timespec &timestamp);
bool recordRxTime(uint portId, uint guid, uint ttagId,
const struct timespec &timestamp);
bool recordTxTime(uint portId, uint guid, uint ttagId,
const struct timeval &timestamp);
bool recordRxTime(uint portId, uint guid, uint ttagId,
const struct timeval &timestamp);
quint64 delay(uint portId, uint guid);
void clear(uint portId, uint guid = SignProtocol::kInvalidGuid);
static StreamTiming* instance();
public slots:
void start(uint portId);
void stop(uint portId);
private:
StreamTiming(QObject *parent=nullptr);
int processRecords();
int deleteStaleRecords();
quint32 makeKey(uint guid, uint ttagId) {
return guid << 8 | (ttagId & 0xFF);
}
// XXX: use only time intervals, not absolute time
quint64 timespecToNsecs(const struct timespec &interval) {
return interval.tv_nsec + interval.tv_sec*1e9;
}
struct TtagData {
struct timespec timeStamp; // nanosec resolution
uint portId;
};
struct Timing {
struct timespec sumDelays; // nanosec resolution
uint countDelays;
};
QSet<uint> activePortSet_;
// XXX: TxRxKey = guid (24 bit MSB) + ttagid (8 bit LSB)
// TODO: encode tx port in in packet and use as part of key
typedef quint32 TxRxKey;
QHash<TxRxKey, TtagData> txHash_;
QHash<TxRxKey, TtagData> rxHash_;
QMutex txHashLock_;
QMutex rxHashLock_;
typedef uint PortIdKey;
typedef uint GuidKey;
typedef QHash<GuidKey, Timing> PortTiming;
QHash<PortIdKey, PortTiming*> timing_;
QMutex timingLock_;
QTimer *timer_; // Periodic timer to process tx/rx records
QTimer *gcTimer_; // Garbage collection for stale tx records
};
#endif

View File

@ -21,6 +21,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#define _TIMESTAMP_H
#include "timespecops.h"
#include "timevalops.h"
#include <QtGlobal>
@ -69,6 +70,7 @@ static long inline udiffTimeStamp(const TimeStamp *start, const TimeStamp *end)
#endif
#elif defined(Q_OS_WIN32)
#include <windows.h>
static quint64 gTicksFreq;
typedef LARGE_INTEGER TimeStamp;
static void inline getTimeStamp(TimeStamp* stamp)

54
server/timevalops.h Normal file
View File

@ -0,0 +1,54 @@
/*
This file is part of "Ostinato"
These macros are copied from BSD sys/time.h
*/
#ifndef _TIME_VAL_OPS
#define _TIME_VAL_OPS
/* Operations on timeval - for platforms where some are not already defined*/
#if defined(Q_OS_WIN32)
#ifndef timerclear
#define timerclear(tvp) ((tvp)->tv_sec = (tvp)->tv_usec = 0)
#endif
#ifndef timerisset
#define timerisset(tvp) ((tvp)->tv_sec || (tvp)->tv_usec)
#endif
#ifndef timercmp
#define timercmp(tvp, uvp, cmp) \
(((tvp)->tv_sec == (uvp)->tv_sec) ? \
g((tvp)->tv_usec cmp (uvp)->tv_usec) : \
g((tvp)->tv_sec cmp (uvp)->tv_sec))
#endif
#ifndef timeradd
#define timeradd(tvp, uvp, vvp) \
do { \
(vvp)->tv_sec = (tvp)->tv_sec + (uvp)->tv_sec; \
(vvp)->tv_usec = (tvp)->tv_usec + (uvp)->tv_usec; \
if ((vvp)->tv_usec >= 1000000) { \
(vvp)->tv_sec++; \
(vvp)->tv_usec -= 1000000; \
} \
} while (0)
#endif
#ifndef timersub
#define timersub(tvp, uvp, vvp) \
do { \
(vvp)->tv_sec = (tvp)->tv_sec - (uvp)->tv_sec; \
(vvp)->tv_usec = (tvp)->tv_usec - (uvp)->tv_usec; \
if ((vvp)->tv_usec < 0) { \
(vvp)->tv_sec--; \
(vvp)->tv_usec += 1000000; \
} \
} while (0)
#endif
#endif /* Q_OS_WIN32 */
#endif