Compare commits
50 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
ca956c3c18 | ||
|
f98c8af594 | ||
|
accc47fa34 | ||
|
6f71844f7c | ||
|
bd2a4715dc | ||
|
4886739da6 | ||
|
80a4578847 | ||
|
390ffae2b4 | ||
|
d17ab7ab42 | ||
|
5abd6fb962 | ||
|
3c0bc067fa | ||
|
223e44a6e3 | ||
|
6108de9b4f | ||
|
e761bfa5c4 | ||
|
b9345463c4 | ||
|
dc6c4963a2 | ||
|
cded62246e | ||
|
10befe0a66 | ||
|
4394c7ffee | ||
|
ef1c166e7f | ||
|
d1d2a5c1b5 | ||
|
f56ce2e2ec | ||
|
c91475d416 | ||
|
b2ad3c5d08 | ||
|
896371b987 | ||
|
f5bb2e5d80 | ||
|
7cfccd686e | ||
|
2e502434db | ||
|
aa140cd32a | ||
|
3b499263ec | ||
|
c378600baf | ||
|
68734c44ca | ||
|
05a9dd5743 | ||
|
05335b31d5 | ||
|
f4c21e1ae4 | ||
|
757d3f1b24 | ||
|
7e30ef5541 | ||
|
f7b6b46a5d | ||
|
823f01557b | ||
|
682d0cc5c9 | ||
|
f1cfaa6e89 | ||
|
072dfcdc3b | ||
|
90a3731a90 | ||
|
5d4a19174e | ||
|
2104936b69 | ||
|
5dc1b851cc | ||
|
8e25669a0e | ||
|
ebccc44cdf | ||
|
f3a9b507b0 | ||
|
620004d46b |
@ -209,7 +209,7 @@ void DeviceGroupDialog::updateTotalDeviceCount()
|
||||
|
||||
void DeviceGroupDialog::updateIp4Gateway()
|
||||
{
|
||||
quint32 net = ip4Address->value() & (~0 << (32 - ip4PrefixLength->value()));
|
||||
quint32 net = ip4Address->value() & (~0UL << (32 - ip4PrefixLength->value()));
|
||||
ip4Gateway->setValue(net | 0x01);
|
||||
}
|
||||
|
||||
|
@ -51,6 +51,7 @@ enum {
|
||||
kAvgRxFrameRate,
|
||||
kAvgTxBitRate,
|
||||
kAvgRxBitRate,
|
||||
kAvgLatency,
|
||||
kMaxAggrStreamStats
|
||||
};
|
||||
static QStringList aggrStatTitles = QStringList()
|
||||
@ -61,7 +62,8 @@ static QStringList aggrStatTitles = QStringList()
|
||||
<< "Avg\nTx PktRate"
|
||||
<< "Avg\nRx PktRate"
|
||||
<< "Avg\nTx BitRate"
|
||||
<< "Avg\nRx BitRate";
|
||||
<< "Avg\nRx BitRate"
|
||||
<< "Avg\nLatency";
|
||||
|
||||
static const uint kAggrGuid = 0xffffffff;
|
||||
|
||||
@ -184,6 +186,12 @@ QVariant StreamStatsModel::data(const QModelIndex &index, int role) const
|
||||
(aggrGuidStats_.value(guid).rxBytes
|
||||
+ 24 * aggrGuidStats_.value(guid).rxPkts) * 8
|
||||
/ aggrGuidStats_.value(guid).txDuration);
|
||||
case kAvgLatency:
|
||||
return aggrGuidStats_.value(guid).latencyCount <= 0
|
||||
|| aggrGuidStats_.value(guid).latencySum <= 0 ? QString("-") :
|
||||
XLocale().toTimeIntervalString(
|
||||
aggrGuidStats_.value(guid).latencySum
|
||||
/ aggrGuidStats_.value(guid).latencyCount);
|
||||
default:
|
||||
break;
|
||||
};
|
||||
@ -258,6 +266,7 @@ void StreamStatsModel::appendStreamStatsList(
|
||||
ss.txPkts = s.tx_pkts();
|
||||
ss.rxBytes = s.rx_bytes();
|
||||
ss.txBytes = s.tx_bytes();
|
||||
ss.rxLatency = s.latency();
|
||||
|
||||
aggrPort.rxPkts += ss.rxPkts;
|
||||
aggrPort.txPkts += ss.txPkts;
|
||||
@ -271,6 +280,10 @@ void StreamStatsModel::appendStreamStatsList(
|
||||
aggrGuid.txBytes += ss.txBytes;
|
||||
if (s.tx_duration() > aggrGuid.txDuration)
|
||||
aggrGuid.txDuration = s.tx_duration(); // XXX: use largest or avg?
|
||||
if (ss.rxLatency) {
|
||||
aggrGuid.latencySum += ss.rxLatency;
|
||||
aggrGuid.latencyCount++;
|
||||
}
|
||||
|
||||
aggrAggr.rxPkts += ss.rxPkts;
|
||||
aggrAggr.txPkts += ss.txPkts;
|
||||
@ -279,6 +292,10 @@ void StreamStatsModel::appendStreamStatsList(
|
||||
aggrAggr.txBytes += ss.txBytes;
|
||||
if (aggrGuid.txDuration > aggrAggr.txDuration)
|
||||
aggrAggr.txDuration = aggrGuid.txDuration;
|
||||
if (ss.rxLatency) {
|
||||
aggrAggr.latencySum += ss.rxLatency;
|
||||
aggrAggr.latencyCount++;
|
||||
}
|
||||
|
||||
if (!portList_.contains(pgp))
|
||||
portList_.append(pgp);
|
||||
|
@ -57,6 +57,7 @@ private:
|
||||
quint64 txPkts;
|
||||
quint64 rxBytes;
|
||||
quint64 txBytes;
|
||||
quint64 rxLatency;
|
||||
};
|
||||
struct AggrGuidStats {
|
||||
quint64 rxPkts;
|
||||
@ -65,6 +66,8 @@ private:
|
||||
quint64 txBytes;
|
||||
qint64 pktLoss;
|
||||
double txDuration;
|
||||
quint64 latencySum;
|
||||
uint latencyCount;
|
||||
};
|
||||
QList<Guid> guidList_;
|
||||
QList<PortGroupPort> portList_;
|
||||
|
@ -100,6 +100,22 @@ public:
|
||||
|
||||
return QObject::tr("%L1 bps").arg(bps, 0, 'f', 4);
|
||||
}
|
||||
|
||||
QString toTimeIntervalString(qint64 nanosecs) const
|
||||
{
|
||||
QString text;
|
||||
|
||||
if (nanosecs >= 1e9)
|
||||
return QObject::tr("%L1 s").arg(nanosecs/1e9, 0, 'f', 3);
|
||||
|
||||
if (nanosecs >= 1e6)
|
||||
return QObject::tr("%L1 ms").arg(nanosecs/1e6, 0, 'f', 3);
|
||||
|
||||
if (nanosecs >= 1e3)
|
||||
return QObject::tr("%L1 us").arg(nanosecs/1e3, 0, 'f', 3);
|
||||
|
||||
return QObject::tr("%L1 ns").arg(nanosecs);
|
||||
}
|
||||
};
|
||||
|
||||
#endif
|
||||
|
29
common/debugdefs.h
Normal file
29
common/debugdefs.h
Normal file
@ -0,0 +1,29 @@
|
||||
/*
|
||||
Copyright (C) 2023 Srivats P.
|
||||
|
||||
This file is part of "Ostinato"
|
||||
|
||||
This is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
#ifndef _DEBUG_DEFS_H
|
||||
#define _DEBUG_DEFS_H
|
||||
|
||||
#if 0
|
||||
#define timingDebug(fmt, ...) qDebug("TIMING:" fmt, __VA_ARGS__)
|
||||
#else
|
||||
#define timingDebug(...)
|
||||
#endif
|
||||
|
||||
#endif
|
@ -113,6 +113,7 @@ SOURCES += \
|
||||
udp.cpp \
|
||||
textproto.cpp \
|
||||
hexdump.cpp \
|
||||
packet.cpp \
|
||||
payload.cpp \
|
||||
sample.cpp \
|
||||
sign.cpp \
|
||||
|
117
common/packet.cpp
Normal file
117
common/packet.cpp
Normal file
@ -0,0 +1,117 @@
|
||||
/*
|
||||
Copyright (C) 2023 Srivats P.
|
||||
|
||||
This file is part of "Ostinato"
|
||||
|
||||
This is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
#include "packet.h"
|
||||
|
||||
using namespace Packet;
|
||||
|
||||
quint16 Packet::l4ChecksumOffset(const uchar *pktData, int pktLen)
|
||||
{
|
||||
Parser parser(pktData, pktLen);
|
||||
quint16 offset = kEthTypeOffset;
|
||||
|
||||
// Skip VLANs, if any
|
||||
quint16 ethType = parser.field16(offset);
|
||||
if (!parser.ok()) return 0;
|
||||
|
||||
// TODO: support 802.3 frames
|
||||
if (ethType <= 1500)
|
||||
return 0;
|
||||
|
||||
while (kVlanEthTypes.contains(ethType)) {
|
||||
offset += kVlanTagSize;
|
||||
ethType = parser.field16(offset);
|
||||
if (!parser.ok()) return 0;
|
||||
}
|
||||
offset += kEthTypeSize;
|
||||
|
||||
// XXX: offset now points to Eth payload
|
||||
|
||||
// Skip MPLS tags, if any
|
||||
if (ethType == kMplsEthType) {
|
||||
while (1) {
|
||||
quint32 mplsTag = parser.field32(offset);
|
||||
if (!parser.ok()) return 0;
|
||||
offset += kMplsTagSize;
|
||||
if (mplsTag & 0x100) { // BOS bit
|
||||
quint32 nextWord = parser.field32(offset);
|
||||
if (!parser.ok()) return 0;
|
||||
if (nextWord == 0) { // PW Control Word
|
||||
offset += kMplsTagSize;
|
||||
ethType = 0;
|
||||
break;
|
||||
}
|
||||
quint8 firstPayloadNibble = nextWord >> 28;
|
||||
if (firstPayloadNibble == 0x4)
|
||||
ethType = kIp4EthType;
|
||||
else if (firstPayloadNibble == 0x6)
|
||||
ethType = kIp6EthType;
|
||||
else
|
||||
ethType = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
quint8 ipProto = 0;
|
||||
if (ethType == kIp4EthType) {
|
||||
ipProto = parser.field8(offset + kIp4ProtocolOffset);
|
||||
if (!parser.ok()) return 0;
|
||||
|
||||
quint8 ipHdrLen = parser.field8(offset) & 0x0F;
|
||||
if (!parser.ok()) return 0;
|
||||
offset += 4*ipHdrLen;
|
||||
} else if (ethType == kIp6EthType) {
|
||||
ipProto = parser.field8(offset + kIp6NextHeaderOffset);
|
||||
if (!parser.ok()) return 0;
|
||||
offset += kIp6HeaderSize;
|
||||
|
||||
// XXX: offset now points to IPv6 payload
|
||||
|
||||
// Skip IPv6 extension headers, if any
|
||||
while (kIp6ExtensionHeaders.contains(ipProto)) {
|
||||
ipProto = parser.field8(offset + kIp6ExtNextHeaderOffset);
|
||||
if (!parser.ok()) return 0;
|
||||
|
||||
quint16 extHdrLen = parser.field8(offset + kIp6ExtLengthOffset);
|
||||
offset += 8 + 8*extHdrLen;
|
||||
}
|
||||
} else {
|
||||
// Non-IP
|
||||
// TODO: support MPLS PW with Eth payload
|
||||
return 0;
|
||||
}
|
||||
|
||||
// XXX: offset now points to IP payload
|
||||
|
||||
if (ipProto == kIpProtoTcp) {
|
||||
parser.field16(offset + kTcpChecksumOffset);
|
||||
if (!parser.ok()) return 0;
|
||||
|
||||
return offset + kTcpChecksumOffset;
|
||||
} else if (ipProto == kIpProtoUdp) {
|
||||
parser.field16(offset + kUdpChecksumOffset);
|
||||
if (!parser.ok()) return 0;
|
||||
|
||||
return offset + kUdpChecksumOffset;
|
||||
}
|
||||
|
||||
// No L4
|
||||
return 0;
|
||||
}
|
112
common/packet.h
Normal file
112
common/packet.h
Normal file
@ -0,0 +1,112 @@
|
||||
/*
|
||||
Copyright (C) 2023 Srivats P.
|
||||
|
||||
This file is part of "Ostinato"
|
||||
|
||||
This is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
#ifndef _PACKET_H
|
||||
#define _PACKET_H
|
||||
|
||||
#include <QSet>
|
||||
#include <QtGlobal>
|
||||
|
||||
namespace Packet {
|
||||
|
||||
class Parser {
|
||||
public:
|
||||
Parser(const uchar *data, int length)
|
||||
: pktData_(data), pktLen_(length) {}
|
||||
quint8 field8(int offset) {
|
||||
if (offset >= pktLen_) {
|
||||
ok_ = false;
|
||||
return 0;
|
||||
}
|
||||
ok_ = true;
|
||||
return pktData_[offset];
|
||||
}
|
||||
quint16 field16(int offset) {
|
||||
if (offset + 1 >= pktLen_) {
|
||||
ok_ = false;
|
||||
return 0;
|
||||
}
|
||||
ok_ = true;
|
||||
return pktData_[offset] << 8
|
||||
| pktData_[offset+1];
|
||||
}
|
||||
quint32 field32(int offset) {
|
||||
if (offset + 3 >= pktLen_) {
|
||||
ok_ = false;
|
||||
return 0;
|
||||
}
|
||||
ok_ = true;
|
||||
return pktData_[offset] << 24
|
||||
| pktData_[offset+1] << 16
|
||||
| pktData_[offset+2] << 8
|
||||
| pktData_[offset+3];
|
||||
}
|
||||
bool ok() {
|
||||
return ok_;
|
||||
}
|
||||
private:
|
||||
const uchar *pktData_;
|
||||
int pktLen_;
|
||||
bool ok_{false};
|
||||
};
|
||||
|
||||
quint16 l4ChecksumOffset(const uchar *pktData, int pktLen);
|
||||
|
||||
//
|
||||
// Constants
|
||||
//
|
||||
// Ethernet
|
||||
const quint16 kEthTypeOffset = 12;
|
||||
const quint16 kEthTypeSize = 2;
|
||||
const quint16 kIp4EthType = 0x0800;
|
||||
const quint16 kIp6EthType = 0x86dd;
|
||||
const quint16 kMplsEthType = 0x8847;
|
||||
const QSet<quint16> kVlanEthTypes = {0x8100, 0x9100, 0x88a8};
|
||||
|
||||
// VLAN
|
||||
const quint16 kVlanTagSize = 4;
|
||||
|
||||
// MPLS
|
||||
const quint16 kMplsTagSize = 4;
|
||||
|
||||
// IPv4
|
||||
const quint16 kIp4ProtocolOffset = 9;
|
||||
|
||||
// IPv6
|
||||
const quint16 kIp6HeaderSize = 40;
|
||||
const quint16 kIp6NextHeaderOffset = 6;
|
||||
|
||||
// IPv6 Extension Header
|
||||
const quint16 kIp6ExtNextHeaderOffset = 0;
|
||||
const quint16 kIp6ExtLengthOffset = 1;
|
||||
|
||||
// IPv4/IPv6 Proto/NextHeader values
|
||||
const quint8 kIpProtoTcp = 6;
|
||||
const quint8 kIpProtoUdp = 17;
|
||||
|
||||
const QSet<quint8> kIp6ExtensionHeaders = {0, 60, 43, 44, 51, 50, 60, 135}; // FIXME: use names
|
||||
|
||||
// TCP
|
||||
const quint16 kTcpChecksumOffset = 16;
|
||||
|
||||
// UDP
|
||||
const quint16 kUdpChecksumOffset = 6;
|
||||
};
|
||||
|
||||
#endif
|
@ -292,6 +292,7 @@ message StreamStats {
|
||||
required StreamGuid stream_guid = 2;
|
||||
|
||||
optional double tx_duration = 3; // in seconds
|
||||
optional uint64 latency = 4; // in nanoseconds
|
||||
|
||||
optional uint64 rx_pkts = 11;
|
||||
optional uint64 rx_bytes = 12;
|
||||
|
@ -19,6 +19,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
|
||||
#include "sign.h"
|
||||
|
||||
#include "../common/streambase.h"
|
||||
|
||||
SignProtocol::SignProtocol(StreamBase *stream, AbstractProtocol *parent)
|
||||
: AbstractProtocol(stream, parent)
|
||||
{
|
||||
@ -76,7 +78,9 @@ AbstractProtocol::FieldFlags SignProtocol::fieldFlags(int index) const
|
||||
switch (index)
|
||||
{
|
||||
case sign_magic:
|
||||
case sign_tlv_tx_port:
|
||||
case sign_tlv_guid:
|
||||
case sign_tlv_ttag:
|
||||
case sign_tlv_end:
|
||||
break;
|
||||
|
||||
@ -116,6 +120,52 @@ QVariant SignProtocol::fieldData(int index, FieldAttrib attrib,
|
||||
}
|
||||
break;
|
||||
}
|
||||
case sign_tlv_ttag:
|
||||
{
|
||||
switch(attrib)
|
||||
{
|
||||
case FieldName:
|
||||
return QString("T-Tag");
|
||||
case FieldValue:
|
||||
return 0;
|
||||
case FieldTextValue:
|
||||
return QString("%1").arg(0);
|
||||
case FieldFrameValue:
|
||||
{
|
||||
QByteArray fv;
|
||||
fv.resize(2);
|
||||
fv[0] = 0;
|
||||
fv[1] = kTypeLenTtagPlaceholder;
|
||||
return fv;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case sign_tlv_tx_port:
|
||||
{
|
||||
switch(attrib)
|
||||
{
|
||||
case FieldName:
|
||||
return QString("TxPort");
|
||||
case FieldValue:
|
||||
return mpStream->portId();
|
||||
case FieldTextValue:
|
||||
return QString("%1").arg(mpStream->portId());
|
||||
case FieldFrameValue:
|
||||
{
|
||||
QByteArray fv;
|
||||
fv.resize(2);
|
||||
fv[0] = mpStream->portId() & 0xFF;
|
||||
fv[1] = kTypeLenTxPort;
|
||||
return fv;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case sign_tlv_guid:
|
||||
{
|
||||
quint32 guid = data.stream_guid() & 0xFFFFFF;
|
||||
@ -217,3 +267,27 @@ bool SignProtocol::packetGuid(const uchar *pkt, int pktLen, uint *guid)
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool SignProtocol::packetTtagId(const uchar *pkt, int pktLen, uint *ttagId, uint *guid)
|
||||
{
|
||||
bool ret = false;
|
||||
const uchar *p = pkt + pktLen - sizeof(kSignMagic);
|
||||
quint32 magic = qFromBigEndian<quint32>(p);
|
||||
if (magic != kSignMagic)
|
||||
return ret;
|
||||
|
||||
*guid = kInvalidGuid;
|
||||
p--;
|
||||
while (*p != kTypeLenEnd) {
|
||||
if (*p == kTypeLenTtag) {
|
||||
*ttagId = *(p - 1);
|
||||
ret = true;
|
||||
} else if (*p == kTypeLenGuid) {
|
||||
*guid = qFromBigEndian<quint32>(p - 3) >> 8;
|
||||
} else if (*p == kTypeLenTxPort) {
|
||||
*ttagId |= uint(*(p - 1)) << 8;
|
||||
}
|
||||
p -= 1 + (*p >> 5); // move to next TLV
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -23,6 +23,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
#include "abstractprotocol.h"
|
||||
#include "sign.pb.h"
|
||||
|
||||
#include <limits.h>
|
||||
|
||||
/*
|
||||
Sign Protocol is expected at the end of the frame (just before the Eth FCS)
|
||||
---+--------+-------+
|
||||
@ -42,6 +44,9 @@ TLVs are encoded as
|
||||
Defined TLVs
|
||||
Type = 0, Len = 0 (0x00): End of TLVs
|
||||
Type = 1, Len = 3 (0x61): Stream GUID
|
||||
Type = 2, Len = 1 (0x22): T-Tag Placeholder (0 value)
|
||||
Type = 3, Len = 1 (0x23): T-Tag with actual value
|
||||
Type = 4, Len = 1 (0x24): Tx Port Id
|
||||
*/
|
||||
|
||||
class SignProtocol : public AbstractProtocol
|
||||
@ -51,7 +56,9 @@ public:
|
||||
{
|
||||
// Frame Fields
|
||||
sign_tlv_end = 0,
|
||||
sign_tlv_tx_port,
|
||||
sign_tlv_guid,
|
||||
sign_tlv_ttag,
|
||||
sign_magic,
|
||||
|
||||
// Meta Fields
|
||||
@ -83,11 +90,19 @@ public:
|
||||
|
||||
static quint32 magic();
|
||||
static bool packetGuid(const uchar *pkt, int pktLen, uint *guid);
|
||||
static bool packetTtagId(const uchar *pkt, int pktLen, uint *ttagId, uint *guid);
|
||||
|
||||
// XXX: Any change in kTypeLenXXX or magic value should also be done in
|
||||
// TxThread/Ttag code as well where hardcoded values are used
|
||||
static const quint32 kMaxGuid = 0x00ffffff;
|
||||
static const quint32 kInvalidGuid = UINT_MAX;
|
||||
static const quint8 kTypeLenTtagPlaceholder = 0x22;
|
||||
static const quint8 kTypeLenTtag = 0x23;
|
||||
private:
|
||||
static const quint32 kSignMagic = 0x1d10c0da; // coda! (unicode - 0x1d10c)
|
||||
static const quint8 kTypeLenEnd = 0x00;
|
||||
static const quint8 kTypeLenGuid = 0x61;
|
||||
static const quint8 kTypeLenTxPort = 0x24;
|
||||
OstProto::Sign data;
|
||||
};
|
||||
|
||||
|
@ -75,9 +75,8 @@ public:
|
||||
quint32 id() const;
|
||||
bool setId(quint32 id);
|
||||
|
||||
quint32 portId() { return portId_;}
|
||||
#if 0 // FIXME(HI): needed?
|
||||
quint32 portId()
|
||||
{ return mCore->port_id();}
|
||||
bool setPortId(quint32 id)
|
||||
{ mCore->set_port_id(id); return true;}
|
||||
#endif
|
||||
|
@ -25,6 +25,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
#include "devicemanager.h"
|
||||
#include "interfaceinfo.h"
|
||||
#include "packetbuffer.h"
|
||||
#include "streamtiming.h"
|
||||
|
||||
#include <QString>
|
||||
#include <QIODevice>
|
||||
@ -54,6 +55,8 @@ AbstractPort::AbstractPort(int id, const char *device)
|
||||
maxStatsValue_ = ULLONG_MAX; // assume 64-bit stats
|
||||
memset((void*) &stats_, 0, sizeof(stats_));
|
||||
resetStats();
|
||||
|
||||
streamTiming_ = StreamTiming::instance();
|
||||
}
|
||||
|
||||
AbstractPort::~AbstractPort()
|
||||
@ -197,6 +200,13 @@ void AbstractPort::addNote(QString note)
|
||||
|
||||
bool AbstractPort::setTrackStreamStats(bool enable)
|
||||
{
|
||||
// XXX: This function is called by modify() in context of the RPC
|
||||
// thread (1 thread per connected client), but the StreamTiming
|
||||
// singleton resides in the main thread and its' start/stop methods
|
||||
// start/stop timers which cannot be done across Qt Threads. Hence
|
||||
// this slightly hacky way of invoking those methods
|
||||
QMetaObject::invokeMethod(streamTiming_, enable ? "start" : "stop",
|
||||
Qt::QueuedConnection, Q_ARG(uint, id()));
|
||||
data_.set_is_tracking_stream_stats(enable);
|
||||
|
||||
return true;
|
||||
@ -232,6 +242,9 @@ int AbstractPort::updatePacketList()
|
||||
|
||||
int AbstractPort::updatePacketListSequential()
|
||||
{
|
||||
quint64 duration = 0; // in nanosec
|
||||
quint64 totalPkts = 0;
|
||||
QList<uint> ttagMarkers;
|
||||
FrameValueAttrib packetListAttrib;
|
||||
long sec = 0;
|
||||
long nsec = 0;
|
||||
@ -259,6 +272,8 @@ int AbstractPort::updatePacketListSequential()
|
||||
quint64 npy1 = 0, npy2 = 0;
|
||||
quint64 loopDelay;
|
||||
ulong frameVariableCount = streamList_[i]->frameVariableCount();
|
||||
bool hasTtag = streamList_[i]->hasProtocol(
|
||||
OstProto::Protocol::kSignFieldNumber);
|
||||
|
||||
// We derive n, x, y such that
|
||||
// n * x + y = total number of packets to be sent
|
||||
@ -331,6 +346,7 @@ int AbstractPort::updatePacketListSequential()
|
||||
else if (n == 0)
|
||||
x = 0;
|
||||
|
||||
quint64 pktCount = n*x + y;
|
||||
for (uint j = 0; j < (x+y); j++)
|
||||
{
|
||||
|
||||
@ -383,6 +399,15 @@ int AbstractPort::updatePacketListSequential()
|
||||
}
|
||||
}
|
||||
|
||||
// Add a Ttag marker after every kTtagTimeInterval_ worth of pkts
|
||||
if (hasTtag) {
|
||||
uint ttagPktInterval = kTtagTimeInterval_*1e9/loopDelay;
|
||||
for (uint k = 0; k < pktCount; k += ttagPktInterval)
|
||||
ttagMarkers.append(totalPkts + k);
|
||||
}
|
||||
totalPkts += pktCount;
|
||||
duration += pktCount*loopDelay; // in nanosecs
|
||||
|
||||
switch(streamList_[i]->nextWhat())
|
||||
{
|
||||
case StreamBase::e_nw_stop:
|
||||
@ -420,6 +445,10 @@ int AbstractPort::updatePacketListSequential()
|
||||
|
||||
_out_of_memory:
|
||||
_stop_no_more_pkts:
|
||||
// See comments in updatePacketListInterleaved() for calc explanation
|
||||
setPacketListTtagMarkers(ttagMarkers, ttagMarkers.isEmpty() ? 0 :
|
||||
qMax(uint(kTtagTimeInterval_*1e9/(duration)),
|
||||
1U) * totalPkts);
|
||||
isSendQueueDirty_ = false;
|
||||
|
||||
qDebug("PacketListAttrib = %x",
|
||||
@ -432,7 +461,9 @@ int AbstractPort::updatePacketListInterleaved()
|
||||
FrameValueAttrib packetListAttrib;
|
||||
int numStreams = 0;
|
||||
quint64 minGap = ULLONG_MAX;
|
||||
quint64 duration = quint64(1e3);
|
||||
quint64 duration = quint64(1e3); // 1000ns (1us)
|
||||
|
||||
// TODO: convert the below to a QVector of struct aggregating all list vars
|
||||
QList<int> streamId;
|
||||
QList<quint64> ibg1, ibg2;
|
||||
QList<quint64> nb1, nb2;
|
||||
@ -442,6 +473,7 @@ int AbstractPort::updatePacketListInterleaved()
|
||||
QList<ulong> pktCount, burstCount;
|
||||
QList<ulong> burstSize;
|
||||
QList<bool> isVariable;
|
||||
QList<bool> hasTtag;
|
||||
QList<QByteArray> pktBuf;
|
||||
QList<ulong> pktLen;
|
||||
int activeStreamCount = 0;
|
||||
@ -465,6 +497,9 @@ int AbstractPort::updatePacketListInterleaved()
|
||||
// First sort the streams by ordinalValue
|
||||
std::sort(streamList_.begin(), streamList_.end(), StreamBase::StreamLessThan);
|
||||
|
||||
// FIXME: we are calculating n[bp][12], i[bp]g[12] for a duration of 1sec;
|
||||
// this was fine when the actual packet list duration was also 1sec. But
|
||||
// in the current code (post Turbo changes), the latter can be different!
|
||||
for (int i = 0; i < streamList_.size(); i++)
|
||||
{
|
||||
if (!streamList_[i]->isEnabled())
|
||||
@ -588,6 +623,8 @@ int AbstractPort::updatePacketListInterleaved()
|
||||
packetListAttrib += attrib;
|
||||
}
|
||||
|
||||
hasTtag.append(streamList_[i]->hasProtocol(
|
||||
OstProto::Protocol::kSignFieldNumber));
|
||||
numStreams++;
|
||||
} // for i
|
||||
|
||||
@ -610,12 +647,93 @@ int AbstractPort::updatePacketListInterleaved()
|
||||
|
||||
uchar* buf;
|
||||
int len;
|
||||
quint64 durSec = duration/ulong(1e9);
|
||||
quint64 durNsec = duration % ulong(1e9);
|
||||
const quint64 durSec = duration/ulong(1e9);
|
||||
const quint64 durNsec = duration % ulong(1e9);
|
||||
quint64 sec = 0;
|
||||
quint64 nsec = 0;
|
||||
quint64 lastPktTxSec = 0;
|
||||
quint64 lastPktTxNsec = 0;
|
||||
|
||||
// Count total packets we are going to add, so that we can create
|
||||
// an explicit packet set first
|
||||
// TODO: Find less expensive way to do this counting
|
||||
// FIXME: Turbo still thinks it has to create implicit packet set for
|
||||
// interleaved mode - Turbo code should be changed once this is validated
|
||||
quint64 totalPkts = 0;
|
||||
QVector<ulong> ttagSchedSec(numStreams, 0);
|
||||
QVector<ulong> ttagSchedNsec(numStreams, 0);
|
||||
QList<uint> ttagMarkers;
|
||||
|
||||
do
|
||||
{
|
||||
for (int i = 0; i < numStreams; i++)
|
||||
{
|
||||
// If a packet is not scheduled yet, look at the next stream
|
||||
if ((schedSec.at(i) > sec) || (schedNsec.at(i) > nsec))
|
||||
continue;
|
||||
|
||||
// Ttag marker every TtagTimeInterval for each stream
|
||||
if (hasTtag.at(i)
|
||||
&& ((schedSec.at(i) > ttagSchedSec.at(i))
|
||||
|| ((schedSec.at(i) == ttagSchedSec.at(i))
|
||||
&& (schedNsec.at(i) >= ttagSchedNsec.at(i))))) {
|
||||
ttagMarkers.append(totalPkts);
|
||||
ttagSchedSec[i] = schedSec.at(i) + kTtagTimeInterval_;
|
||||
ttagSchedNsec[i] = schedNsec.at(i);
|
||||
}
|
||||
|
||||
for (uint j = 0; j < burstSize[i]; j++)
|
||||
{
|
||||
pktCount[i]++;
|
||||
schedNsec[i] += (pktCount.at(i) < np1.at(i)) ?
|
||||
ipg1.at(i) : ipg2.at(i);
|
||||
while (schedNsec.at(i) >= 1e9)
|
||||
{
|
||||
schedSec[i]++;
|
||||
schedNsec[i] -= long(1e9);
|
||||
}
|
||||
lastPktTxSec = sec;
|
||||
lastPktTxNsec = nsec;
|
||||
totalPkts++;
|
||||
}
|
||||
|
||||
burstCount[i]++;
|
||||
schedNsec[i] += (burstCount.at(i) < nb1.at(i)) ?
|
||||
ibg1.at(i) : ibg2.at(i);
|
||||
while (schedNsec.at(i) >= 1e9)
|
||||
{
|
||||
schedSec[i]++;
|
||||
schedNsec[i] -= long(1e9);
|
||||
}
|
||||
}
|
||||
|
||||
nsec += minGap;
|
||||
while (nsec >= 1e9)
|
||||
{
|
||||
sec++;
|
||||
nsec -= long(1e9);
|
||||
}
|
||||
} while ((sec < durSec) || ((sec == durSec) && (nsec < durNsec)));
|
||||
|
||||
// XXX: For interleaved mode, we ALWAYS have a single packet set with
|
||||
// one repeat and 0n set loop delay
|
||||
loopNextPacketSet(totalPkts, 1, 0, 0);
|
||||
qDebug("Interleaved single PacketSet of size %lld, duration %llu.%09llu "
|
||||
"repeat 1 and delay 0",
|
||||
totalPkts, durSec, durNsec);
|
||||
|
||||
// Reset working sched/counts before building the packet list
|
||||
sec = nsec = 0;
|
||||
lastPktTxSec = lastPktTxNsec = 0;
|
||||
for (int i = 0; i < numStreams; i++)
|
||||
{
|
||||
schedSec[i] = 0;
|
||||
schedNsec[i] = 0;
|
||||
pktCount[i] = 0;
|
||||
burstCount[i] = 0;
|
||||
}
|
||||
|
||||
// Now build the packet list
|
||||
do
|
||||
{
|
||||
for (int i = 0; i < numStreams; i++)
|
||||
@ -643,7 +761,7 @@ int AbstractPort::updatePacketListInterleaved()
|
||||
if (len <= 0)
|
||||
continue;
|
||||
|
||||
qDebug("q(%d) sec = %llu nsec = %llu", i, sec, nsec);
|
||||
qDebug("q(%d) TS = %llu.%09llu", i, sec, nsec);
|
||||
if (!appendToPacketList(sec, nsec, buf, len)) {
|
||||
clearPacketList(); // don't leave it half baked/inconsitent
|
||||
packetListAttrib.errorFlags |= FrameValueAttrib::OutOfMemoryError;
|
||||
@ -678,7 +796,7 @@ int AbstractPort::updatePacketListInterleaved()
|
||||
sec++;
|
||||
nsec -= long(1e9);
|
||||
}
|
||||
} while ((sec < durSec) || (nsec < durNsec));
|
||||
} while ((sec < durSec) || ((sec == durSec) && (nsec < durNsec)));
|
||||
|
||||
{
|
||||
qint64 delaySec = durSec - lastPktTxSec;
|
||||
@ -688,10 +806,22 @@ int AbstractPort::updatePacketListInterleaved()
|
||||
delayNsec += long(1e9);
|
||||
delaySec--;
|
||||
}
|
||||
qDebug("loop Delay = %lld/%lld", delaySec, delayNsec);
|
||||
qDebug("loop Delay = %lld.%09lld", delaySec, delayNsec);
|
||||
setPacketListLoopMode(true, delaySec, delayNsec);
|
||||
}
|
||||
|
||||
// XXX: TTag repeat interval calculation:
|
||||
// CASE 1. pktListDuration < kTtagTimeInterval:
|
||||
// e.g. if pktListDuration is 1sec and TtagTimerInterval is 5s, we
|
||||
// skip 5 times total packets before we repeat the markers
|
||||
// CASE 2. pktListDuration > kTtagTimeInterval:
|
||||
// e.g. if pktListDuration is 7sec and TtagTimerInterval is 5s, we
|
||||
// skip repeat markers every pktList iteration
|
||||
setPacketListTtagMarkers(ttagMarkers, ttagMarkers.isEmpty() ? 0 :
|
||||
qMax(uint(kTtagTimeInterval_*1e9
|
||||
/(durSec*1e9+durNsec)),
|
||||
1U) * totalPkts);
|
||||
|
||||
_out_of_memory:
|
||||
isSendQueueDirty_ = false;
|
||||
|
||||
@ -734,6 +864,16 @@ void AbstractPort::stats(PortStats *stats)
|
||||
stats_.rxFrameErrors + (maxStatsValue_ - epochStats_.rxFrameErrors);
|
||||
}
|
||||
|
||||
quint64 AbstractPort::streamTimingDelay(uint guid)
|
||||
{
|
||||
return streamTiming_->delay(id(), guid);
|
||||
}
|
||||
|
||||
void AbstractPort::clearStreamTiming(uint guid)
|
||||
{
|
||||
streamTiming_->clear(id(), guid);
|
||||
}
|
||||
|
||||
void AbstractPort::streamStats(uint guid, OstProto::StreamStatsList *stats)
|
||||
{
|
||||
// In case stats are being maintained elsewhere
|
||||
@ -748,6 +888,7 @@ void AbstractPort::streamStats(uint guid, OstProto::StreamStatsList *stats)
|
||||
s->mutable_port_id()->set_id(id());
|
||||
|
||||
s->set_tx_duration(lastTransmitDuration());
|
||||
s->set_latency(streamTimingDelay(guid));
|
||||
|
||||
s->set_tx_pkts(sst.tx_pkts);
|
||||
s->set_tx_bytes(sst.tx_bytes);
|
||||
@ -775,6 +916,7 @@ void AbstractPort::streamStatsAll(OstProto::StreamStatsList *stats)
|
||||
s->mutable_port_id()->set_id(id());
|
||||
|
||||
s->set_tx_duration(txDur);
|
||||
s->set_latency(streamTimingDelay(i.key()));
|
||||
|
||||
s->set_tx_pkts(sst.tx_pkts);
|
||||
s->set_tx_bytes(sst.tx_bytes);
|
||||
@ -786,11 +928,13 @@ void AbstractPort::streamStatsAll(OstProto::StreamStatsList *stats)
|
||||
void AbstractPort::resetStreamStats(uint guid)
|
||||
{
|
||||
streamStats_.remove(guid);
|
||||
clearStreamTiming(guid);
|
||||
}
|
||||
|
||||
void AbstractPort::resetStreamStatsAll()
|
||||
{
|
||||
streamStats_.clear();
|
||||
clearStreamTiming();
|
||||
}
|
||||
|
||||
void AbstractPort::clearDeviceNeighbors()
|
||||
|
@ -20,18 +20,20 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
#ifndef _SERVER_ABSTRACT_PORT_H
|
||||
#define _SERVER_ABSTRACT_PORT_H
|
||||
|
||||
#include "../common/protocol.pb.h"
|
||||
#include "streamstats.h"
|
||||
|
||||
#include <QList>
|
||||
#include <QtGlobal>
|
||||
|
||||
#include "../common/protocol.pb.h"
|
||||
#include <limits.h>
|
||||
|
||||
class DeviceManager;
|
||||
struct InterfaceInfo;
|
||||
class StreamBase;
|
||||
class PacketBuffer;
|
||||
class QIODevice;
|
||||
class StreamBase;
|
||||
class StreamTiming;
|
||||
|
||||
// TODO: send notification back to client(s)
|
||||
#define Xnotify qWarning
|
||||
@ -105,6 +107,8 @@ public:
|
||||
int length) = 0;
|
||||
virtual void setPacketListLoopMode(bool loop,
|
||||
quint64 secDelay, quint64 nsecDelay) = 0;
|
||||
virtual void setPacketListTtagMarkers(QList<uint> markers,
|
||||
uint repeatInterval) = 0;
|
||||
int updatePacketList();
|
||||
|
||||
virtual void startTransmit() = 0;
|
||||
@ -120,6 +124,9 @@ public:
|
||||
void stats(PortStats *stats);
|
||||
void resetStats() { epochStats_ = stats_; }
|
||||
|
||||
quint64 streamTimingDelay(uint guid);
|
||||
void clearStreamTiming(uint guid = UINT_MAX);
|
||||
|
||||
// FIXME: combine single and All calls?
|
||||
void streamStats(uint guid, OstProto::StreamStatsList *stats);
|
||||
void streamStatsAll(OstProto::StreamStatsList *stats);
|
||||
@ -158,6 +165,8 @@ protected:
|
||||
StreamStats streamStats_;
|
||||
//! \todo Need lock for stats access/update
|
||||
|
||||
const uint kTtagTimeInterval_{5}; // in seconds
|
||||
|
||||
struct InterfaceInfo *interfaceInfo_;
|
||||
DeviceManager *deviceManager_;
|
||||
|
||||
@ -177,6 +186,8 @@ private:
|
||||
QList<StreamBase*> streamList_;
|
||||
|
||||
struct PortStats epochStats_;
|
||||
|
||||
StreamTiming *streamTiming_{nullptr};
|
||||
};
|
||||
|
||||
#endif
|
||||
|
@ -41,7 +41,8 @@ LIBS += -lm
|
||||
LIBS += -lprotobuf
|
||||
HEADERS += drone.h \
|
||||
pcaptransmitter.h \
|
||||
myservice.h
|
||||
myservice.h \
|
||||
streamtiming.h
|
||||
SOURCES += \
|
||||
devicemanager.cpp \
|
||||
device.cpp \
|
||||
@ -56,12 +57,14 @@ SOURCES += \
|
||||
pcaprxstats.cpp \
|
||||
pcaptxstats.cpp \
|
||||
pcaptxthread.cpp \
|
||||
pcaptxttagstats.cpp \
|
||||
bsdhostdevice.cpp \
|
||||
bsdport.cpp \
|
||||
linuxhostdevice.cpp \
|
||||
linuxport.cpp \
|
||||
linuxutils.cpp \
|
||||
params.cpp \
|
||||
streamtiming.cpp \
|
||||
turbo.cpp \
|
||||
winhostdevice.cpp \
|
||||
winpcapport.cpp
|
||||
|
@ -20,8 +20,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
#ifndef _PACKET_SEQUENCE_H
|
||||
#define _PACKET_SEQUENCE_H
|
||||
|
||||
#include "pcapextra.h"
|
||||
#include "../common/packet.h"
|
||||
#include "../common/sign.h"
|
||||
#include "pcapextra.h"
|
||||
#include "streamstats.h"
|
||||
|
||||
class PacketSequence
|
||||
@ -37,6 +38,7 @@ public:
|
||||
repeatCount_ = 1;
|
||||
repeatSize_ = 1;
|
||||
usecDelay_ = 0;
|
||||
ttagL4CksumOffset_ = 0;
|
||||
}
|
||||
~PacketSequence() {
|
||||
pcap_sendqueue_destroy(sendQueue_);
|
||||
@ -69,6 +71,16 @@ public:
|
||||
streamStatsMeta_[guid].tx_bytes += pktHeader->caplen;
|
||||
}
|
||||
}
|
||||
// TODO: A PacketSequence belongs to a unique stream only in case of
|
||||
// sequential streams; for interleaved streams, we have only a single
|
||||
// packet set (with one or more sequences) containing packets from
|
||||
// multiple streams. To support this, we need to make l4cksum a packet
|
||||
// property not a sequence property
|
||||
// Till the above is fixed, Ttag packets will have wrong checksum
|
||||
#if 0
|
||||
if (trackGuidStats_ && (packets_ == 1)) // first packet of seq
|
||||
ttagL4CksumOffset_ = Packet::l4ChecksumOffset(pktData, pktHeader->caplen);
|
||||
#endif
|
||||
return ret;
|
||||
}
|
||||
pcap_send_queue *sendQueue_;
|
||||
@ -79,6 +91,7 @@ public:
|
||||
int repeatCount_;
|
||||
int repeatSize_;
|
||||
long usecDelay_;
|
||||
quint16 ttagL4CksumOffset_; // For ttag packets
|
||||
StreamStats streamStatsMeta_;
|
||||
|
||||
private:
|
||||
|
@ -34,6 +34,7 @@ PcapPort::PcapPort(int id, const char *device)
|
||||
transmitter_ = new PcapTransmitter(device, streamStats_);
|
||||
capturer_ = new PortCapturer(device);
|
||||
emulXcvr_ = new EmulationTransceiver(device, deviceManager_);
|
||||
txTtagStatsPoller_ = new PcapTxTtagStats(device, id);
|
||||
rxStatsPoller_ = new PcapRxStats(device, streamStats_, id);
|
||||
|
||||
if (!monitorRx_->handle() || !monitorTx_->handle())
|
||||
@ -85,6 +86,9 @@ PcapPort::~PcapPort()
|
||||
if (monitorTx_)
|
||||
monitorTx_->stop();
|
||||
|
||||
txTtagStatsPoller_->stop();
|
||||
delete txTtagStatsPoller_;
|
||||
|
||||
rxStatsPoller_->stop();
|
||||
delete rxStatsPoller_;
|
||||
|
||||
@ -145,8 +149,10 @@ bool PcapPort::setRateAccuracy(AbstractPort::Accuracy accuracy)
|
||||
void PcapPort::updateStreamStats()
|
||||
{
|
||||
// XXX: PcapTxThread already does this at the end of transmit; we
|
||||
// just dump rx stats poller debug stats here
|
||||
// just dump tx/rx stats poller debug stats here
|
||||
|
||||
qDebug("port %d txTtagStatsPoller: %s",
|
||||
id(), qUtf8Printable(txTtagStatsPoller_->debugStats()));
|
||||
qDebug("port %d rxStatsPoller: %s",
|
||||
id(), qUtf8Printable(rxStatsPoller_->debugStats()));
|
||||
}
|
||||
@ -170,6 +176,8 @@ bool PcapPort::startStreamStatsTracking()
|
||||
{
|
||||
if (!transmitter_->setStreamStatsTracking(true))
|
||||
goto _tx_fail;
|
||||
if (!txTtagStatsPoller_->start())
|
||||
goto _tx_ttag_fail;
|
||||
if (!rxStatsPoller_->start())
|
||||
goto _rx_fail;
|
||||
/*
|
||||
@ -183,6 +191,8 @@ bool PcapPort::startStreamStatsTracking()
|
||||
return true;
|
||||
|
||||
_rx_fail:
|
||||
txTtagStatsPoller_->stop();
|
||||
_tx_ttag_fail:
|
||||
transmitter_->setStreamStatsTracking(false);
|
||||
_tx_fail:
|
||||
qWarning("failed to start stream stats tracking");
|
||||
@ -191,17 +201,22 @@ _tx_fail:
|
||||
|
||||
bool PcapPort::stopStreamStatsTracking()
|
||||
{
|
||||
if (!transmitter_->setStreamStatsTracking(false))
|
||||
goto _tx_fail;
|
||||
if (!rxStatsPoller_->stop())
|
||||
goto _rx_fail;
|
||||
return true;
|
||||
bool ret = true;
|
||||
|
||||
_rx_fail:
|
||||
transmitter_->setStreamStatsTracking(true);
|
||||
_tx_fail:
|
||||
qWarning("failed to stop stream stats tracking");
|
||||
return false;
|
||||
if (!transmitter_->setStreamStatsTracking(false)) {
|
||||
qWarning("failed to stop Transmitter stream stats tracking");
|
||||
ret = false;
|
||||
}
|
||||
if (!txTtagStatsPoller_->stop()) {
|
||||
qWarning("failed to stop TxTtag stream stats thread");
|
||||
ret = false;
|
||||
}
|
||||
if (!rxStatsPoller_->stop()) {
|
||||
qWarning("failed to stop Rx stream stats thread");
|
||||
ret = false;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -489,7 +504,7 @@ void PcapPort::PortCapturer::stop()
|
||||
{
|
||||
if (state_ == kRunning) {
|
||||
stop_ = true;
|
||||
PcapSession::stop(handle_);
|
||||
PcapSession::stop();
|
||||
while (state_ == kRunning)
|
||||
QThread::msleep(10);
|
||||
}
|
||||
@ -713,7 +728,7 @@ void PcapPort::EmulationTransceiver::stop()
|
||||
{
|
||||
if (state_ == kRunning) {
|
||||
stop_ = true;
|
||||
PcapSession::stop(handle_);
|
||||
PcapSession::stop();
|
||||
while (state_ == kRunning)
|
||||
QThread::msleep(10);
|
||||
}
|
||||
|
@ -27,6 +27,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
#include "abstractport.h"
|
||||
#include "pcapextra.h"
|
||||
#include "pcaprxstats.h"
|
||||
#include "pcaptxttagstats.h"
|
||||
#include "pcapsession.h"
|
||||
#include "pcaptransmitter.h"
|
||||
|
||||
@ -47,6 +48,7 @@ public:
|
||||
virtual void clearPacketList() {
|
||||
transmitter_->clearPacketList();
|
||||
setPacketListLoopMode(false, 0, 0);
|
||||
setPacketListTtagMarkers(QList<uint>(), 0);
|
||||
}
|
||||
virtual void loopNextPacketSet(qint64 size, qint64 repeats,
|
||||
long repeatDelaySec, long repeatDelayNsec) {
|
||||
@ -61,6 +63,11 @@ public:
|
||||
{
|
||||
transmitter_->setPacketListLoopMode(loop, secDelay, nsecDelay);
|
||||
}
|
||||
virtual void setPacketListTtagMarkers(QList<uint> markers,
|
||||
uint repeatInterval)
|
||||
{
|
||||
transmitter_->setPacketListTtagMarkers(markers, repeatInterval);
|
||||
}
|
||||
|
||||
virtual void startTransmit() {
|
||||
Q_ASSERT(!isDirty());
|
||||
@ -134,7 +141,6 @@ protected:
|
||||
QString device_;
|
||||
volatile bool stop_;
|
||||
QTemporaryFile capFile_;
|
||||
pcap_t *handle_;
|
||||
pcap_dumper_t *dumpHandle_;
|
||||
volatile State state_;
|
||||
};
|
||||
@ -161,7 +167,6 @@ protected:
|
||||
QString device_;
|
||||
DeviceManager *deviceManager_;
|
||||
volatile bool stop_;
|
||||
pcap_t *handle_;
|
||||
volatile State state_;
|
||||
};
|
||||
|
||||
@ -177,6 +182,7 @@ private:
|
||||
PcapTransmitter *transmitter_;
|
||||
PortCapturer *capturer_;
|
||||
EmulationTransceiver *emulXcvr_;
|
||||
PcapTxTtagStats *txTtagStatsPoller_;
|
||||
PcapRxStats *rxStatsPoller_;
|
||||
|
||||
static pcap_if_t *deviceList_;
|
||||
|
@ -20,13 +20,16 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
#include "pcaprxstats.h"
|
||||
|
||||
#include "pcapextra.h"
|
||||
#include "../common/debugdefs.h"
|
||||
#include "../common/sign.h"
|
||||
#include "streamtiming.h"
|
||||
|
||||
#define Xnotify qWarning // FIXME
|
||||
|
||||
PcapRxStats::PcapRxStats(const char *device, StreamStats &portStreamStats, int id)
|
||||
: streamStats_(portStreamStats)
|
||||
{
|
||||
setObjectName(QString("Rx$:%1").arg(device));
|
||||
device_ = QString::fromLatin1(device);
|
||||
stop_ = false;
|
||||
state_ = kNotStarted;
|
||||
@ -34,7 +37,9 @@ PcapRxStats::PcapRxStats(const char *device, StreamStats &portStreamStats, int i
|
||||
|
||||
handle_ = NULL;
|
||||
|
||||
id_ = id;
|
||||
portId_ = id;
|
||||
|
||||
timing_ = StreamTiming::instance();
|
||||
}
|
||||
|
||||
pcap_t* PcapRxStats::handle()
|
||||
@ -106,7 +111,7 @@ void PcapRxStats::run()
|
||||
}
|
||||
|
||||
_skip_filter:
|
||||
memset(&lastPcapStats_, 0, sizeof(lastPcapStats_));
|
||||
clearDebugStats();
|
||||
PcapSession::preRun();
|
||||
state_ = kRunning;
|
||||
while (1) {
|
||||
@ -117,8 +122,29 @@ _skip_filter:
|
||||
ret = pcap_next_ex(handle_, &hdr, &data);
|
||||
switch (ret) {
|
||||
case 1: {
|
||||
uint guid;
|
||||
if (SignProtocol::packetGuid(data, hdr->caplen, &guid)) {
|
||||
uint ttagId, guid;
|
||||
#ifdef Q_OS_WIN32
|
||||
// Npcap (Windows) doesn't support direction, so packets
|
||||
// Tx by PcapTxThread are received back by us here - use
|
||||
// TxPort to filter out. TxPort is returned as byte 1 of
|
||||
// ttagId (byte 0 is ttagId).
|
||||
// If TxPort is us ==> Tx Packet, so skip
|
||||
// FIXME: remove once npcap supports pcap direction
|
||||
if (SignProtocol::packetTtagId(data, hdr->caplen, &ttagId, &guid)
|
||||
&& (ttagId >> 8 != uint(portId_))) {
|
||||
ttagId &= 0xFF;
|
||||
timing_->recordRxTime(portId_, guid, ttagId, hdr->ts);
|
||||
timingDebug("[%d RX] %ld:%ld ttag %u guid %u", portId_,
|
||||
hdr->ts.tv_sec, long(hdr->ts.tv_usec), ttagId, guid);
|
||||
}
|
||||
#else
|
||||
if (SignProtocol::packetTtagId(data, hdr->caplen, &ttagId, &guid)) {
|
||||
timing_->recordRxTime(portId_, guid, ttagId, hdr->ts);
|
||||
timingDebug("[%d RX] %ld:%ld ttag %u guid %u", portId_,
|
||||
hdr->ts.tv_sec, long(hdr->ts.tv_usec), ttagId, guid);
|
||||
}
|
||||
#endif
|
||||
if (guid != SignProtocol::kInvalidGuid) {
|
||||
streamStats_[guid].rx_pkts++;
|
||||
streamStats_[guid].rx_bytes += hdr->caplen;
|
||||
}
|
||||
@ -174,7 +200,7 @@ bool PcapRxStats::stop()
|
||||
{
|
||||
if (state_ == kRunning) {
|
||||
stop_ = true;
|
||||
PcapSession::stop(handle_);
|
||||
PcapSession::stop();
|
||||
while (state_ == kRunning)
|
||||
QThread::msleep(10);
|
||||
}
|
||||
@ -193,57 +219,3 @@ bool PcapRxStats::isDirectional()
|
||||
{
|
||||
return isDirectional_;
|
||||
}
|
||||
|
||||
// XXX: Implemented as reset on read
|
||||
QString PcapRxStats::debugStats()
|
||||
{
|
||||
QString dbgStats;
|
||||
|
||||
#ifdef Q_OS_WIN32
|
||||
static_assert(sizeof(struct pcap_stat) == 6*sizeof(uint),
|
||||
"pcap_stat has less or more than 6 values");
|
||||
int size;
|
||||
struct pcap_stat incPcapStats;
|
||||
struct pcap_stat *pcapStats = pcap_stats_ex(handle_, &size);
|
||||
if (pcapStats && (uint(size) >= 6*sizeof(uint))) {
|
||||
incPcapStats.ps_recv = pcapStats->ps_recv - lastPcapStats_.ps_recv;
|
||||
incPcapStats.ps_drop = pcapStats->ps_drop - lastPcapStats_.ps_drop;
|
||||
incPcapStats.ps_ifdrop = pcapStats->ps_ifdrop - lastPcapStats_.ps_ifdrop;
|
||||
incPcapStats.ps_capt = pcapStats->ps_capt - lastPcapStats_.ps_capt;
|
||||
incPcapStats.ps_sent = pcapStats->ps_sent - lastPcapStats_.ps_sent;
|
||||
incPcapStats.ps_netdrop = pcapStats->ps_netdrop - lastPcapStats_.ps_netdrop;
|
||||
dbgStats = QString("recv: %1 drop: %2 ifdrop: %3 "
|
||||
"capt: %4 sent: %5 netdrop: %6")
|
||||
.arg(incPcapStats.ps_recv)
|
||||
.arg(incPcapStats.ps_drop)
|
||||
.arg(incPcapStats.ps_ifdrop)
|
||||
.arg(incPcapStats.ps_capt)
|
||||
.arg(incPcapStats.ps_sent)
|
||||
.arg(incPcapStats.ps_netdrop);
|
||||
lastPcapStats_ = *pcapStats;
|
||||
} else {
|
||||
dbgStats = QString("error reading pcap stats: %1")
|
||||
.arg(pcap_geterr(handle_));
|
||||
}
|
||||
#else
|
||||
struct pcap_stat pcapStats;
|
||||
struct pcap_stat incPcapStats;
|
||||
|
||||
int ret = pcap_stats(handle_, &pcapStats);
|
||||
if (ret == 0) {
|
||||
incPcapStats.ps_recv = pcapStats.ps_recv - lastPcapStats_.ps_recv;
|
||||
incPcapStats.ps_drop = pcapStats.ps_drop - lastPcapStats_.ps_drop;
|
||||
incPcapStats.ps_ifdrop = pcapStats.ps_ifdrop - lastPcapStats_.ps_ifdrop;
|
||||
dbgStats = QString("recv: %1 drop: %2 ifdrop: %3")
|
||||
.arg(incPcapStats.ps_recv)
|
||||
.arg(incPcapStats.ps_drop)
|
||||
.arg(incPcapStats.ps_ifdrop);
|
||||
lastPcapStats_ = pcapStats;
|
||||
} else {
|
||||
dbgStats = QString("error reading pcap stats: %1")
|
||||
.arg(pcap_geterr(handle_));
|
||||
}
|
||||
#endif
|
||||
|
||||
return dbgStats;
|
||||
}
|
||||
|
@ -24,6 +24,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
|
||||
#include "pcapsession.h"
|
||||
|
||||
class StreamTiming;
|
||||
|
||||
class PcapRxStats: public PcapSession
|
||||
{
|
||||
public:
|
||||
@ -35,8 +37,6 @@ public:
|
||||
bool isRunning();
|
||||
bool isDirectional();
|
||||
|
||||
QString debugStats();
|
||||
|
||||
private:
|
||||
enum State {
|
||||
kNotStarted,
|
||||
@ -47,12 +47,12 @@ private:
|
||||
QString device_;
|
||||
StreamStats &streamStats_;
|
||||
volatile bool stop_;
|
||||
pcap_t *handle_;
|
||||
volatile State state_;
|
||||
bool isDirectional_;
|
||||
|
||||
int id_;
|
||||
struct pcap_stat lastPcapStats_;
|
||||
int portId_;
|
||||
|
||||
StreamTiming *timing_{nullptr};
|
||||
};
|
||||
|
||||
#endif
|
||||
|
@ -19,6 +19,69 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
|
||||
#include "pcapsession.h"
|
||||
|
||||
// XXX: Implemented as reset on read
|
||||
QString PcapSession::debugStats()
|
||||
{
|
||||
QString dbgStats;
|
||||
|
||||
if (!handle_)
|
||||
return QString();
|
||||
|
||||
#ifdef Q_OS_WIN32
|
||||
static_assert(sizeof(struct pcap_stat) == 6*sizeof(uint),
|
||||
"pcap_stat has less or more than 6 values");
|
||||
int size;
|
||||
struct pcap_stat incPcapStats;
|
||||
struct pcap_stat *pcapStats = pcap_stats_ex(handle_, &size);
|
||||
if (pcapStats && (uint(size) >= 6*sizeof(uint))) {
|
||||
incPcapStats.ps_recv = pcapStats->ps_recv - lastPcapStats_.ps_recv;
|
||||
incPcapStats.ps_drop = pcapStats->ps_drop - lastPcapStats_.ps_drop;
|
||||
incPcapStats.ps_ifdrop = pcapStats->ps_ifdrop - lastPcapStats_.ps_ifdrop;
|
||||
incPcapStats.ps_capt = pcapStats->ps_capt - lastPcapStats_.ps_capt;
|
||||
incPcapStats.ps_sent = pcapStats->ps_sent - lastPcapStats_.ps_sent;
|
||||
incPcapStats.ps_netdrop = pcapStats->ps_netdrop - lastPcapStats_.ps_netdrop;
|
||||
dbgStats = QString("recv: %1 drop: %2 ifdrop: %3 "
|
||||
"capt: %4 sent: %5 netdrop: %6")
|
||||
.arg(incPcapStats.ps_recv)
|
||||
.arg(incPcapStats.ps_drop)
|
||||
.arg(incPcapStats.ps_ifdrop)
|
||||
.arg(incPcapStats.ps_capt)
|
||||
.arg(incPcapStats.ps_sent)
|
||||
.arg(incPcapStats.ps_netdrop);
|
||||
lastPcapStats_ = *pcapStats;
|
||||
} else {
|
||||
dbgStats = QString("error reading pcap stats: %1")
|
||||
.arg(pcap_geterr(handle_));
|
||||
}
|
||||
#else
|
||||
struct pcap_stat pcapStats;
|
||||
struct pcap_stat incPcapStats;
|
||||
|
||||
int ret = pcap_stats(handle_, &pcapStats);
|
||||
if (ret == 0) {
|
||||
incPcapStats.ps_recv = pcapStats.ps_recv - lastPcapStats_.ps_recv;
|
||||
incPcapStats.ps_drop = pcapStats.ps_drop - lastPcapStats_.ps_drop;
|
||||
incPcapStats.ps_ifdrop = pcapStats.ps_ifdrop - lastPcapStats_.ps_ifdrop;
|
||||
dbgStats = QString("recv: %1 drop: %2 ifdrop: %3")
|
||||
.arg(incPcapStats.ps_recv)
|
||||
.arg(incPcapStats.ps_drop)
|
||||
.arg(incPcapStats.ps_ifdrop);
|
||||
lastPcapStats_ = pcapStats;
|
||||
} else {
|
||||
dbgStats = QString("error reading pcap stats: %1")
|
||||
.arg(pcap_geterr(handle_));
|
||||
}
|
||||
#endif
|
||||
|
||||
return dbgStats;
|
||||
}
|
||||
|
||||
bool PcapSession::clearDebugStats()
|
||||
{
|
||||
memset(&lastPcapStats_, 0, sizeof(lastPcapStats_));
|
||||
return true;
|
||||
}
|
||||
|
||||
#ifdef Q_OS_UNIX
|
||||
#include <signal.h>
|
||||
#include <typeinfo>
|
||||
@ -66,7 +129,7 @@ void PcapSession::postRun()
|
||||
qDebug("Signal seen and handled");
|
||||
}
|
||||
|
||||
void PcapSession::stop(pcap_t *handle)
|
||||
void PcapSession::stop()
|
||||
{
|
||||
// Should be called OUTSIDE the thread's context
|
||||
// XXX: As per the man page for pcap_breakloop, we need both
|
||||
@ -74,7 +137,7 @@ void PcapSession::stop(pcap_t *handle)
|
||||
// we use a signal for the latter
|
||||
// TODO: If the signal mechanism doesn't work, we could try
|
||||
// pthread_cancel(thread_);
|
||||
pcap_breakloop(handle);
|
||||
pcap_breakloop(handle_);
|
||||
pthread_kill(thread_.nativeId(), MY_BREAK_SIGNAL);
|
||||
}
|
||||
|
||||
|
@ -55,27 +55,46 @@ inline uint qHash(const ThreadId &key)
|
||||
|
||||
class PcapSession: public QThread
|
||||
{
|
||||
public:
|
||||
QString debugStats();
|
||||
|
||||
protected:
|
||||
bool clearDebugStats();
|
||||
|
||||
void preRun();
|
||||
void postRun();
|
||||
void stop(pcap_t *handle);
|
||||
void stop();
|
||||
|
||||
pcap_t *handle_{nullptr};
|
||||
|
||||
private:
|
||||
static void signalBreakHandler(int /*signum*/);
|
||||
|
||||
ThreadId thread_;
|
||||
static QHash<ThreadId, bool> signalSeen_;
|
||||
|
||||
struct pcap_stat lastPcapStats_;
|
||||
};
|
||||
#else
|
||||
class PcapSession: public QThread
|
||||
{
|
||||
public:
|
||||
QString debugStats();
|
||||
|
||||
protected:
|
||||
bool clearDebugStats();
|
||||
|
||||
void preRun() {};
|
||||
void postRun() {};
|
||||
void stop(pcap_t *handle) {
|
||||
qDebug("calling breakloop with handle %p", handle);
|
||||
pcap_breakloop(handle);
|
||||
void stop() {
|
||||
qDebug("calling breakloop with handle %p", handle_);
|
||||
pcap_breakloop(handle_);
|
||||
}
|
||||
|
||||
pcap_t *handle_{nullptr};
|
||||
|
||||
private:
|
||||
struct pcap_stat lastPcapStats_;
|
||||
};
|
||||
#endif
|
||||
|
||||
|
@ -88,6 +88,13 @@ void PcapTransmitter::setPacketListLoopMode(
|
||||
txThread_.setPacketListLoopMode(loop, secDelay, nsecDelay);
|
||||
}
|
||||
|
||||
void PcapTransmitter::setPacketListTtagMarkers(
|
||||
QList<uint> markers,
|
||||
uint repeatInterval)
|
||||
{
|
||||
txThread_.setPacketListTtagMarkers(markers, repeatInterval);
|
||||
}
|
||||
|
||||
void PcapTransmitter::useExternalStats(AbstractPort::PortStats *stats)
|
||||
{
|
||||
txStats_.useExternalStats(stats);
|
||||
|
@ -42,6 +42,7 @@ public:
|
||||
bool appendToPacketList(long sec, long usec, const uchar *packet,
|
||||
int length);
|
||||
void setPacketListLoopMode(bool loop, quint64 secDelay, quint64 nsecDelay);
|
||||
void setPacketListTtagMarkers(QList<uint> markers, uint repeatInterval);
|
||||
|
||||
void setHandle(pcap_t *handle);
|
||||
void useExternalStats(AbstractPort::PortStats *stats);
|
||||
|
@ -19,9 +19,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
|
||||
#include "pcaptxthread.h"
|
||||
|
||||
#include "sign.h"
|
||||
#include "statstuple.h"
|
||||
#include "timestamp.h"
|
||||
|
||||
#include <QtDebug>
|
||||
|
||||
PcapTxThread::PcapTxThread(const char *device)
|
||||
{
|
||||
char errbuf[PCAP_ERRBUF_SIZE] = "";
|
||||
@ -109,10 +112,24 @@ void PcapTxThread::clearPacketList()
|
||||
void PcapTxThread::loopNextPacketSet(qint64 size, qint64 repeats,
|
||||
long repeatDelaySec, long repeatDelayNsec)
|
||||
{
|
||||
// Since we create implicit packetset for this case, skip
|
||||
// This case => Packet set for y when x = 0 or n==1 in n*x+y
|
||||
#if 0 // Don't let implicit packet sets be created
|
||||
// XXX: The below change was done as part of Turbo code
|
||||
// implementation alongwith calls to this function from
|
||||
// AbstractPort::updatePacketListSequential(). Turbo to
|
||||
// have clean code requires explicit packet sets for all
|
||||
// cases (except interleaved streams). The below change
|
||||
// was done so that the base code should not be affected
|
||||
// after the explict packet set creation calls.
|
||||
// XXX: Since we create implicit packetset for this case, skip
|
||||
// This case =>
|
||||
// 1. Packet set for y when x = 0
|
||||
// 2. n==1 in n*x+y
|
||||
// These two cases were the result of the changes in
|
||||
// updatePacketListSequential() as part of Turbo changes
|
||||
// mentioned above
|
||||
if (repeats == 1)
|
||||
return;
|
||||
#endif
|
||||
|
||||
currentPacketSequence_ = new PacketSequence(trackStreamStats_);
|
||||
currentPacketSequence_->repeatCount_ = repeats;
|
||||
@ -136,24 +153,20 @@ bool PcapTxThread::appendToPacketList(long sec, long nsec,
|
||||
pktHdr.ts.tv_sec = sec;
|
||||
pktHdr.ts.tv_usec = nsec/1000;
|
||||
|
||||
if (currentPacketSequence_ == NULL ||
|
||||
!currentPacketSequence_->hasFreeSpace(2*sizeof(pcap_pkthdr)+length))
|
||||
{
|
||||
if (currentPacketSequence_ != NULL)
|
||||
{
|
||||
long usecs;
|
||||
// loopNextPacketSet should have created a seq
|
||||
Q_ASSERT(currentPacketSequence_ != NULL);
|
||||
|
||||
usecs = (pktHdr.ts.tv_sec
|
||||
- currentPacketSequence_->lastPacket_->ts.tv_sec)
|
||||
* long(1e6);
|
||||
usecs += (pktHdr.ts.tv_usec
|
||||
- currentPacketSequence_->lastPacket_->ts.tv_usec);
|
||||
currentPacketSequence_->usecDelay_ = usecs;
|
||||
}
|
||||
// If not enough space, update usecDelay and alloc a new seq
|
||||
if (!currentPacketSequence_->hasFreeSpace(2*sizeof(pcap_pkthdr)+length))
|
||||
{
|
||||
struct timeval diff;
|
||||
timersub(&pktHdr.ts, ¤tPacketSequence_->lastPacket_->ts, &diff);
|
||||
currentPacketSequence_->usecDelay_ = diff.tv_usec;
|
||||
if (diff.tv_sec)
|
||||
currentPacketSequence_->usecDelay_ += diff.tv_sec*1e6;
|
||||
|
||||
//! \todo (LOW): calculate sendqueue size
|
||||
currentPacketSequence_ = new PacketSequence(trackStreamStats_);
|
||||
|
||||
packetSequenceList_.append(currentPacketSequence_);
|
||||
|
||||
// Validate that the pkt will fit inside the new currentSendQueue_
|
||||
@ -169,6 +182,8 @@ bool PcapTxThread::appendToPacketList(long sec, long nsec,
|
||||
packetCount_++;
|
||||
packetListSize_ += repeatSize_ ?
|
||||
currentPacketSequence_->repeatCount_ : 1;
|
||||
|
||||
// Last packet of packet-set?
|
||||
if (repeatSize_ > 0 && packetCount_ == repeatSize_)
|
||||
{
|
||||
qDebug("repeatSequenceStart_=%d, repeatSize_ = %llu",
|
||||
@ -190,7 +205,7 @@ bool PcapTxThread::appendToPacketList(long sec, long nsec,
|
||||
|
||||
repeatSize_ = 0;
|
||||
|
||||
// End current pktSeq and trigger a new pktSeq allocation for next pkt
|
||||
// End current pktSeq
|
||||
currentPacketSequence_ = NULL;
|
||||
}
|
||||
|
||||
@ -206,6 +221,27 @@ void PcapTxThread::setPacketListLoopMode(
|
||||
loopDelay_ = secDelay*long(1e6) + nsecDelay/1000;
|
||||
}
|
||||
|
||||
void PcapTxThread::setPacketListTtagMarkers(
|
||||
QList<uint> markers,
|
||||
uint repeatInterval)
|
||||
{
|
||||
// XXX: Empty markers => no streams have Ttag
|
||||
firstTtagPkt_ = markers.isEmpty() ? -1 : int(markers.first());
|
||||
|
||||
// Calculate delta markers
|
||||
ttagDeltaMarkers_.clear();
|
||||
for (int i = 1; i < markers.size(); i++)
|
||||
ttagDeltaMarkers_.append(markers.at(i) - markers.at(i-1));
|
||||
if (!markers.isEmpty()) {
|
||||
ttagDeltaMarkers_.append(repeatInterval - markers.last()
|
||||
+ markers.first());
|
||||
qDebug() << "TtagRepeatInterval:" << repeatInterval;
|
||||
qDebug() << "FirstTtagPkt:" << firstTtagPkt_;
|
||||
qDebug() << "TtagMarkers:" << ttagDeltaMarkers_;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void PcapTxThread::setHandle(pcap_t *handle)
|
||||
{
|
||||
if (usingInternalHandle_)
|
||||
@ -261,39 +297,45 @@ void PcapTxThread::run()
|
||||
packetSequenceList_.at(i)->repeatCount_,
|
||||
packetSequenceList_.at(i)->repeatSize_,
|
||||
packetSequenceList_.at(i)->usecDelay_);
|
||||
qDebug("sendQ[%d]: pkts = %ld, usecDuration = %ld", i,
|
||||
qDebug("sendQ[%d]: pkts = %ld, usecDuration = %ld, ttagL4CksumOfs = %hu", i,
|
||||
packetSequenceList_.at(i)->packets_,
|
||||
packetSequenceList_.at(i)->usecDuration_);
|
||||
packetSequenceList_.at(i)->usecDuration_,
|
||||
packetSequenceList_.at(i)->ttagL4CksumOffset_);
|
||||
}
|
||||
|
||||
qDebug() << "First Ttag: " << firstTtagPkt_
|
||||
<< "Ttag Markers:" << ttagDeltaMarkers_;
|
||||
|
||||
lastStats_ = *stats_; // used for stream stats
|
||||
|
||||
// Init Ttag related vars. If no packets need ttag, firstTtagPkt_ is -1,
|
||||
// so nextTagPkt_ is set to practically unreachable value (due to
|
||||
// 64 bit counter wraparound time!)
|
||||
ttagMarkerIndex_ = 0;
|
||||
nextTtagPkt_ = stats_->pkts + firstTtagPkt_;
|
||||
|
||||
getTimeStamp(&startTime);
|
||||
state_ = kRunning;
|
||||
i = 0;
|
||||
while (i < packetSequenceList_.size())
|
||||
{
|
||||
|
||||
while (i < packetSequenceList_.size()) {
|
||||
_restart:
|
||||
int rptSz = packetSequenceList_.at(i)->repeatSize_;
|
||||
int rptCnt = packetSequenceList_.at(i)->repeatCount_;
|
||||
|
||||
for (int j = 0; j < rptCnt; j++)
|
||||
{
|
||||
for (int k = 0; k < rptSz; k++)
|
||||
{
|
||||
for (int j = 0; j < rptCnt; j++) {
|
||||
for (int k = 0; k < rptSz; k++) {
|
||||
int ret;
|
||||
PacketSequence *seq = packetSequenceList_.at(i+k);
|
||||
#ifdef Q_OS_WIN32
|
||||
TimeStamp ovrStart, ovrEnd;
|
||||
|
||||
if (seq->usecDuration_ <= long(1e6)) // 1s
|
||||
{
|
||||
// Use Windows-only pcap_sendqueue_transmit() if duration < 1s
|
||||
// and no stream timing is configured
|
||||
if (seq->usecDuration_ <= long(1e6) && firstTtagPkt_ < 0) {
|
||||
getTimeStamp(&ovrStart);
|
||||
ret = pcap_sendqueue_transmit(handle_,
|
||||
seq->sendQueue_, kSyncTransmit);
|
||||
if (ret >= 0)
|
||||
{
|
||||
if (ret >= 0) {
|
||||
stats_->pkts += seq->packets_;
|
||||
stats_->bytes += seq->bytes_;
|
||||
|
||||
@ -304,52 +346,42 @@ _restart:
|
||||
}
|
||||
if (stop_)
|
||||
ret = -2;
|
||||
}
|
||||
else
|
||||
{
|
||||
ret = sendQueueTransmit(handle_, seq->sendQueue_,
|
||||
} else {
|
||||
ret = sendQueueTransmit(handle_, seq,
|
||||
overHead, kSyncTransmit);
|
||||
}
|
||||
#else
|
||||
ret = sendQueueTransmit(handle_, seq->sendQueue_,
|
||||
ret = sendQueueTransmit(handle_, seq,
|
||||
overHead, kSyncTransmit);
|
||||
#endif
|
||||
|
||||
if (ret >= 0)
|
||||
{
|
||||
if (ret >= 0) {
|
||||
long usecs = seq->usecDelay_ + overHead;
|
||||
if (usecs > 0)
|
||||
{
|
||||
if (usecs > 0) {
|
||||
(*udelayFn_)(usecs);
|
||||
overHead = 0;
|
||||
}
|
||||
else
|
||||
} else
|
||||
overHead = usecs;
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
qDebug("error %d in sendQueueTransmit()", ret);
|
||||
qDebug("overHead = %ld", overHead);
|
||||
stop_ = false;
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
}
|
||||
} // rptSz
|
||||
} // rptCnt
|
||||
|
||||
// Move to the next Packet Set
|
||||
i += rptSz;
|
||||
}
|
||||
|
||||
if (returnToQIdx_ >= 0)
|
||||
{
|
||||
if (returnToQIdx_ >= 0) {
|
||||
long usecs = loopDelay_ + overHead;
|
||||
|
||||
if (usecs > 0)
|
||||
{
|
||||
if (usecs > 0) {
|
||||
(*udelayFn_)(usecs);
|
||||
overHead = 0;
|
||||
}
|
||||
else
|
||||
} else
|
||||
overHead = usecs;
|
||||
|
||||
i = returnToQIdx_;
|
||||
@ -409,38 +441,77 @@ double PcapTxThread::lastTxDuration()
|
||||
return lastTxDuration_;
|
||||
}
|
||||
|
||||
int PcapTxThread::sendQueueTransmit(pcap_t *p,
|
||||
pcap_send_queue *queue, long &overHead, int sync)
|
||||
int PcapTxThread::sendQueueTransmit(pcap_t *p, PacketSequence *seq,
|
||||
long &overHead, int sync)
|
||||
{
|
||||
TimeStamp ovrStart, ovrEnd;
|
||||
struct timeval ts;
|
||||
pcap_send_queue *queue = seq->sendQueue_;
|
||||
struct pcap_pkthdr *hdr = (struct pcap_pkthdr*) queue->buffer;
|
||||
char *end = queue->buffer + queue->len;
|
||||
|
||||
ts = hdr->ts;
|
||||
|
||||
getTimeStamp(&ovrStart);
|
||||
while((char*) hdr < end)
|
||||
{
|
||||
while((char*) hdr < end) {
|
||||
uchar *pkt = (uchar*)hdr + sizeof(*hdr);
|
||||
int pktLen = hdr->caplen;
|
||||
bool ttagPkt = false;
|
||||
#if 0
|
||||
quint16 origCksum = 0;
|
||||
#endif
|
||||
|
||||
if (sync)
|
||||
{
|
||||
// Time for a T-Tag packet?
|
||||
if (stats_->pkts == nextTtagPkt_) {
|
||||
ttagPkt = true;
|
||||
// XXX: write 2xBytes instead of 1xHalf-word to avoid
|
||||
// potential alignment problem
|
||||
*(pkt+pktLen-5) = SignProtocol::kTypeLenTtag;
|
||||
*(pkt+pktLen-6) = ttagId_;
|
||||
|
||||
#if 0
|
||||
// Recalc L4 checksum; use incremental checksum as per RFC 1624
|
||||
// HC' = ~(~HC + ~m + m')
|
||||
if (seq->ttagL4CksumOffset_) {
|
||||
quint16 *cksum = reinterpret_cast<quint16*>(
|
||||
pkt + seq->ttagL4CksumOffset_);
|
||||
origCksum = qFromBigEndian<quint16>(*cksum);
|
||||
// XXX: SignProtocol trailer
|
||||
// ... | <guid> | 0x61 | 0x00 | 0x22 | 0x1d10c0da
|
||||
// ... | <guid> | 0x61 | <TtagId> | 0x23 | 0x1d10c0da
|
||||
// For odd pkt Length, Ttag spans across 2 half-words
|
||||
// XXX: Hardcoded values instead of sign protocol constants
|
||||
// used below for readability
|
||||
quint32 newCksum = pktLen & 1 ?
|
||||
quint16(~origCksum) + quint16(~0x221d) + 0x231d
|
||||
+ quint16(~0x6100) + (0x6100 | ttagId_) :
|
||||
quint16(~origCksum) + quint16(~0x0022) + (ttagId_ << 8 | 0x23);
|
||||
while (newCksum > 0xffff)
|
||||
newCksum = (newCksum & 0xffff) + (newCksum >> 16);
|
||||
// XXX: For IPv4/UDP, if ~newcksum is 0x0000 we are supposed to
|
||||
// set the checksum as 0xffff since 0x0000 indicates no cksum
|
||||
// is present - we choose not to do this to avoid extra cost
|
||||
*cksum = qToBigEndian(quint16(~newCksum));
|
||||
}
|
||||
#endif
|
||||
ttagId_++;
|
||||
nextTtagPkt_ += ttagDeltaMarkers_.at(ttagMarkerIndex_);
|
||||
ttagMarkerIndex_++;
|
||||
if (ttagMarkerIndex_ >= ttagDeltaMarkers_.size())
|
||||
ttagMarkerIndex_ = 0;
|
||||
}
|
||||
|
||||
if (sync) {
|
||||
long usec = (hdr->ts.tv_sec - ts.tv_sec) * 1000000 +
|
||||
(hdr->ts.tv_usec - ts.tv_usec);
|
||||
|
||||
getTimeStamp(&ovrEnd);
|
||||
|
||||
overHead -= udiffTimeStamp(&ovrStart, &ovrEnd);
|
||||
Q_ASSERT(overHead <= 0);
|
||||
usec += overHead;
|
||||
if (usec > 0)
|
||||
{
|
||||
if (usec > 0) {
|
||||
(*udelayFn_)(usec);
|
||||
overHead = 0;
|
||||
}
|
||||
else
|
||||
} else
|
||||
overHead = usec;
|
||||
|
||||
ts = hdr->ts;
|
||||
@ -453,16 +524,27 @@ int PcapTxThread::sendQueueTransmit(pcap_t *p,
|
||||
stats_->pkts++;
|
||||
stats_->bytes += pktLen;
|
||||
|
||||
// Revert T-Tag packet changes
|
||||
if (ttagPkt) {
|
||||
*(pkt+pktLen-5) = SignProtocol::kTypeLenTtagPlaceholder;
|
||||
*(pkt+pktLen-6) = 0;
|
||||
#if 0
|
||||
if (seq->ttagL4CksumOffset_) {
|
||||
quint16 *cksum = reinterpret_cast<quint16*>(
|
||||
pkt + seq->ttagL4CksumOffset_);
|
||||
*cksum = qToBigEndian(origCksum);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
// Step to the next packet in the buffer
|
||||
hdr = (struct pcap_pkthdr*) (pkt + pktLen);
|
||||
pkt = (uchar*) ((uchar*)hdr + sizeof(*hdr)); // FIXME: superfluous?
|
||||
|
||||
if (stop_)
|
||||
{
|
||||
if (stop_) {
|
||||
return -2;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -613,4 +695,3 @@ void PcapTxThread::udelay(unsigned long usec)
|
||||
QThread::usleep(usec);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
@ -42,6 +42,7 @@ public:
|
||||
bool appendToPacketList(long sec, long usec, const uchar *packet,
|
||||
int length);
|
||||
void setPacketListLoopMode(bool loop, quint64 secDelay, quint64 nsecDelay);
|
||||
void setPacketListTtagMarkers(QList<uint> markers, uint repeatInterval);
|
||||
|
||||
void setHandle(pcap_t *handle);
|
||||
|
||||
@ -66,8 +67,8 @@ private:
|
||||
};
|
||||
|
||||
static void udelay(unsigned long usec);
|
||||
int sendQueueTransmit(pcap_t *p, pcap_send_queue *queue, long &overHead,
|
||||
int sync);
|
||||
int sendQueueTransmit(pcap_t *p, PacketSequence *seq,
|
||||
long &overHead, int sync);
|
||||
void updateTxStreamStats();
|
||||
|
||||
// Intermediate state variables used while building the packet list
|
||||
@ -80,7 +81,7 @@ private:
|
||||
quint64 packetListSize_; // count of pkts in packet List including repeats
|
||||
|
||||
int returnToQIdx_;
|
||||
quint64 loopDelay_;
|
||||
quint64 loopDelay_; // in nanosecs
|
||||
|
||||
void (*udelayFn_)(unsigned long);
|
||||
|
||||
@ -93,8 +94,17 @@ private:
|
||||
StatsTuple *stats_;
|
||||
StatsTuple lastStats_;
|
||||
StreamStats streamStats_;
|
||||
quint8 ttagId_{0};
|
||||
|
||||
double lastTxDuration_{0.0}; // in secs
|
||||
|
||||
// XXX: Ttag Marker config derived; not updated during Tx
|
||||
int firstTtagPkt_;
|
||||
QList<uint> ttagDeltaMarkers_;
|
||||
|
||||
// XXX: Ttag related; updated during Tx
|
||||
int ttagMarkerIndex_;
|
||||
quint64 nextTtagPkt_{0};
|
||||
};
|
||||
|
||||
#endif
|
||||
|
192
server/pcaptxttagstats.cpp
Normal file
192
server/pcaptxttagstats.cpp
Normal file
@ -0,0 +1,192 @@
|
||||
/*
|
||||
Copyright (C) 2023 Srivats P.
|
||||
|
||||
This file is part of "Ostinato"
|
||||
|
||||
This is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
#include "pcaptxttagstats.h"
|
||||
|
||||
#include "pcapextra.h"
|
||||
#include "../common/debugdefs.h"
|
||||
#include "../common/sign.h"
|
||||
#include "streamtiming.h"
|
||||
|
||||
#define Xnotify qWarning // FIXME
|
||||
|
||||
PcapTxTtagStats::PcapTxTtagStats(const char *device, int id)
|
||||
: portId_(id)
|
||||
{
|
||||
setObjectName(QString("TxT$:%1").arg(device));
|
||||
device_ = QString::fromLatin1(device);
|
||||
|
||||
timing_ = StreamTiming::instance();
|
||||
}
|
||||
|
||||
void PcapTxTtagStats::run()
|
||||
{
|
||||
int flags = PCAP_OPENFLAG_PROMISCUOUS;
|
||||
char errbuf[PCAP_ERRBUF_SIZE] = "";
|
||||
struct bpf_program bpf;
|
||||
const int optimize = 1;
|
||||
QString capture_filter = QString(
|
||||
"(ether[len - 4:4] == 0x%1) and (ether[len - 5:1] == 0x%2)")
|
||||
.arg(SignProtocol::magic(), 0, BASE_HEX)
|
||||
.arg(SignProtocol::kTypeLenTtag, 0, BASE_HEX);
|
||||
|
||||
qDebug("In %s", __PRETTY_FUNCTION__);
|
||||
qDebug("pcap-filter: %s", qPrintable(capture_filter));
|
||||
|
||||
handle_ = pcap_open_live(qPrintable(device_), 65535,
|
||||
flags, 100 /* ms */, errbuf);
|
||||
if (!handle_) {
|
||||
if (flags && QString(errbuf).contains("promiscuous")) {
|
||||
Xnotify("Unable to set promiscuous mode on <%s> - "
|
||||
"stream stats time tracking will not work", qPrintable(device_));
|
||||
goto _exit;
|
||||
}
|
||||
else {
|
||||
Xnotify("Unable to open <%s> [%s] - stream stats rx will not work",
|
||||
qPrintable(device_), errbuf);
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef Q_OS_WIN32
|
||||
// pcap_setdirection() API is not supported in Windows.
|
||||
// NOTE: WinPcap 4.1.1 and above exports a dummy API that returns -1
|
||||
// but since we would like to work with previous versions of WinPcap
|
||||
// also, we assume the API does not exist
|
||||
isDirectional_ = false;
|
||||
#else
|
||||
if (pcap_setdirection(handle_, PCAP_D_OUT) < 0) {
|
||||
qDebug("TxTtagStats: Error setting OUT direction %s: %s\n",
|
||||
qPrintable(device_), pcap_geterr(handle_));
|
||||
isDirectional_ = false;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (pcap_compile(handle_, &bpf, qPrintable(capture_filter),
|
||||
optimize, 0) < 0) {
|
||||
qWarning("%s: error compiling filter: %s", qPrintable(device_),
|
||||
pcap_geterr(handle_));
|
||||
goto _skip_filter;
|
||||
}
|
||||
|
||||
if (pcap_setfilter(handle_, &bpf) < 0) {
|
||||
qWarning("%s: error setting filter: %s", qPrintable(device_),
|
||||
pcap_geterr(handle_));
|
||||
goto _skip_filter;
|
||||
}
|
||||
|
||||
_skip_filter:
|
||||
clearDebugStats();
|
||||
PcapSession::preRun();
|
||||
state_ = kRunning;
|
||||
while (1) {
|
||||
int ret;
|
||||
struct pcap_pkthdr *hdr;
|
||||
const uchar *data;
|
||||
|
||||
ret = pcap_next_ex(handle_, &hdr, &data);
|
||||
switch (ret) {
|
||||
case 1: {
|
||||
uint ttagId;
|
||||
uint guid;
|
||||
if (SignProtocol::packetTtagId(data, hdr->caplen,
|
||||
&ttagId, &guid)) {
|
||||
#ifdef Q_OS_WIN32
|
||||
// TxPort is NOT us ==> Rx Packet, so skip
|
||||
// See similar check in PcapRxStats for details
|
||||
if (ttagId >> 8 != uint(portId_))
|
||||
break;
|
||||
ttagId &= 0xFF;
|
||||
#endif
|
||||
timing_->recordTxTime(portId_, guid, ttagId, hdr->ts);
|
||||
timingDebug("[%d TX] %ld:%ld ttag %u guid %u", portId_,
|
||||
hdr->ts.tv_sec, long(hdr->ts.tv_usec), ttagId, guid);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 0:
|
||||
// timeout: just go back to the loop
|
||||
break;
|
||||
case -1:
|
||||
qWarning("%s: error reading packet (%d): %s",
|
||||
__PRETTY_FUNCTION__, ret, pcap_geterr(handle_));
|
||||
break;
|
||||
case -2:
|
||||
qDebug("%s: Loop/signal break or some other error",
|
||||
__PRETTY_FUNCTION__);
|
||||
break;
|
||||
default:
|
||||
qWarning("%s: Unexpected return value %d",
|
||||
__PRETTY_FUNCTION__, ret);
|
||||
stop_ = true;
|
||||
}
|
||||
|
||||
if (stop_) {
|
||||
qDebug("user requested txTtagStats stop");
|
||||
break;
|
||||
}
|
||||
}
|
||||
PcapSession::postRun();
|
||||
pcap_close(handle_);
|
||||
handle_ = NULL;
|
||||
stop_ = false;
|
||||
|
||||
_exit:
|
||||
state_ = kFinished;
|
||||
}
|
||||
|
||||
bool PcapTxTtagStats::start()
|
||||
{
|
||||
if (state_ == kRunning) {
|
||||
qWarning("TxTtagStats start requested but is already running!");
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
state_ = kNotStarted;
|
||||
PcapSession::start();
|
||||
|
||||
while (state_ == kNotStarted)
|
||||
QThread::msleep(10);
|
||||
_exit:
|
||||
return true;
|
||||
}
|
||||
|
||||
bool PcapTxTtagStats::stop()
|
||||
{
|
||||
if (state_ == kRunning) {
|
||||
stop_ = true;
|
||||
PcapSession::stop();
|
||||
while (state_ == kRunning)
|
||||
QThread::msleep(10);
|
||||
}
|
||||
else
|
||||
qWarning("TxTtagStats stop requested but is not running!");
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool PcapTxTtagStats::isRunning()
|
||||
{
|
||||
return (state_ == kRunning);
|
||||
}
|
||||
|
||||
bool PcapTxTtagStats::isDirectional()
|
||||
{
|
||||
return isDirectional_;
|
||||
}
|
55
server/pcaptxttagstats.h
Normal file
55
server/pcaptxttagstats.h
Normal file
@ -0,0 +1,55 @@
|
||||
/*
|
||||
Copyright (C) 2023 Srivats P.
|
||||
|
||||
This file is part of "Ostinato"
|
||||
|
||||
This is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
#ifndef _PCAP_TX_TTAG_H
|
||||
#define _PCAP_TX_TTAG_H
|
||||
|
||||
#include "pcapsession.h"
|
||||
|
||||
class StreamTiming;
|
||||
|
||||
class PcapTxTtagStats: public PcapSession
|
||||
{
|
||||
public:
|
||||
PcapTxTtagStats(const char *device, int id);
|
||||
|
||||
void run();
|
||||
bool start();
|
||||
bool stop();
|
||||
bool isRunning();
|
||||
bool isDirectional();
|
||||
|
||||
private:
|
||||
enum State {
|
||||
kNotStarted,
|
||||
kRunning,
|
||||
kFinished
|
||||
};
|
||||
|
||||
QString device_;
|
||||
bool isDirectional_{true};
|
||||
volatile State state_{kNotStarted};
|
||||
volatile bool stop_{false};
|
||||
|
||||
int portId_;
|
||||
|
||||
StreamTiming *timing_{nullptr};
|
||||
};
|
||||
|
||||
#endif
|
@ -30,6 +30,7 @@ struct StreamStatsTuple
|
||||
quint64 tx_bytes;
|
||||
};
|
||||
|
||||
// Key(uint) is GUID
|
||||
typedef QHash<uint, StreamStatsTuple> StreamStats;
|
||||
typedef QHashIterator<uint, StreamStatsTuple> StreamStatsIterator;
|
||||
|
||||
|
246
server/streamtiming.cpp
Normal file
246
server/streamtiming.cpp
Normal file
@ -0,0 +1,246 @@
|
||||
/*
|
||||
Copyright (C) 2023 Srivats P.
|
||||
|
||||
This file is part of "Ostinato"
|
||||
|
||||
This is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
#include "streamtiming.h"
|
||||
|
||||
#include "../common/debugdefs.h"
|
||||
#include "timestamp.h"
|
||||
|
||||
#include <QCoreApplication>
|
||||
|
||||
StreamTiming::StreamTiming(QObject *parent)
|
||||
: QObject(parent)
|
||||
{
|
||||
// This class MUST be part of the main thread so that timers can work
|
||||
Q_ASSERT(this->thread() == QCoreApplication::instance()->thread());
|
||||
|
||||
timer_ = new QTimer(this);
|
||||
connect(timer_, &QTimer::timeout, this, &StreamTiming::processRecords);
|
||||
timer_->setInterval(3000);
|
||||
|
||||
gcTimer_ = new QTimer(this);
|
||||
connect(gcTimer_, &QTimer::timeout, this, &StreamTiming::deleteStaleRecords);
|
||||
gcTimer_->setInterval(30000);
|
||||
}
|
||||
|
||||
void StreamTiming::start(uint portId)
|
||||
{
|
||||
if (activePortSet_.isEmpty()) { // First port?
|
||||
timer_->start();
|
||||
gcTimer_->start();
|
||||
qDebug("Stream Latency tracking started");
|
||||
}
|
||||
activePortSet_.insert(portId);
|
||||
qDebug("Stream Latency tracking started for port %u", portId);
|
||||
}
|
||||
|
||||
void StreamTiming::stop(uint portId)
|
||||
{
|
||||
activePortSet_.remove(portId);
|
||||
qDebug("Stream Latency tracking stopped for port %u", portId);
|
||||
if (activePortSet_.isEmpty()) { // Last port?
|
||||
processRecords();
|
||||
deleteStaleRecords();
|
||||
timer_->stop();
|
||||
gcTimer_->stop();
|
||||
qDebug("Stream Latency tracking stopped");
|
||||
}
|
||||
}
|
||||
|
||||
bool StreamTiming::recordTxTime(uint portId, uint guid, uint ttagId,
|
||||
const struct timespec ×tamp)
|
||||
{
|
||||
TxRxKey key = makeKey(guid, ttagId);
|
||||
TtagData value = { .timeStamp = timestamp, .portId = portId};
|
||||
|
||||
QMutexLocker locker(&txHashLock_);
|
||||
txHash_.insert(key, value);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool StreamTiming::recordRxTime(uint portId, uint guid, uint ttagId,
|
||||
const struct timespec ×tamp)
|
||||
{
|
||||
TxRxKey key = makeKey(guid, ttagId);
|
||||
TtagData value = { .timeStamp = timestamp, .portId = portId};
|
||||
|
||||
QMutexLocker locker(&rxHashLock_);
|
||||
rxHash_.insert(key, value);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool StreamTiming::recordTxTime(uint portId, uint guid, uint ttagId,
|
||||
const struct timeval ×tamp)
|
||||
{
|
||||
struct timespec ts;
|
||||
ts.tv_sec = timestamp.tv_sec;
|
||||
ts.tv_nsec = timestamp.tv_usec*1000;
|
||||
|
||||
return recordTxTime(portId, guid, ttagId, ts);
|
||||
}
|
||||
|
||||
bool StreamTiming::recordRxTime(uint portId, uint guid, uint ttagId,
|
||||
const struct timeval ×tamp)
|
||||
{
|
||||
struct timespec ts;
|
||||
ts.tv_sec = timestamp.tv_sec;
|
||||
ts.tv_nsec = timestamp.tv_usec*1000;
|
||||
|
||||
return recordRxTime(portId, guid, ttagId, ts);
|
||||
}
|
||||
|
||||
quint64 StreamTiming::delay(uint portId, uint guid)
|
||||
{
|
||||
Q_ASSERT(guid <= SignProtocol::kMaxGuid);
|
||||
|
||||
// Process anything pending first
|
||||
processRecords();
|
||||
|
||||
QMutexLocker locker(&timingLock_);
|
||||
|
||||
if (!timing_.contains(portId))
|
||||
return 0;
|
||||
|
||||
Timing t = timing_.value(portId)->value(guid);
|
||||
if (t.countDelays == 0)
|
||||
return 0;
|
||||
|
||||
return timespecToNsecs(t.sumDelays)/t.countDelays;
|
||||
}
|
||||
|
||||
void StreamTiming::clear(uint portId, uint guid)
|
||||
{
|
||||
// XXX: We need to clear only the final timing hash; rx/tx hashes
|
||||
// are cleared by StreamTiming itself as part of processRecords and
|
||||
// deleteStaleRecords respectively
|
||||
QMutexLocker locker(&timingLock_);
|
||||
|
||||
if (!timing_.contains(portId))
|
||||
return;
|
||||
|
||||
PortTiming *portTiming = timing_.value(portId);
|
||||
if (!portTiming)
|
||||
return;
|
||||
|
||||
if (guid == SignProtocol::kInvalidGuid)
|
||||
portTiming->clear(); // remove ALL guids
|
||||
else
|
||||
portTiming->remove(guid);
|
||||
}
|
||||
|
||||
int StreamTiming::processRecords()
|
||||
{
|
||||
// TODO: yield after a certain count of records or time when called in
|
||||
// timer context; when called from delay(), process ALL
|
||||
|
||||
int count = 0;
|
||||
QMutexLocker txLocker(&txHashLock_);
|
||||
QMutexLocker rxLocker(&rxHashLock_);
|
||||
QMutexLocker timingLocker(&timingLock_);
|
||||
|
||||
auto i = rxHash_.begin();
|
||||
while (i != rxHash_.end()) {
|
||||
if (txHash_.contains(i.key())) {
|
||||
struct timespec txTime = txHash_.take(i.key()).timeStamp;
|
||||
struct timespec rxTime = i.value().timeStamp;
|
||||
struct timespec diff;
|
||||
timespecsub(&rxTime, &txTime, &diff);
|
||||
|
||||
uint guid = i.key() >> 8;
|
||||
uint portId = i.value().portId;
|
||||
|
||||
if (!timing_.contains(portId))
|
||||
timing_.insert(portId, new PortTiming);
|
||||
PortTiming *portTiming = timing_.value(portId);
|
||||
Timing &guidTiming = (*portTiming)[guid];
|
||||
timespecadd(&guidTiming.sumDelays, &diff, &guidTiming.sumDelays);
|
||||
guidTiming.countDelays++;
|
||||
|
||||
count++;
|
||||
|
||||
timingDebug("[%u/%u/%u] diff %ld.%09ld (%ld.%09ld - %ld.%09ld)",
|
||||
i.value().portId, guid, i.key() & 0xFF,
|
||||
diff.tv_sec, diff.tv_nsec,
|
||||
rxTime.tv_sec, rxTime.tv_nsec,
|
||||
txTime.tv_sec, txTime.tv_nsec);
|
||||
timingDebug("%d:[%u/%u] total %ld.%09ld count %u",
|
||||
count, i.value().portId, guid,
|
||||
guidTiming.sumDelays.tv_sec, guidTiming.sumDelays.tv_nsec,
|
||||
guidTiming.countDelays);
|
||||
}
|
||||
i = rxHash_.erase(i);
|
||||
}
|
||||
|
||||
Q_ASSERT(rxHash_.isEmpty());
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
int StreamTiming::deleteStaleRecords()
|
||||
{
|
||||
// TODO: yield after a certain count of records or time unless we are
|
||||
// idle when we process all; how do we determine we are "idle"?
|
||||
|
||||
// XXX: We assume the Tx packet timestamps are based on CLOCK_REALTIME
|
||||
// (or a similar and comparable source). Since garbage collection timer
|
||||
// is not a short interval, it need not be the exact same source as long
|
||||
// as the values are comparable
|
||||
int count = 0;
|
||||
struct timespec now;
|
||||
clock_gettime(CLOCK_REALTIME, &now);
|
||||
|
||||
// XXX: processRecords() iterates and deletes all rx records irrespective
|
||||
// of whether it found a matching tx record. So for garbage collection we
|
||||
// only need to look at (and delete) tx records
|
||||
QMutexLocker locker(&txHashLock_);
|
||||
|
||||
auto i = txHash_.begin();
|
||||
while (i != txHash_.end()) {
|
||||
struct timespec txTime = i.value().timeStamp;
|
||||
struct timespec diff;
|
||||
timespecsub(&now, &txTime, &diff);
|
||||
timingDebug("gc diff %ld", diff.tv_sec);
|
||||
if (diff.tv_sec > 30) {
|
||||
i = txHash_.erase(i);
|
||||
count++;
|
||||
} else {
|
||||
i++;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (count)
|
||||
qDebug("Latency garbage collected %d stale tx timing records", count);
|
||||
return count;
|
||||
}
|
||||
|
||||
StreamTiming* StreamTiming::instance()
|
||||
{
|
||||
static StreamTiming *instance{nullptr};
|
||||
|
||||
// XXX: As of this writing, AbstractPort constructor is the first one
|
||||
// to call this - hence this singleton is created when the first port
|
||||
// is created
|
||||
if (!instance)
|
||||
instance = new StreamTiming(QCoreApplication::instance());
|
||||
|
||||
return instance;
|
||||
}
|
100
server/streamtiming.h
Normal file
100
server/streamtiming.h
Normal file
@ -0,0 +1,100 @@
|
||||
/*
|
||||
Copyright (C) 2023 Srivats P.
|
||||
|
||||
This file is part of "Ostinato"
|
||||
|
||||
This is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
#ifndef _STREAM_TIMING
|
||||
#define _STREAM_TIMING
|
||||
|
||||
#include "../common/sign.h"
|
||||
|
||||
#include <QHash>
|
||||
#include <QMutex>
|
||||
#include <QSet>
|
||||
#include <QTimer>
|
||||
|
||||
#include <time.h>
|
||||
|
||||
class StreamTiming : public QObject
|
||||
{
|
||||
Q_OBJECT
|
||||
public:
|
||||
bool recordTxTime(uint portId, uint guid, uint ttagId,
|
||||
const struct timespec ×tamp);
|
||||
bool recordRxTime(uint portId, uint guid, uint ttagId,
|
||||
const struct timespec ×tamp);
|
||||
|
||||
bool recordTxTime(uint portId, uint guid, uint ttagId,
|
||||
const struct timeval ×tamp);
|
||||
bool recordRxTime(uint portId, uint guid, uint ttagId,
|
||||
const struct timeval ×tamp);
|
||||
|
||||
quint64 delay(uint portId, uint guid);
|
||||
void clear(uint portId, uint guid = SignProtocol::kInvalidGuid);
|
||||
|
||||
static StreamTiming* instance();
|
||||
|
||||
public slots:
|
||||
void start(uint portId);
|
||||
void stop(uint portId);
|
||||
|
||||
private:
|
||||
StreamTiming(QObject *parent=nullptr);
|
||||
|
||||
int processRecords();
|
||||
int deleteStaleRecords();
|
||||
|
||||
quint32 makeKey(uint guid, uint ttagId) {
|
||||
return guid << 8 | (ttagId & 0xFF);
|
||||
}
|
||||
|
||||
// XXX: use only time intervals, not absolute time
|
||||
quint64 timespecToNsecs(const struct timespec &interval) {
|
||||
return interval.tv_nsec + interval.tv_sec*1e9;
|
||||
}
|
||||
|
||||
struct TtagData {
|
||||
struct timespec timeStamp; // nanosec resolution
|
||||
uint portId;
|
||||
};
|
||||
|
||||
struct Timing {
|
||||
struct timespec sumDelays; // nanosec resolution
|
||||
uint countDelays;
|
||||
};
|
||||
|
||||
QSet<uint> activePortSet_;
|
||||
|
||||
// XXX: TxRxKey = guid (24 bit MSB) + ttagid (8 bit LSB)
|
||||
// TODO: encode tx port in in packet and use as part of key
|
||||
typedef quint32 TxRxKey;
|
||||
QHash<TxRxKey, TtagData> txHash_;
|
||||
QHash<TxRxKey, TtagData> rxHash_;
|
||||
QMutex txHashLock_;
|
||||
QMutex rxHashLock_;
|
||||
|
||||
typedef uint PortIdKey;
|
||||
typedef uint GuidKey;
|
||||
typedef QHash<GuidKey, Timing> PortTiming;
|
||||
QHash<PortIdKey, PortTiming*> timing_;
|
||||
QMutex timingLock_;
|
||||
|
||||
QTimer *timer_; // Periodic timer to process tx/rx records
|
||||
QTimer *gcTimer_; // Garbage collection for stale tx records
|
||||
};
|
||||
|
||||
#endif
|
@ -21,6 +21,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
#define _TIMESTAMP_H
|
||||
|
||||
#include "timespecops.h"
|
||||
#include "timevalops.h"
|
||||
|
||||
#include <QtGlobal>
|
||||
|
||||
@ -69,6 +70,7 @@ static long inline udiffTimeStamp(const TimeStamp *start, const TimeStamp *end)
|
||||
#endif
|
||||
|
||||
#elif defined(Q_OS_WIN32)
|
||||
#include <windows.h>
|
||||
static quint64 gTicksFreq;
|
||||
typedef LARGE_INTEGER TimeStamp;
|
||||
static void inline getTimeStamp(TimeStamp* stamp)
|
||||
|
54
server/timevalops.h
Normal file
54
server/timevalops.h
Normal file
@ -0,0 +1,54 @@
|
||||
/*
|
||||
This file is part of "Ostinato"
|
||||
|
||||
These macros are copied from BSD sys/time.h
|
||||
*/
|
||||
|
||||
#ifndef _TIME_VAL_OPS
|
||||
#define _TIME_VAL_OPS
|
||||
|
||||
/* Operations on timeval - for platforms where some are not already defined*/
|
||||
#if defined(Q_OS_WIN32)
|
||||
|
||||
#ifndef timerclear
|
||||
#define timerclear(tvp) ((tvp)->tv_sec = (tvp)->tv_usec = 0)
|
||||
#endif
|
||||
|
||||
#ifndef timerisset
|
||||
#define timerisset(tvp) ((tvp)->tv_sec || (tvp)->tv_usec)
|
||||
#endif
|
||||
|
||||
#ifndef timercmp
|
||||
#define timercmp(tvp, uvp, cmp) \
|
||||
(((tvp)->tv_sec == (uvp)->tv_sec) ? \
|
||||
g((tvp)->tv_usec cmp (uvp)->tv_usec) : \
|
||||
g((tvp)->tv_sec cmp (uvp)->tv_sec))
|
||||
#endif
|
||||
|
||||
#ifndef timeradd
|
||||
#define timeradd(tvp, uvp, vvp) \
|
||||
do { \
|
||||
(vvp)->tv_sec = (tvp)->tv_sec + (uvp)->tv_sec; \
|
||||
(vvp)->tv_usec = (tvp)->tv_usec + (uvp)->tv_usec; \
|
||||
if ((vvp)->tv_usec >= 1000000) { \
|
||||
(vvp)->tv_sec++; \
|
||||
(vvp)->tv_usec -= 1000000; \
|
||||
} \
|
||||
} while (0)
|
||||
#endif
|
||||
|
||||
#ifndef timersub
|
||||
#define timersub(tvp, uvp, vvp) \
|
||||
do { \
|
||||
(vvp)->tv_sec = (tvp)->tv_sec - (uvp)->tv_sec; \
|
||||
(vvp)->tv_usec = (tvp)->tv_usec - (uvp)->tv_usec; \
|
||||
if ((vvp)->tv_usec < 0) { \
|
||||
(vvp)->tv_sec--; \
|
||||
(vvp)->tv_usec += 1000000; \
|
||||
} \
|
||||
} while (0)
|
||||
#endif
|
||||
|
||||
#endif /* Q_OS_WIN32 */
|
||||
|
||||
#endif
|
Loading…
Reference in New Issue
Block a user