sign: Refactored PortTransmitter into PcapTransmitter and PcapTxThread classes. Also broke out PacketSequence and Timestamp stuff into their own files. No change in functionality.

This commit is contained in:
Srivats P 2016-09-13 19:05:24 +05:30
parent eba14bf9a3
commit 12351d6304
9 changed files with 731 additions and 532 deletions

View File

@ -41,6 +41,7 @@ SOURCES += \
abstractport.cpp \
pcapport.cpp \
pcaptransmitter.cpp \
pcaptxthread.cpp \
bsdport.cpp \
linuxport.cpp \
winpcapport.cpp

72
server/packetsequence.h Normal file
View File

@ -0,0 +1,72 @@
/*
Copyright (C) 2010-2016 Srivats P.
This file is part of "Ostinato"
This is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
#ifndef _PACKET_SEQUENCE_H
#define _PACKET_SEQUENCE_H
#include "pcapextra.h"
class PacketSequence
{
public:
PacketSequence() {
sendQueue_ = pcap_sendqueue_alloc(1*1024*1024);
lastPacket_ = NULL;
packets_ = 0;
bytes_ = 0;
usecDuration_ = 0;
repeatCount_ = 1;
repeatSize_ = 1;
usecDelay_ = 0;
}
~PacketSequence() {
pcap_sendqueue_destroy(sendQueue_);
}
bool hasFreeSpace(int size) {
if ((sendQueue_->len + size) <= sendQueue_->maxlen)
return true;
else
return false;
}
int appendPacket(const struct pcap_pkthdr *pktHeader,
const uchar *pktData) {
if (lastPacket_)
{
usecDuration_ += (pktHeader->ts.tv_sec
- lastPacket_->ts.tv_sec) * long(1e6);
usecDuration_ += (pktHeader->ts.tv_usec
- lastPacket_->ts.tv_usec);
}
packets_++;
bytes_ += pktHeader->caplen;
lastPacket_ = (struct pcap_pkthdr *)
(sendQueue_->buffer + sendQueue_->len);
return pcap_sendqueue_queue(sendQueue_, pktHeader, pktData);
}
pcap_send_queue *sendQueue_;
struct pcap_pkthdr *lastPacket_;
long packets_;
long bytes_;
ulong usecDuration_;
int repeatCount_;
int repeatSize_;
long usecDelay_;
};
#endif

View File

@ -31,7 +31,7 @@ PcapPort::PcapPort(int id, const char *device)
{
monitorRx_ = new PortMonitor(device, kDirectionRx, &stats_);
monitorTx_ = new PortMonitor(device, kDirectionTx, &stats_);
transmitter_ = new PortTransmitter(device);
transmitter_ = new PcapTransmitter(device);
capturer_ = new PortCapturer(device);
emulXcvr_ = new EmulationTransceiver(device, deviceManager_);

View File

@ -163,7 +163,7 @@ protected:
void updateNotes();
private:
PortTransmitter *transmitter_;
PcapTransmitter *transmitter_;
PortCapturer *capturer_;
EmulationTransceiver *emulXcvr_;

View File

@ -19,478 +19,66 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "pcaptransmitter.h"
#if defined(Q_OS_LINUX)
typedef struct timeval TimeStamp;
static void inline getTimeStamp(TimeStamp *stamp)
PcapTransmitter::PcapTransmitter(const char *device)
: txThread_(device)
{
gettimeofday(stamp, NULL);
}
// Returns time diff in usecs between end and start
static long inline udiffTimeStamp(const TimeStamp *start, const TimeStamp *end)
{
struct timeval diff;
long usecs;
timersub(end, start, &diff);
usecs = diff.tv_usec;
if (diff.tv_sec)
usecs += diff.tv_sec*1e6;
return usecs;
}
#elif defined(Q_OS_WIN32)
static quint64 gTicksFreq;
typedef LARGE_INTEGER TimeStamp;
static void inline getTimeStamp(TimeStamp* stamp)
{
QueryPerformanceCounter(stamp);
}
static long inline udiffTimeStamp(const TimeStamp *start, const TimeStamp *end)
{
if (end->QuadPart >= start->QuadPart)
return (end->QuadPart - start->QuadPart)*long(1e6)/gTicksFreq;
else
{
// FIXME: incorrect! what's the max value for this counter before
// it rolls over?
return (start->QuadPart)*long(1e6)/gTicksFreq;
}
}
#else
typedef int TimeStamp;
static void inline getTimeStamp(TimeStamp*) {}
static long inline udiffTimeStamp(const TimeStamp*, const TimeStamp*) { return 0; }
#endif
PortTransmitter::PortTransmitter(const char *device)
{
char errbuf[PCAP_ERRBUF_SIZE] = "";
#ifdef Q_OS_WIN32
LARGE_INTEGER freq;
if (QueryPerformanceFrequency(&freq))
gTicksFreq = freq.QuadPart;
else
Q_ASSERT_X(false, "PortTransmitter::PortTransmitter",
"This Win32 platform does not support performance counter");
#endif
state_ = kNotStarted;
returnToQIdx_ = -1;
loopDelay_ = 0;
stop_ = false;
stats_ = new AbstractPort::PortStats;
usingInternalStats_ = true;
handle_ = pcap_open_live(device, 64 /* FIXME */, 0, 1000 /* ms */, errbuf);
if (handle_ == NULL)
goto _open_error;
usingInternalHandle_ = true;
return;
_open_error:
qDebug("%s: Error opening port %s: %s\n", __FUNCTION__, device, errbuf);
usingInternalHandle_ = false;
}
PortTransmitter::~PortTransmitter()
{
if (usingInternalStats_)
delete stats_;
if (usingInternalHandle_)
pcap_close(handle_);
}
bool PortTransmitter::setRateAccuracy(
bool PcapTransmitter::setRateAccuracy(
AbstractPort::Accuracy accuracy)
{
switch (accuracy) {
case AbstractPort::kHighAccuracy:
udelayFn_ = udelay;
qWarning("%s: rate accuracy set to High - busy wait", __FUNCTION__);
break;
case AbstractPort::kLowAccuracy:
udelayFn_ = QThread::usleep;
qWarning("%s: rate accuracy set to Low - usleep", __FUNCTION__);
break;
default:
qWarning("%s: unsupported rate accuracy value %d", __FUNCTION__,
accuracy);
return false;
}
return true;
return txThread_.setRateAccuracy(accuracy);
}
void PortTransmitter::clearPacketList()
void PcapTransmitter::clearPacketList()
{
Q_ASSERT(!isRunning());
// \todo lock for packetSequenceList
while(packetSequenceList_.size())
delete packetSequenceList_.takeFirst();
currentPacketSequence_ = NULL;
repeatSequenceStart_ = -1;
repeatSize_ = 0;
packetCount_ = 0;
returnToQIdx_ = -1;
setPacketListLoopMode(false, 0, 0);
txThread_.clearPacketList();
}
void PortTransmitter::loopNextPacketSet(qint64 size, qint64 repeats,
long repeatDelaySec, long repeatDelayNsec)
void PcapTransmitter::loopNextPacketSet(
qint64 size,
qint64 repeats,
long repeatDelaySec,
long repeatDelayNsec)
{
currentPacketSequence_ = new PacketSequence;
currentPacketSequence_->repeatCount_ = repeats;
currentPacketSequence_->usecDelay_ = repeatDelaySec * long(1e6)
+ repeatDelayNsec/1000;
repeatSequenceStart_ = packetSequenceList_.size();
repeatSize_ = size;
packetCount_ = 0;
packetSequenceList_.append(currentPacketSequence_);
txThread_.loopNextPacketSet(size, repeats, repeatDelaySec, repeatDelayNsec);
}
bool PortTransmitter::appendToPacketList(long sec, long nsec,
bool PcapTransmitter::appendToPacketList(long sec, long nsec,
const uchar *packet, int length)
{
bool op = true;
pcap_pkthdr pktHdr;
pktHdr.caplen = pktHdr.len = length;
pktHdr.ts.tv_sec = sec;
pktHdr.ts.tv_usec = nsec/1000;
if (currentPacketSequence_ == NULL ||
!currentPacketSequence_->hasFreeSpace(2*sizeof(pcap_pkthdr)+length))
{
if (currentPacketSequence_ != NULL)
{
long usecs;
usecs = (pktHdr.ts.tv_sec
- currentPacketSequence_->lastPacket_->ts.tv_sec)
* long(1e6);
usecs += (pktHdr.ts.tv_usec
- currentPacketSequence_->lastPacket_->ts.tv_usec);
currentPacketSequence_->usecDelay_ = usecs;
}
//! \todo (LOW): calculate sendqueue size
currentPacketSequence_ = new PacketSequence;
packetSequenceList_.append(currentPacketSequence_);
// Validate that the pkt will fit inside the new currentSendQueue_
Q_ASSERT(currentPacketSequence_->hasFreeSpace(
sizeof(pcap_pkthdr) + length));
}
if (currentPacketSequence_->appendPacket(&pktHdr, (u_char*) packet) < 0)
{
op = false;
}
packetCount_++;
if (repeatSize_ > 0 && packetCount_ == repeatSize_)
{
qDebug("repeatSequenceStart_=%d, repeatSize_ = %llu",
repeatSequenceStart_, repeatSize_);
// Set the packetSequence repeatSize
Q_ASSERT(repeatSequenceStart_ >= 0);
Q_ASSERT(repeatSequenceStart_ < packetSequenceList_.size());
if (currentPacketSequence_ != packetSequenceList_[repeatSequenceStart_])
{
PacketSequence *start = packetSequenceList_[repeatSequenceStart_];
currentPacketSequence_->usecDelay_ = start->usecDelay_;
start->usecDelay_ = 0;
start->repeatSize_ =
packetSequenceList_.size() - repeatSequenceStart_;
}
repeatSize_ = 0;
// End current pktSeq and trigger a new pktSeq allocation for next pkt
currentPacketSequence_ = NULL;
}
return op;
return txThread_.appendToPacketList(sec, nsec, packet, length);
}
void PortTransmitter::setHandle(pcap_t *handle)
void PcapTransmitter::setHandle(pcap_t *handle)
{
if (usingInternalHandle_)
pcap_close(handle_);
handle_ = handle;
usingInternalHandle_ = false;
txThread_.setHandle(handle);
}
void PortTransmitter::useExternalStats(AbstractPort::PortStats *stats)
void PcapTransmitter::setPacketListLoopMode(
bool loop,
quint64 secDelay,
quint64 nsecDelay)
{
if (usingInternalStats_)
delete stats_;
stats_ = stats;
usingInternalStats_ = false;
txThread_.setPacketListLoopMode(loop, secDelay, nsecDelay);
}
void PortTransmitter::run()
void PcapTransmitter::useExternalStats(AbstractPort::PortStats *stats)
{
//! \todo (MED) Stream Mode - continuous: define before implement
// NOTE1: We can't use pcap_sendqueue_transmit() directly even on Win32
// 'coz of 2 reasons - there's no way of stopping it before all packets
// in the sendQueue are sent out and secondly, stats are available only
// when all packets have been sent - no periodic updates
//
// NOTE2: Transmit on the Rx Handle so that we can receive it back
// on the Tx Handle to do stats
//
// NOTE3: Update pcapExtra counters - port TxStats will be updated in the
// 'stats callback' function so that both Rx and Tx stats are updated
// together
const int kSyncTransmit = 1;
int i;
long overHead = 0; // overHead should be negative or zero
qDebug("packetSequenceList_.size = %d", packetSequenceList_.size());
if (packetSequenceList_.size() <= 0)
goto _exit;
for(i = 0; i < packetSequenceList_.size(); i++) {
qDebug("sendQ[%d]: rptCnt = %d, rptSz = %d, usecDelay = %ld", i,
packetSequenceList_.at(i)->repeatCount_,
packetSequenceList_.at(i)->repeatSize_,
packetSequenceList_.at(i)->usecDelay_);
qDebug("sendQ[%d]: pkts = %ld, usecDuration = %ld", i,
packetSequenceList_.at(i)->packets_,
packetSequenceList_.at(i)->usecDuration_);
}
state_ = kRunning;
i = 0;
while (i < packetSequenceList_.size())
{
_restart:
int rptSz = packetSequenceList_.at(i)->repeatSize_;
int rptCnt = packetSequenceList_.at(i)->repeatCount_;
for (int j = 0; j < rptCnt; j++)
{
for (int k = 0; k < rptSz; k++)
{
int ret;
PacketSequence *seq = packetSequenceList_.at(i+k);
#ifdef Q_OS_WIN32
TimeStamp ovrStart, ovrEnd;
if (seq->usecDuration_ <= long(1e6)) // 1s
{
getTimeStamp(&ovrStart);
ret = pcap_sendqueue_transmit(handle_,
seq->sendQueue_, kSyncTransmit);
if (ret >= 0)
{
stats_->txPkts += seq->packets_;
stats_->txBytes += seq->bytes_;
getTimeStamp(&ovrEnd);
overHead += seq->usecDuration_
- udiffTimeStamp(&ovrStart, &ovrEnd);
Q_ASSERT(overHead <= 0);
}
if (stop_)
ret = -2;
}
else
{
ret = sendQueueTransmit(handle_, seq->sendQueue_,
overHead, kSyncTransmit);
}
#else
ret = sendQueueTransmit(handle_, seq->sendQueue_,
overHead, kSyncTransmit);
#endif
if (ret >= 0)
{
long usecs = seq->usecDelay_ + overHead;
if (usecs > 0)
{
(*udelayFn_)(usecs);
overHead = 0;
}
else
overHead = usecs;
}
else
{
qDebug("error %d in sendQueueTransmit()", ret);
qDebug("overHead = %ld", overHead);
stop_ = false;
goto _exit;
}
}
}
// Move to the next Packet Set
i += rptSz;
}
if (returnToQIdx_ >= 0)
{
long usecs = loopDelay_ + overHead;
if (usecs > 0)
{
(*udelayFn_)(usecs);
overHead = 0;
}
else
overHead = usecs;
i = returnToQIdx_;
goto _restart;
}
_exit:
state_ = kFinished;
txThread_.useExternalStats(stats);
}
void PortTransmitter::start()
void PcapTransmitter::start()
{
// FIXME: return error
if (state_ == kRunning) {
qWarning("Transmit start requested but is already running!");
return;
}
state_ = kNotStarted;
QThread::start();
while (state_ == kNotStarted)
QThread::msleep(10);
txThread_.start();
}
void PortTransmitter::stop()
void PcapTransmitter::stop()
{
if (state_ == kRunning) {
stop_ = true;
while (state_ == kRunning)
QThread::msleep(10);
}
else {
// FIXME: return error
qWarning("Transmit stop requested but is not running!");
return;
}
txThread_.stop();
}
bool PortTransmitter::isRunning()
bool PcapTransmitter::isRunning()
{
return (state_ == kRunning);
return txThread_.isRunning();
}
int PortTransmitter::sendQueueTransmit(pcap_t *p,
pcap_send_queue *queue, long &overHead, int sync)
{
TimeStamp ovrStart, ovrEnd;
struct timeval ts;
struct pcap_pkthdr *hdr = (struct pcap_pkthdr*) queue->buffer;
char *end = queue->buffer + queue->len;
ts = hdr->ts;
getTimeStamp(&ovrStart);
while((char*) hdr < end)
{
uchar *pkt = (uchar*)hdr + sizeof(*hdr);
int pktLen = hdr->caplen;
if (sync)
{
long usec = (hdr->ts.tv_sec - ts.tv_sec) * 1000000 +
(hdr->ts.tv_usec - ts.tv_usec);
getTimeStamp(&ovrEnd);
overHead -= udiffTimeStamp(&ovrStart, &ovrEnd);
Q_ASSERT(overHead <= 0);
usec += overHead;
if (usec > 0)
{
(*udelayFn_)(usec);
overHead = 0;
}
else
overHead = usec;
ts = hdr->ts;
getTimeStamp(&ovrStart);
}
Q_ASSERT(pktLen > 0);
pcap_sendpacket(p, pkt, pktLen);
stats_->txPkts++;
stats_->txBytes += pktLen;
// Step to the next packet in the buffer
hdr = (struct pcap_pkthdr*) (pkt + pktLen);
pkt = (uchar*) ((uchar*)hdr + sizeof(*hdr));
if (stop_)
{
return -2;
}
}
return 0;
}
void PortTransmitter::udelay(unsigned long usec)
{
#if defined(Q_OS_WIN32)
LARGE_INTEGER tgtTicks;
LARGE_INTEGER curTicks;
QueryPerformanceCounter(&curTicks);
tgtTicks.QuadPart = curTicks.QuadPart + (usec*gTicksFreq)/1000000;
while (curTicks.QuadPart < tgtTicks.QuadPart)
QueryPerformanceCounter(&curTicks);
#elif defined(Q_OS_LINUX)
struct timeval delay, target, now;
//qDebug("usec delay = %ld", usec);
delay.tv_sec = 0;
delay.tv_usec = usec;
while (delay.tv_usec >= 1000000)
{
delay.tv_sec++;
delay.tv_usec -= 1000000;
}
gettimeofday(&now, NULL);
timeradd(&now, &delay, &target);
do {
gettimeofday(&now, NULL);
} while (timercmp(&now, &target, <));
#else
QThread::usleep(usec);
#endif
}

View File

@ -21,110 +21,31 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#define _PCAP_TRANSMITTER_H
#include "abstractport.h"
#include "pcapextra.h"
#include "pcaptxthread.h"
#include <QThread>
#include <pcap.h>
class PortTransmitter: public QThread
class PcapTransmitter
{
public:
PortTransmitter(const char *device);
~PortTransmitter();
PcapTransmitter(const char *device);
bool setRateAccuracy(AbstractPort::Accuracy accuracy);
void clearPacketList();
void loopNextPacketSet(qint64 size, qint64 repeats,
long repeatDelaySec, long repeatDelayNsec);
long repeatDelaySec, long repeatDelayNsec);
bool appendToPacketList(long sec, long usec, const uchar *packet,
int length);
void setPacketListLoopMode(bool loop, quint64 secDelay, quint64 nsecDelay) {
returnToQIdx_ = loop ? 0 : -1;
loopDelay_ = secDelay*long(1e6) + nsecDelay/1000;
}
int length);
void setPacketListLoopMode(bool loop, quint64 secDelay, quint64 nsecDelay);
void setHandle(pcap_t *handle);
void useExternalStats(AbstractPort::PortStats *stats);
void run();
void start();
void stop();
bool isRunning();
private:
enum State
{
kNotStarted,
kRunning,
kFinished
};
class PacketSequence
{
public:
PacketSequence() {
sendQueue_ = pcap_sendqueue_alloc(1*1024*1024);
lastPacket_ = NULL;
packets_ = 0;
bytes_ = 0;
usecDuration_ = 0;
repeatCount_ = 1;
repeatSize_ = 1;
usecDelay_ = 0;
}
~PacketSequence() {
pcap_sendqueue_destroy(sendQueue_);
}
bool hasFreeSpace(int size) {
if ((sendQueue_->len + size) <= sendQueue_->maxlen)
return true;
else
return false;
}
int appendPacket(const struct pcap_pkthdr *pktHeader,
const uchar *pktData) {
if (lastPacket_)
{
usecDuration_ += (pktHeader->ts.tv_sec
- lastPacket_->ts.tv_sec) * long(1e6);
usecDuration_ += (pktHeader->ts.tv_usec
- lastPacket_->ts.tv_usec);
}
packets_++;
bytes_ += pktHeader->caplen;
lastPacket_ = (struct pcap_pkthdr *)
(sendQueue_->buffer + sendQueue_->len);
return pcap_sendqueue_queue(sendQueue_, pktHeader, pktData);
}
pcap_send_queue *sendQueue_;
struct pcap_pkthdr *lastPacket_;
long packets_;
long bytes_;
ulong usecDuration_;
int repeatCount_;
int repeatSize_;
long usecDelay_;
};
static void udelay(unsigned long usec);
int sendQueueTransmit(pcap_t *p, pcap_send_queue *queue, long &overHead,
int sync);
QList<PacketSequence*> packetSequenceList_;
PacketSequence *currentPacketSequence_;
int repeatSequenceStart_;
quint64 repeatSize_;
quint64 packetCount_;
int returnToQIdx_;
quint64 loopDelay_;
void (*udelayFn_)(unsigned long);
bool usingInternalStats_;
AbstractPort::PortStats *stats_;
bool usingInternalHandle_;
pcap_t *handle_;
volatile bool stop_;
volatile State state_;
PcapTxThread txThread_;
};
#endif

461
server/pcaptxthread.cpp Normal file
View File

@ -0,0 +1,461 @@
/*
Copyright (C) 2010-2016 Srivats P.
This file is part of "Ostinato"
This is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
#include "pcaptransmitter.h"
#include "timestamp.h"
PcapTxThread::PcapTxThread(const char *device)
{
char errbuf[PCAP_ERRBUF_SIZE] = "";
#ifdef Q_OS_WIN32
LARGE_INTEGER freq;
if (QueryPerformanceFrequency(&freq))
gTicksFreq = freq.QuadPart;
else
Q_ASSERT_X(false, "PcapTxThread::PcapTxThread",
"This Win32 platform does not support performance counter");
#endif
state_ = kNotStarted;
returnToQIdx_ = -1;
loopDelay_ = 0;
stop_ = false;
stats_ = new AbstractPort::PortStats;
usingInternalStats_ = true;
handle_ = pcap_open_live(device, 64 /* FIXME */, 0, 1000 /* ms */, errbuf);
if (handle_ == NULL)
goto _open_error;
usingInternalHandle_ = true;
return;
_open_error:
qDebug("%s: Error opening port %s: %s\n", __FUNCTION__, device, errbuf);
usingInternalHandle_ = false;
}
PcapTxThread::~PcapTxThread()
{
if (usingInternalStats_)
delete stats_;
if (usingInternalHandle_)
pcap_close(handle_);
}
bool PcapTxThread::setRateAccuracy(
AbstractPort::Accuracy accuracy)
{
switch (accuracy) {
case AbstractPort::kHighAccuracy:
udelayFn_ = udelay;
qWarning("%s: rate accuracy set to High - busy wait", __FUNCTION__);
break;
case AbstractPort::kLowAccuracy:
udelayFn_ = QThread::usleep;
qWarning("%s: rate accuracy set to Low - usleep", __FUNCTION__);
break;
default:
qWarning("%s: unsupported rate accuracy value %d", __FUNCTION__,
accuracy);
return false;
}
return true;
}
void PcapTxThread::clearPacketList()
{
Q_ASSERT(!isRunning());
// \todo lock for packetSequenceList
while(packetSequenceList_.size())
delete packetSequenceList_.takeFirst();
currentPacketSequence_ = NULL;
repeatSequenceStart_ = -1;
repeatSize_ = 0;
packetCount_ = 0;
returnToQIdx_ = -1;
setPacketListLoopMode(false, 0, 0);
}
void PcapTxThread::loopNextPacketSet(qint64 size, qint64 repeats,
long repeatDelaySec, long repeatDelayNsec)
{
currentPacketSequence_ = new PacketSequence;
currentPacketSequence_->repeatCount_ = repeats;
currentPacketSequence_->usecDelay_ = repeatDelaySec * long(1e6)
+ repeatDelayNsec/1000;
repeatSequenceStart_ = packetSequenceList_.size();
repeatSize_ = size;
packetCount_ = 0;
packetSequenceList_.append(currentPacketSequence_);
}
bool PcapTxThread::appendToPacketList(long sec, long nsec,
const uchar *packet, int length)
{
bool op = true;
pcap_pkthdr pktHdr;
pktHdr.caplen = pktHdr.len = length;
pktHdr.ts.tv_sec = sec;
pktHdr.ts.tv_usec = nsec/1000;
if (currentPacketSequence_ == NULL ||
!currentPacketSequence_->hasFreeSpace(2*sizeof(pcap_pkthdr)+length))
{
if (currentPacketSequence_ != NULL)
{
long usecs;
usecs = (pktHdr.ts.tv_sec
- currentPacketSequence_->lastPacket_->ts.tv_sec)
* long(1e6);
usecs += (pktHdr.ts.tv_usec
- currentPacketSequence_->lastPacket_->ts.tv_usec);
currentPacketSequence_->usecDelay_ = usecs;
}
//! \todo (LOW): calculate sendqueue size
currentPacketSequence_ = new PacketSequence;
packetSequenceList_.append(currentPacketSequence_);
// Validate that the pkt will fit inside the new currentSendQueue_
Q_ASSERT(currentPacketSequence_->hasFreeSpace(
sizeof(pcap_pkthdr) + length));
}
if (currentPacketSequence_->appendPacket(&pktHdr, (u_char*) packet) < 0)
{
op = false;
}
packetCount_++;
if (repeatSize_ > 0 && packetCount_ == repeatSize_)
{
qDebug("repeatSequenceStart_=%d, repeatSize_ = %llu",
repeatSequenceStart_, repeatSize_);
// Set the packetSequence repeatSize
Q_ASSERT(repeatSequenceStart_ >= 0);
Q_ASSERT(repeatSequenceStart_ < packetSequenceList_.size());
if (currentPacketSequence_ != packetSequenceList_[repeatSequenceStart_])
{
PacketSequence *start = packetSequenceList_[repeatSequenceStart_];
currentPacketSequence_->usecDelay_ = start->usecDelay_;
start->usecDelay_ = 0;
start->repeatSize_ =
packetSequenceList_.size() - repeatSequenceStart_;
}
repeatSize_ = 0;
// End current pktSeq and trigger a new pktSeq allocation for next pkt
currentPacketSequence_ = NULL;
}
return op;
}
void PcapTxThread::setPacketListLoopMode(
bool loop,
quint64 secDelay,
quint64 nsecDelay)
{
returnToQIdx_ = loop ? 0 : -1;
loopDelay_ = secDelay*long(1e6) + nsecDelay/1000;
}
void PcapTxThread::setHandle(pcap_t *handle)
{
if (usingInternalHandle_)
pcap_close(handle_);
handle_ = handle;
usingInternalHandle_ = false;
}
void PcapTxThread::useExternalStats(AbstractPort::PortStats *stats)
{
if (usingInternalStats_)
delete stats_;
stats_ = stats;
usingInternalStats_ = false;
}
void PcapTxThread::run()
{
//! \todo (MED) Stream Mode - continuous: define before implement
// NOTE1: We can't use pcap_sendqueue_transmit() directly even on Win32
// 'coz of 2 reasons - there's no way of stopping it before all packets
// in the sendQueue are sent out and secondly, stats are available only
// when all packets have been sent - no periodic updates
//
// NOTE2: Transmit on the Rx Handle so that we can receive it back
// on the Tx Handle to do stats
//
// NOTE3: Update pcapExtra counters - port TxStats will be updated in the
// 'stats callback' function so that both Rx and Tx stats are updated
// together
const int kSyncTransmit = 1;
int i;
long overHead = 0; // overHead should be negative or zero
qDebug("packetSequenceList_.size = %d", packetSequenceList_.size());
if (packetSequenceList_.size() <= 0)
goto _exit;
for(i = 0; i < packetSequenceList_.size(); i++) {
qDebug("sendQ[%d]: rptCnt = %d, rptSz = %d, usecDelay = %ld", i,
packetSequenceList_.at(i)->repeatCount_,
packetSequenceList_.at(i)->repeatSize_,
packetSequenceList_.at(i)->usecDelay_);
qDebug("sendQ[%d]: pkts = %ld, usecDuration = %ld", i,
packetSequenceList_.at(i)->packets_,
packetSequenceList_.at(i)->usecDuration_);
}
state_ = kRunning;
i = 0;
while (i < packetSequenceList_.size())
{
_restart:
int rptSz = packetSequenceList_.at(i)->repeatSize_;
int rptCnt = packetSequenceList_.at(i)->repeatCount_;
for (int j = 0; j < rptCnt; j++)
{
for (int k = 0; k < rptSz; k++)
{
int ret;
PacketSequence *seq = packetSequenceList_.at(i+k);
#ifdef Q_OS_WIN32
TimeStamp ovrStart, ovrEnd;
if (seq->usecDuration_ <= long(1e6)) // 1s
{
getTimeStamp(&ovrStart);
ret = pcap_sendqueue_transmit(handle_,
seq->sendQueue_, kSyncTransmit);
if (ret >= 0)
{
stats_->txPkts += seq->packets_;
stats_->txBytes += seq->bytes_;
getTimeStamp(&ovrEnd);
overHead += seq->usecDuration_
- udiffTimeStamp(&ovrStart, &ovrEnd);
Q_ASSERT(overHead <= 0);
}
if (stop_)
ret = -2;
}
else
{
ret = sendQueueTransmit(handle_, seq->sendQueue_,
overHead, kSyncTransmit);
}
#else
ret = sendQueueTransmit(handle_, seq->sendQueue_,
overHead, kSyncTransmit);
#endif
if (ret >= 0)
{
long usecs = seq->usecDelay_ + overHead;
if (usecs > 0)
{
(*udelayFn_)(usecs);
overHead = 0;
}
else
overHead = usecs;
}
else
{
qDebug("error %d in sendQueueTransmit()", ret);
qDebug("overHead = %ld", overHead);
stop_ = false;
goto _exit;
}
}
}
// Move to the next Packet Set
i += rptSz;
}
if (returnToQIdx_ >= 0)
{
long usecs = loopDelay_ + overHead;
if (usecs > 0)
{
(*udelayFn_)(usecs);
overHead = 0;
}
else
overHead = usecs;
i = returnToQIdx_;
goto _restart;
}
_exit:
state_ = kFinished;
}
void PcapTxThread::start()
{
// FIXME: return error
if (state_ == kRunning) {
qWarning("Transmit start requested but is already running!");
return;
}
state_ = kNotStarted;
QThread::start();
while (state_ == kNotStarted)
QThread::msleep(10);
}
void PcapTxThread::stop()
{
if (state_ == kRunning) {
stop_ = true;
while (state_ == kRunning)
QThread::msleep(10);
}
else {
// FIXME: return error
qWarning("Transmit stop requested but is not running!");
return;
}
}
bool PcapTxThread::isRunning()
{
return (state_ == kRunning);
}
int PcapTxThread::sendQueueTransmit(pcap_t *p,
pcap_send_queue *queue, long &overHead, int sync)
{
TimeStamp ovrStart, ovrEnd;
struct timeval ts;
struct pcap_pkthdr *hdr = (struct pcap_pkthdr*) queue->buffer;
char *end = queue->buffer + queue->len;
ts = hdr->ts;
getTimeStamp(&ovrStart);
while((char*) hdr < end)
{
uchar *pkt = (uchar*)hdr + sizeof(*hdr);
int pktLen = hdr->caplen;
if (sync)
{
long usec = (hdr->ts.tv_sec - ts.tv_sec) * 1000000 +
(hdr->ts.tv_usec - ts.tv_usec);
getTimeStamp(&ovrEnd);
overHead -= udiffTimeStamp(&ovrStart, &ovrEnd);
Q_ASSERT(overHead <= 0);
usec += overHead;
if (usec > 0)
{
(*udelayFn_)(usec);
overHead = 0;
}
else
overHead = usec;
ts = hdr->ts;
getTimeStamp(&ovrStart);
}
Q_ASSERT(pktLen > 0);
pcap_sendpacket(p, pkt, pktLen);
stats_->txPkts++;
stats_->txBytes += pktLen;
// Step to the next packet in the buffer
hdr = (struct pcap_pkthdr*) (pkt + pktLen);
pkt = (uchar*) ((uchar*)hdr + sizeof(*hdr));
if (stop_)
{
return -2;
}
}
return 0;
}
void PcapTxThread::udelay(unsigned long usec)
{
#if defined(Q_OS_WIN32)
LARGE_INTEGER tgtTicks;
LARGE_INTEGER curTicks;
QueryPerformanceCounter(&curTicks);
tgtTicks.QuadPart = curTicks.QuadPart + (usec*gTicksFreq)/1000000;
while (curTicks.QuadPart < tgtTicks.QuadPart)
QueryPerformanceCounter(&curTicks);
#elif defined(Q_OS_LINUX)
struct timeval delay, target, now;
//qDebug("usec delay = %ld", usec);
delay.tv_sec = 0;
delay.tv_usec = usec;
while (delay.tv_usec >= 1000000)
{
delay.tv_sec++;
delay.tv_usec -= 1000000;
}
gettimeofday(&now, NULL);
timeradd(&now, &delay, &target);
do {
gettimeofday(&now, NULL);
} while (timercmp(&now, &target, <));
#else
QThread::usleep(usec);
#endif
}

84
server/pcaptxthread.h Normal file
View File

@ -0,0 +1,84 @@
/*
Copyright (C) 2010-2016 Srivats P.
This file is part of "Ostinato"
This is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
#ifndef _PCAP_TX_THREAD_H
#define _PCAP_TX_THREAD_H
#include "abstractport.h"
#include "packetsequence.h"
#include <QThread>
#include <pcap.h>
class PcapTxThread: public QThread
{
public:
PcapTxThread(const char *device);
~PcapTxThread();
bool setRateAccuracy(AbstractPort::Accuracy accuracy);
void clearPacketList();
void loopNextPacketSet(qint64 size, qint64 repeats,
long repeatDelaySec, long repeatDelayNsec);
bool appendToPacketList(long sec, long usec, const uchar *packet,
int length);
void setPacketListLoopMode(bool loop, quint64 secDelay, quint64 nsecDelay);
void setHandle(pcap_t *handle);
void useExternalStats(AbstractPort::PortStats *stats);
void run();
void start();
void stop();
bool isRunning();
private:
enum State
{
kNotStarted,
kRunning,
kFinished
};
static void udelay(unsigned long usec);
int sendQueueTransmit(pcap_t *p, pcap_send_queue *queue, long &overHead,
int sync);
QList<PacketSequence*> packetSequenceList_;
PacketSequence *currentPacketSequence_;
int repeatSequenceStart_;
quint64 repeatSize_;
quint64 packetCount_;
int returnToQIdx_;
quint64 loopDelay_;
void (*udelayFn_)(unsigned long);
bool usingInternalStats_;
AbstractPort::PortStats *stats_;
bool usingInternalHandle_;
pcap_t *handle_;
volatile bool stop_;
volatile State state_;
};
#endif

72
server/timestamp.h Normal file
View File

@ -0,0 +1,72 @@
/*
Copyright (C) 2010-2016 Srivats P.
This file is part of "Ostinato"
This is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
#ifndef _TIMESTAMP_H
#define _TIMESTAMP_H
#include <QtGlobal>
#if defined(Q_OS_LINUX)
typedef struct timeval TimeStamp;
static void inline getTimeStamp(TimeStamp *stamp)
{
gettimeofday(stamp, NULL);
}
// Returns time diff in usecs between end and start
static long inline udiffTimeStamp(const TimeStamp *start, const TimeStamp *end)
{
struct timeval diff;
long usecs;
timersub(end, start, &diff);
usecs = diff.tv_usec;
if (diff.tv_sec)
usecs += diff.tv_sec*1e6;
return usecs;
}
#elif defined(Q_OS_WIN32)
static quint64 gTicksFreq;
typedef LARGE_INTEGER TimeStamp;
static void inline getTimeStamp(TimeStamp* stamp)
{
QueryPerformanceCounter(stamp);
}
static long inline udiffTimeStamp(const TimeStamp *start, const TimeStamp *end)
{
if (end->QuadPart >= start->QuadPart)
return (end->QuadPart - start->QuadPart)*long(1e6)/gTicksFreq;
else
{
// FIXME: incorrect! what's the max value for this counter before
// it rolls over?
return (start->QuadPart)*long(1e6)/gTicksFreq;
}
}
#else
typedef int TimeStamp;
static void inline getTimeStamp(TimeStamp*) {}
static long inline udiffTimeStamp(const TimeStamp*, const TimeStamp*) { return 0; }
#endif
#endif