diff --git a/server/drone.pro b/server/drone.pro index f7ebd74..143101a 100644 --- a/server/drone.pro +++ b/server/drone.pro @@ -41,6 +41,7 @@ SOURCES += \ abstractport.cpp \ pcapport.cpp \ pcaptransmitter.cpp \ + pcaptxthread.cpp \ bsdport.cpp \ linuxport.cpp \ winpcapport.cpp diff --git a/server/packetsequence.h b/server/packetsequence.h new file mode 100644 index 0000000..494ba2b --- /dev/null +++ b/server/packetsequence.h @@ -0,0 +1,72 @@ +/* +Copyright (C) 2010-2016 Srivats P. + +This file is part of "Ostinato" + +This is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +#ifndef _PACKET_SEQUENCE_H +#define _PACKET_SEQUENCE_H + +#include "pcapextra.h" + +class PacketSequence +{ +public: + PacketSequence() { + sendQueue_ = pcap_sendqueue_alloc(1*1024*1024); + lastPacket_ = NULL; + packets_ = 0; + bytes_ = 0; + usecDuration_ = 0; + repeatCount_ = 1; + repeatSize_ = 1; + usecDelay_ = 0; + } + ~PacketSequence() { + pcap_sendqueue_destroy(sendQueue_); + } + bool hasFreeSpace(int size) { + if ((sendQueue_->len + size) <= sendQueue_->maxlen) + return true; + else + return false; + } + int appendPacket(const struct pcap_pkthdr *pktHeader, + const uchar *pktData) { + if (lastPacket_) + { + usecDuration_ += (pktHeader->ts.tv_sec + - lastPacket_->ts.tv_sec) * long(1e6); + usecDuration_ += (pktHeader->ts.tv_usec + - lastPacket_->ts.tv_usec); + } + packets_++; + bytes_ += pktHeader->caplen; + lastPacket_ = (struct pcap_pkthdr *) + (sendQueue_->buffer + sendQueue_->len); + return pcap_sendqueue_queue(sendQueue_, pktHeader, pktData); + } + pcap_send_queue *sendQueue_; + struct pcap_pkthdr *lastPacket_; + long packets_; + long bytes_; + ulong usecDuration_; + int repeatCount_; + int repeatSize_; + long usecDelay_; +}; + +#endif diff --git a/server/pcapport.cpp b/server/pcapport.cpp index 2580323..700358f 100644 --- a/server/pcapport.cpp +++ b/server/pcapport.cpp @@ -31,7 +31,7 @@ PcapPort::PcapPort(int id, const char *device) { monitorRx_ = new PortMonitor(device, kDirectionRx, &stats_); monitorTx_ = new PortMonitor(device, kDirectionTx, &stats_); - transmitter_ = new PortTransmitter(device); + transmitter_ = new PcapTransmitter(device); capturer_ = new PortCapturer(device); emulXcvr_ = new EmulationTransceiver(device, deviceManager_); diff --git a/server/pcapport.h b/server/pcapport.h index e5a47fe..04e8269 100644 --- a/server/pcapport.h +++ b/server/pcapport.h @@ -163,7 +163,7 @@ protected: void updateNotes(); private: - PortTransmitter *transmitter_; + PcapTransmitter *transmitter_; PortCapturer *capturer_; EmulationTransceiver *emulXcvr_; diff --git a/server/pcaptransmitter.cpp b/server/pcaptransmitter.cpp index 5c72359..8c807f8 100644 --- a/server/pcaptransmitter.cpp +++ b/server/pcaptransmitter.cpp @@ -19,478 +19,66 @@ along with this program. If not, see #include "pcaptransmitter.h" -#if defined(Q_OS_LINUX) -typedef struct timeval TimeStamp; -static void inline getTimeStamp(TimeStamp *stamp) +PcapTransmitter::PcapTransmitter(const char *device) + : txThread_(device) { - gettimeofday(stamp, NULL); } -// Returns time diff in usecs between end and start -static long inline udiffTimeStamp(const TimeStamp *start, const TimeStamp *end) -{ - struct timeval diff; - long usecs; - - timersub(end, start, &diff); - - usecs = diff.tv_usec; - if (diff.tv_sec) - usecs += diff.tv_sec*1e6; - - return usecs; -} -#elif defined(Q_OS_WIN32) -static quint64 gTicksFreq; -typedef LARGE_INTEGER TimeStamp; -static void inline getTimeStamp(TimeStamp* stamp) -{ - QueryPerformanceCounter(stamp); -} - -static long inline udiffTimeStamp(const TimeStamp *start, const TimeStamp *end) -{ - if (end->QuadPart >= start->QuadPart) - return (end->QuadPart - start->QuadPart)*long(1e6)/gTicksFreq; - else - { - // FIXME: incorrect! what's the max value for this counter before - // it rolls over? - return (start->QuadPart)*long(1e6)/gTicksFreq; - } -} -#else -typedef int TimeStamp; -static void inline getTimeStamp(TimeStamp*) {} -static long inline udiffTimeStamp(const TimeStamp*, const TimeStamp*) { return 0; } -#endif - -PortTransmitter::PortTransmitter(const char *device) -{ - char errbuf[PCAP_ERRBUF_SIZE] = ""; - -#ifdef Q_OS_WIN32 - LARGE_INTEGER freq; - if (QueryPerformanceFrequency(&freq)) - gTicksFreq = freq.QuadPart; - else - Q_ASSERT_X(false, "PortTransmitter::PortTransmitter", - "This Win32 platform does not support performance counter"); -#endif - state_ = kNotStarted; - returnToQIdx_ = -1; - loopDelay_ = 0; - stop_ = false; - stats_ = new AbstractPort::PortStats; - usingInternalStats_ = true; - handle_ = pcap_open_live(device, 64 /* FIXME */, 0, 1000 /* ms */, errbuf); - - if (handle_ == NULL) - goto _open_error; - - usingInternalHandle_ = true; - - return; - -_open_error: - qDebug("%s: Error opening port %s: %s\n", __FUNCTION__, device, errbuf); - usingInternalHandle_ = false; -} - -PortTransmitter::~PortTransmitter() -{ - if (usingInternalStats_) - delete stats_; - if (usingInternalHandle_) - pcap_close(handle_); -} - -bool PortTransmitter::setRateAccuracy( +bool PcapTransmitter::setRateAccuracy( AbstractPort::Accuracy accuracy) { - switch (accuracy) { - case AbstractPort::kHighAccuracy: - udelayFn_ = udelay; - qWarning("%s: rate accuracy set to High - busy wait", __FUNCTION__); - break; - case AbstractPort::kLowAccuracy: - udelayFn_ = QThread::usleep; - qWarning("%s: rate accuracy set to Low - usleep", __FUNCTION__); - break; - default: - qWarning("%s: unsupported rate accuracy value %d", __FUNCTION__, - accuracy); - return false; - } - return true; + return txThread_.setRateAccuracy(accuracy); } -void PortTransmitter::clearPacketList() +void PcapTransmitter::clearPacketList() { - Q_ASSERT(!isRunning()); - // \todo lock for packetSequenceList - while(packetSequenceList_.size()) - delete packetSequenceList_.takeFirst(); - - currentPacketSequence_ = NULL; - repeatSequenceStart_ = -1; - repeatSize_ = 0; - packetCount_ = 0; - - returnToQIdx_ = -1; - - setPacketListLoopMode(false, 0, 0); + txThread_.clearPacketList(); } -void PortTransmitter::loopNextPacketSet(qint64 size, qint64 repeats, - long repeatDelaySec, long repeatDelayNsec) +void PcapTransmitter::loopNextPacketSet( + qint64 size, + qint64 repeats, + long repeatDelaySec, + long repeatDelayNsec) { - currentPacketSequence_ = new PacketSequence; - currentPacketSequence_->repeatCount_ = repeats; - currentPacketSequence_->usecDelay_ = repeatDelaySec * long(1e6) - + repeatDelayNsec/1000; - - repeatSequenceStart_ = packetSequenceList_.size(); - repeatSize_ = size; - packetCount_ = 0; - - packetSequenceList_.append(currentPacketSequence_); + txThread_.loopNextPacketSet(size, repeats, repeatDelaySec, repeatDelayNsec); } -bool PortTransmitter::appendToPacketList(long sec, long nsec, +bool PcapTransmitter::appendToPacketList(long sec, long nsec, const uchar *packet, int length) { - bool op = true; - pcap_pkthdr pktHdr; - - pktHdr.caplen = pktHdr.len = length; - pktHdr.ts.tv_sec = sec; - pktHdr.ts.tv_usec = nsec/1000; - - if (currentPacketSequence_ == NULL || - !currentPacketSequence_->hasFreeSpace(2*sizeof(pcap_pkthdr)+length)) - { - if (currentPacketSequence_ != NULL) - { - long usecs; - - usecs = (pktHdr.ts.tv_sec - - currentPacketSequence_->lastPacket_->ts.tv_sec) - * long(1e6); - usecs += (pktHdr.ts.tv_usec - - currentPacketSequence_->lastPacket_->ts.tv_usec); - currentPacketSequence_->usecDelay_ = usecs; - } - - //! \todo (LOW): calculate sendqueue size - currentPacketSequence_ = new PacketSequence; - - packetSequenceList_.append(currentPacketSequence_); - - // Validate that the pkt will fit inside the new currentSendQueue_ - Q_ASSERT(currentPacketSequence_->hasFreeSpace( - sizeof(pcap_pkthdr) + length)); - } - - if (currentPacketSequence_->appendPacket(&pktHdr, (u_char*) packet) < 0) - { - op = false; - } - - packetCount_++; - if (repeatSize_ > 0 && packetCount_ == repeatSize_) - { - qDebug("repeatSequenceStart_=%d, repeatSize_ = %llu", - repeatSequenceStart_, repeatSize_); - - // Set the packetSequence repeatSize - Q_ASSERT(repeatSequenceStart_ >= 0); - Q_ASSERT(repeatSequenceStart_ < packetSequenceList_.size()); - - if (currentPacketSequence_ != packetSequenceList_[repeatSequenceStart_]) - { - PacketSequence *start = packetSequenceList_[repeatSequenceStart_]; - - currentPacketSequence_->usecDelay_ = start->usecDelay_; - start->usecDelay_ = 0; - start->repeatSize_ = - packetSequenceList_.size() - repeatSequenceStart_; - } - - repeatSize_ = 0; - - // End current pktSeq and trigger a new pktSeq allocation for next pkt - currentPacketSequence_ = NULL; - } - - return op; + return txThread_.appendToPacketList(sec, nsec, packet, length); } -void PortTransmitter::setHandle(pcap_t *handle) +void PcapTransmitter::setHandle(pcap_t *handle) { - if (usingInternalHandle_) - pcap_close(handle_); - handle_ = handle; - usingInternalHandle_ = false; + txThread_.setHandle(handle); } -void PortTransmitter::useExternalStats(AbstractPort::PortStats *stats) +void PcapTransmitter::setPacketListLoopMode( + bool loop, + quint64 secDelay, + quint64 nsecDelay) { - if (usingInternalStats_) - delete stats_; - stats_ = stats; - usingInternalStats_ = false; + txThread_.setPacketListLoopMode(loop, secDelay, nsecDelay); } -void PortTransmitter::run() +void PcapTransmitter::useExternalStats(AbstractPort::PortStats *stats) { - //! \todo (MED) Stream Mode - continuous: define before implement - - // NOTE1: We can't use pcap_sendqueue_transmit() directly even on Win32 - // 'coz of 2 reasons - there's no way of stopping it before all packets - // in the sendQueue are sent out and secondly, stats are available only - // when all packets have been sent - no periodic updates - // - // NOTE2: Transmit on the Rx Handle so that we can receive it back - // on the Tx Handle to do stats - // - // NOTE3: Update pcapExtra counters - port TxStats will be updated in the - // 'stats callback' function so that both Rx and Tx stats are updated - // together - - const int kSyncTransmit = 1; - int i; - long overHead = 0; // overHead should be negative or zero - - qDebug("packetSequenceList_.size = %d", packetSequenceList_.size()); - if (packetSequenceList_.size() <= 0) - goto _exit; - - for(i = 0; i < packetSequenceList_.size(); i++) { - qDebug("sendQ[%d]: rptCnt = %d, rptSz = %d, usecDelay = %ld", i, - packetSequenceList_.at(i)->repeatCount_, - packetSequenceList_.at(i)->repeatSize_, - packetSequenceList_.at(i)->usecDelay_); - qDebug("sendQ[%d]: pkts = %ld, usecDuration = %ld", i, - packetSequenceList_.at(i)->packets_, - packetSequenceList_.at(i)->usecDuration_); - } - - state_ = kRunning; - i = 0; - while (i < packetSequenceList_.size()) - { - -_restart: - int rptSz = packetSequenceList_.at(i)->repeatSize_; - int rptCnt = packetSequenceList_.at(i)->repeatCount_; - - for (int j = 0; j < rptCnt; j++) - { - for (int k = 0; k < rptSz; k++) - { - int ret; - PacketSequence *seq = packetSequenceList_.at(i+k); -#ifdef Q_OS_WIN32 - TimeStamp ovrStart, ovrEnd; - - if (seq->usecDuration_ <= long(1e6)) // 1s - { - getTimeStamp(&ovrStart); - ret = pcap_sendqueue_transmit(handle_, - seq->sendQueue_, kSyncTransmit); - if (ret >= 0) - { - stats_->txPkts += seq->packets_; - stats_->txBytes += seq->bytes_; - - getTimeStamp(&ovrEnd); - overHead += seq->usecDuration_ - - udiffTimeStamp(&ovrStart, &ovrEnd); - Q_ASSERT(overHead <= 0); - } - if (stop_) - ret = -2; - } - else - { - ret = sendQueueTransmit(handle_, seq->sendQueue_, - overHead, kSyncTransmit); - } -#else - ret = sendQueueTransmit(handle_, seq->sendQueue_, - overHead, kSyncTransmit); -#endif - - if (ret >= 0) - { - long usecs = seq->usecDelay_ + overHead; - if (usecs > 0) - { - (*udelayFn_)(usecs); - overHead = 0; - } - else - overHead = usecs; - } - else - { - qDebug("error %d in sendQueueTransmit()", ret); - qDebug("overHead = %ld", overHead); - stop_ = false; - goto _exit; - } - } - } - - // Move to the next Packet Set - i += rptSz; - } - - if (returnToQIdx_ >= 0) - { - long usecs = loopDelay_ + overHead; - - if (usecs > 0) - { - (*udelayFn_)(usecs); - overHead = 0; - } - else - overHead = usecs; - - i = returnToQIdx_; - goto _restart; - } - -_exit: - state_ = kFinished; + txThread_.useExternalStats(stats); } -void PortTransmitter::start() +void PcapTransmitter::start() { - // FIXME: return error - if (state_ == kRunning) { - qWarning("Transmit start requested but is already running!"); - return; - } - - state_ = kNotStarted; - QThread::start(); - - while (state_ == kNotStarted) - QThread::msleep(10); + txThread_.start(); } -void PortTransmitter::stop() +void PcapTransmitter::stop() { - if (state_ == kRunning) { - stop_ = true; - while (state_ == kRunning) - QThread::msleep(10); - } - else { - // FIXME: return error - qWarning("Transmit stop requested but is not running!"); - return; - } + txThread_.stop(); } -bool PortTransmitter::isRunning() +bool PcapTransmitter::isRunning() { - return (state_ == kRunning); + return txThread_.isRunning(); } - -int PortTransmitter::sendQueueTransmit(pcap_t *p, - pcap_send_queue *queue, long &overHead, int sync) -{ - TimeStamp ovrStart, ovrEnd; - struct timeval ts; - struct pcap_pkthdr *hdr = (struct pcap_pkthdr*) queue->buffer; - char *end = queue->buffer + queue->len; - - ts = hdr->ts; - - getTimeStamp(&ovrStart); - while((char*) hdr < end) - { - uchar *pkt = (uchar*)hdr + sizeof(*hdr); - int pktLen = hdr->caplen; - - if (sync) - { - long usec = (hdr->ts.tv_sec - ts.tv_sec) * 1000000 + - (hdr->ts.tv_usec - ts.tv_usec); - - getTimeStamp(&ovrEnd); - - overHead -= udiffTimeStamp(&ovrStart, &ovrEnd); - Q_ASSERT(overHead <= 0); - usec += overHead; - if (usec > 0) - { - (*udelayFn_)(usec); - overHead = 0; - } - else - overHead = usec; - - ts = hdr->ts; - getTimeStamp(&ovrStart); - } - - Q_ASSERT(pktLen > 0); - - pcap_sendpacket(p, pkt, pktLen); - stats_->txPkts++; - stats_->txBytes += pktLen; - - // Step to the next packet in the buffer - hdr = (struct pcap_pkthdr*) (pkt + pktLen); - pkt = (uchar*) ((uchar*)hdr + sizeof(*hdr)); - - if (stop_) - { - return -2; - } - } - - return 0; -} - -void PortTransmitter::udelay(unsigned long usec) -{ -#if defined(Q_OS_WIN32) - LARGE_INTEGER tgtTicks; - LARGE_INTEGER curTicks; - - QueryPerformanceCounter(&curTicks); - tgtTicks.QuadPart = curTicks.QuadPart + (usec*gTicksFreq)/1000000; - - while (curTicks.QuadPart < tgtTicks.QuadPart) - QueryPerformanceCounter(&curTicks); -#elif defined(Q_OS_LINUX) - struct timeval delay, target, now; - - //qDebug("usec delay = %ld", usec); - - delay.tv_sec = 0; - delay.tv_usec = usec; - - while (delay.tv_usec >= 1000000) - { - delay.tv_sec++; - delay.tv_usec -= 1000000; - } - - gettimeofday(&now, NULL); - timeradd(&now, &delay, &target); - - do { - gettimeofday(&now, NULL); - } while (timercmp(&now, &target, <)); -#else - QThread::usleep(usec); -#endif -} - diff --git a/server/pcaptransmitter.h b/server/pcaptransmitter.h index d5d4c28..3327b24 100644 --- a/server/pcaptransmitter.h +++ b/server/pcaptransmitter.h @@ -21,110 +21,31 @@ along with this program. If not, see #define _PCAP_TRANSMITTER_H #include "abstractport.h" -#include "pcapextra.h" +#include "pcaptxthread.h" -#include -#include - -class PortTransmitter: public QThread +class PcapTransmitter { public: - PortTransmitter(const char *device); - ~PortTransmitter(); + PcapTransmitter(const char *device); bool setRateAccuracy(AbstractPort::Accuracy accuracy); void clearPacketList(); void loopNextPacketSet(qint64 size, qint64 repeats, - long repeatDelaySec, long repeatDelayNsec); + long repeatDelaySec, long repeatDelayNsec); bool appendToPacketList(long sec, long usec, const uchar *packet, - int length); - void setPacketListLoopMode(bool loop, quint64 secDelay, quint64 nsecDelay) { - returnToQIdx_ = loop ? 0 : -1; - loopDelay_ = secDelay*long(1e6) + nsecDelay/1000; - } + int length); + void setPacketListLoopMode(bool loop, quint64 secDelay, quint64 nsecDelay); + void setHandle(pcap_t *handle); void useExternalStats(AbstractPort::PortStats *stats); - void run(); + void start(); void stop(); bool isRunning(); + private: - enum State - { - kNotStarted, - kRunning, - kFinished - }; - - class PacketSequence - { - public: - PacketSequence() { - sendQueue_ = pcap_sendqueue_alloc(1*1024*1024); - lastPacket_ = NULL; - packets_ = 0; - bytes_ = 0; - usecDuration_ = 0; - repeatCount_ = 1; - repeatSize_ = 1; - usecDelay_ = 0; - } - ~PacketSequence() { - pcap_sendqueue_destroy(sendQueue_); - } - bool hasFreeSpace(int size) { - if ((sendQueue_->len + size) <= sendQueue_->maxlen) - return true; - else - return false; - } - int appendPacket(const struct pcap_pkthdr *pktHeader, - const uchar *pktData) { - if (lastPacket_) - { - usecDuration_ += (pktHeader->ts.tv_sec - - lastPacket_->ts.tv_sec) * long(1e6); - usecDuration_ += (pktHeader->ts.tv_usec - - lastPacket_->ts.tv_usec); - } - packets_++; - bytes_ += pktHeader->caplen; - lastPacket_ = (struct pcap_pkthdr *) - (sendQueue_->buffer + sendQueue_->len); - return pcap_sendqueue_queue(sendQueue_, pktHeader, pktData); - } - pcap_send_queue *sendQueue_; - struct pcap_pkthdr *lastPacket_; - long packets_; - long bytes_; - ulong usecDuration_; - int repeatCount_; - int repeatSize_; - long usecDelay_; - }; - - static void udelay(unsigned long usec); - int sendQueueTransmit(pcap_t *p, pcap_send_queue *queue, long &overHead, - int sync); - - QList packetSequenceList_; - PacketSequence *currentPacketSequence_; - int repeatSequenceStart_; - quint64 repeatSize_; - quint64 packetCount_; - - int returnToQIdx_; - quint64 loopDelay_; - - void (*udelayFn_)(unsigned long); - - bool usingInternalStats_; - AbstractPort::PortStats *stats_; - bool usingInternalHandle_; - pcap_t *handle_; - volatile bool stop_; - volatile State state_; + PcapTxThread txThread_; }; #endif diff --git a/server/pcaptxthread.cpp b/server/pcaptxthread.cpp new file mode 100644 index 0000000..86076c6 --- /dev/null +++ b/server/pcaptxthread.cpp @@ -0,0 +1,461 @@ +/* +Copyright (C) 2010-2016 Srivats P. + +This file is part of "Ostinato" + +This is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +#include "pcaptransmitter.h" + +#include "timestamp.h" + +PcapTxThread::PcapTxThread(const char *device) +{ + char errbuf[PCAP_ERRBUF_SIZE] = ""; + +#ifdef Q_OS_WIN32 + LARGE_INTEGER freq; + if (QueryPerformanceFrequency(&freq)) + gTicksFreq = freq.QuadPart; + else + Q_ASSERT_X(false, "PcapTxThread::PcapTxThread", + "This Win32 platform does not support performance counter"); +#endif + state_ = kNotStarted; + returnToQIdx_ = -1; + loopDelay_ = 0; + stop_ = false; + stats_ = new AbstractPort::PortStats; + usingInternalStats_ = true; + handle_ = pcap_open_live(device, 64 /* FIXME */, 0, 1000 /* ms */, errbuf); + + if (handle_ == NULL) + goto _open_error; + + usingInternalHandle_ = true; + + return; + +_open_error: + qDebug("%s: Error opening port %s: %s\n", __FUNCTION__, device, errbuf); + usingInternalHandle_ = false; +} + +PcapTxThread::~PcapTxThread() +{ + if (usingInternalStats_) + delete stats_; + if (usingInternalHandle_) + pcap_close(handle_); +} + +bool PcapTxThread::setRateAccuracy( + AbstractPort::Accuracy accuracy) +{ + switch (accuracy) { + case AbstractPort::kHighAccuracy: + udelayFn_ = udelay; + qWarning("%s: rate accuracy set to High - busy wait", __FUNCTION__); + break; + case AbstractPort::kLowAccuracy: + udelayFn_ = QThread::usleep; + qWarning("%s: rate accuracy set to Low - usleep", __FUNCTION__); + break; + default: + qWarning("%s: unsupported rate accuracy value %d", __FUNCTION__, + accuracy); + return false; + } + return true; +} + +void PcapTxThread::clearPacketList() +{ + Q_ASSERT(!isRunning()); + // \todo lock for packetSequenceList + while(packetSequenceList_.size()) + delete packetSequenceList_.takeFirst(); + + currentPacketSequence_ = NULL; + repeatSequenceStart_ = -1; + repeatSize_ = 0; + packetCount_ = 0; + + returnToQIdx_ = -1; + + setPacketListLoopMode(false, 0, 0); +} + +void PcapTxThread::loopNextPacketSet(qint64 size, qint64 repeats, + long repeatDelaySec, long repeatDelayNsec) +{ + currentPacketSequence_ = new PacketSequence; + currentPacketSequence_->repeatCount_ = repeats; + currentPacketSequence_->usecDelay_ = repeatDelaySec * long(1e6) + + repeatDelayNsec/1000; + + repeatSequenceStart_ = packetSequenceList_.size(); + repeatSize_ = size; + packetCount_ = 0; + + packetSequenceList_.append(currentPacketSequence_); +} + +bool PcapTxThread::appendToPacketList(long sec, long nsec, + const uchar *packet, int length) +{ + bool op = true; + pcap_pkthdr pktHdr; + + pktHdr.caplen = pktHdr.len = length; + pktHdr.ts.tv_sec = sec; + pktHdr.ts.tv_usec = nsec/1000; + + if (currentPacketSequence_ == NULL || + !currentPacketSequence_->hasFreeSpace(2*sizeof(pcap_pkthdr)+length)) + { + if (currentPacketSequence_ != NULL) + { + long usecs; + + usecs = (pktHdr.ts.tv_sec + - currentPacketSequence_->lastPacket_->ts.tv_sec) + * long(1e6); + usecs += (pktHdr.ts.tv_usec + - currentPacketSequence_->lastPacket_->ts.tv_usec); + currentPacketSequence_->usecDelay_ = usecs; + } + + //! \todo (LOW): calculate sendqueue size + currentPacketSequence_ = new PacketSequence; + + packetSequenceList_.append(currentPacketSequence_); + + // Validate that the pkt will fit inside the new currentSendQueue_ + Q_ASSERT(currentPacketSequence_->hasFreeSpace( + sizeof(pcap_pkthdr) + length)); + } + + if (currentPacketSequence_->appendPacket(&pktHdr, (u_char*) packet) < 0) + { + op = false; + } + + packetCount_++; + if (repeatSize_ > 0 && packetCount_ == repeatSize_) + { + qDebug("repeatSequenceStart_=%d, repeatSize_ = %llu", + repeatSequenceStart_, repeatSize_); + + // Set the packetSequence repeatSize + Q_ASSERT(repeatSequenceStart_ >= 0); + Q_ASSERT(repeatSequenceStart_ < packetSequenceList_.size()); + + if (currentPacketSequence_ != packetSequenceList_[repeatSequenceStart_]) + { + PacketSequence *start = packetSequenceList_[repeatSequenceStart_]; + + currentPacketSequence_->usecDelay_ = start->usecDelay_; + start->usecDelay_ = 0; + start->repeatSize_ = + packetSequenceList_.size() - repeatSequenceStart_; + } + + repeatSize_ = 0; + + // End current pktSeq and trigger a new pktSeq allocation for next pkt + currentPacketSequence_ = NULL; + } + + return op; +} + +void PcapTxThread::setPacketListLoopMode( + bool loop, + quint64 secDelay, + quint64 nsecDelay) +{ + returnToQIdx_ = loop ? 0 : -1; + loopDelay_ = secDelay*long(1e6) + nsecDelay/1000; +} + +void PcapTxThread::setHandle(pcap_t *handle) +{ + if (usingInternalHandle_) + pcap_close(handle_); + handle_ = handle; + usingInternalHandle_ = false; +} + +void PcapTxThread::useExternalStats(AbstractPort::PortStats *stats) +{ + if (usingInternalStats_) + delete stats_; + stats_ = stats; + usingInternalStats_ = false; +} + +void PcapTxThread::run() +{ + //! \todo (MED) Stream Mode - continuous: define before implement + + // NOTE1: We can't use pcap_sendqueue_transmit() directly even on Win32 + // 'coz of 2 reasons - there's no way of stopping it before all packets + // in the sendQueue are sent out and secondly, stats are available only + // when all packets have been sent - no periodic updates + // + // NOTE2: Transmit on the Rx Handle so that we can receive it back + // on the Tx Handle to do stats + // + // NOTE3: Update pcapExtra counters - port TxStats will be updated in the + // 'stats callback' function so that both Rx and Tx stats are updated + // together + + const int kSyncTransmit = 1; + int i; + long overHead = 0; // overHead should be negative or zero + + qDebug("packetSequenceList_.size = %d", packetSequenceList_.size()); + if (packetSequenceList_.size() <= 0) + goto _exit; + + for(i = 0; i < packetSequenceList_.size(); i++) { + qDebug("sendQ[%d]: rptCnt = %d, rptSz = %d, usecDelay = %ld", i, + packetSequenceList_.at(i)->repeatCount_, + packetSequenceList_.at(i)->repeatSize_, + packetSequenceList_.at(i)->usecDelay_); + qDebug("sendQ[%d]: pkts = %ld, usecDuration = %ld", i, + packetSequenceList_.at(i)->packets_, + packetSequenceList_.at(i)->usecDuration_); + } + + state_ = kRunning; + i = 0; + while (i < packetSequenceList_.size()) + { + +_restart: + int rptSz = packetSequenceList_.at(i)->repeatSize_; + int rptCnt = packetSequenceList_.at(i)->repeatCount_; + + for (int j = 0; j < rptCnt; j++) + { + for (int k = 0; k < rptSz; k++) + { + int ret; + PacketSequence *seq = packetSequenceList_.at(i+k); +#ifdef Q_OS_WIN32 + TimeStamp ovrStart, ovrEnd; + + if (seq->usecDuration_ <= long(1e6)) // 1s + { + getTimeStamp(&ovrStart); + ret = pcap_sendqueue_transmit(handle_, + seq->sendQueue_, kSyncTransmit); + if (ret >= 0) + { + stats_->txPkts += seq->packets_; + stats_->txBytes += seq->bytes_; + + getTimeStamp(&ovrEnd); + overHead += seq->usecDuration_ + - udiffTimeStamp(&ovrStart, &ovrEnd); + Q_ASSERT(overHead <= 0); + } + if (stop_) + ret = -2; + } + else + { + ret = sendQueueTransmit(handle_, seq->sendQueue_, + overHead, kSyncTransmit); + } +#else + ret = sendQueueTransmit(handle_, seq->sendQueue_, + overHead, kSyncTransmit); +#endif + + if (ret >= 0) + { + long usecs = seq->usecDelay_ + overHead; + if (usecs > 0) + { + (*udelayFn_)(usecs); + overHead = 0; + } + else + overHead = usecs; + } + else + { + qDebug("error %d in sendQueueTransmit()", ret); + qDebug("overHead = %ld", overHead); + stop_ = false; + goto _exit; + } + } + } + + // Move to the next Packet Set + i += rptSz; + } + + if (returnToQIdx_ >= 0) + { + long usecs = loopDelay_ + overHead; + + if (usecs > 0) + { + (*udelayFn_)(usecs); + overHead = 0; + } + else + overHead = usecs; + + i = returnToQIdx_; + goto _restart; + } + +_exit: + state_ = kFinished; +} + +void PcapTxThread::start() +{ + // FIXME: return error + if (state_ == kRunning) { + qWarning("Transmit start requested but is already running!"); + return; + } + + state_ = kNotStarted; + QThread::start(); + + while (state_ == kNotStarted) + QThread::msleep(10); +} + +void PcapTxThread::stop() +{ + if (state_ == kRunning) { + stop_ = true; + while (state_ == kRunning) + QThread::msleep(10); + } + else { + // FIXME: return error + qWarning("Transmit stop requested but is not running!"); + return; + } +} + +bool PcapTxThread::isRunning() +{ + return (state_ == kRunning); +} + +int PcapTxThread::sendQueueTransmit(pcap_t *p, + pcap_send_queue *queue, long &overHead, int sync) +{ + TimeStamp ovrStart, ovrEnd; + struct timeval ts; + struct pcap_pkthdr *hdr = (struct pcap_pkthdr*) queue->buffer; + char *end = queue->buffer + queue->len; + + ts = hdr->ts; + + getTimeStamp(&ovrStart); + while((char*) hdr < end) + { + uchar *pkt = (uchar*)hdr + sizeof(*hdr); + int pktLen = hdr->caplen; + + if (sync) + { + long usec = (hdr->ts.tv_sec - ts.tv_sec) * 1000000 + + (hdr->ts.tv_usec - ts.tv_usec); + + getTimeStamp(&ovrEnd); + + overHead -= udiffTimeStamp(&ovrStart, &ovrEnd); + Q_ASSERT(overHead <= 0); + usec += overHead; + if (usec > 0) + { + (*udelayFn_)(usec); + overHead = 0; + } + else + overHead = usec; + + ts = hdr->ts; + getTimeStamp(&ovrStart); + } + + Q_ASSERT(pktLen > 0); + + pcap_sendpacket(p, pkt, pktLen); + stats_->txPkts++; + stats_->txBytes += pktLen; + + // Step to the next packet in the buffer + hdr = (struct pcap_pkthdr*) (pkt + pktLen); + pkt = (uchar*) ((uchar*)hdr + sizeof(*hdr)); + + if (stop_) + { + return -2; + } + } + + return 0; +} + +void PcapTxThread::udelay(unsigned long usec) +{ +#if defined(Q_OS_WIN32) + LARGE_INTEGER tgtTicks; + LARGE_INTEGER curTicks; + + QueryPerformanceCounter(&curTicks); + tgtTicks.QuadPart = curTicks.QuadPart + (usec*gTicksFreq)/1000000; + + while (curTicks.QuadPart < tgtTicks.QuadPart) + QueryPerformanceCounter(&curTicks); +#elif defined(Q_OS_LINUX) + struct timeval delay, target, now; + + //qDebug("usec delay = %ld", usec); + + delay.tv_sec = 0; + delay.tv_usec = usec; + + while (delay.tv_usec >= 1000000) + { + delay.tv_sec++; + delay.tv_usec -= 1000000; + } + + gettimeofday(&now, NULL); + timeradd(&now, &delay, &target); + + do { + gettimeofday(&now, NULL); + } while (timercmp(&now, &target, <)); +#else + QThread::usleep(usec); +#endif +} + diff --git a/server/pcaptxthread.h b/server/pcaptxthread.h new file mode 100644 index 0000000..b9064b1 --- /dev/null +++ b/server/pcaptxthread.h @@ -0,0 +1,84 @@ +/* +Copyright (C) 2010-2016 Srivats P. + +This file is part of "Ostinato" + +This is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +#ifndef _PCAP_TX_THREAD_H +#define _PCAP_TX_THREAD_H + +#include "abstractport.h" +#include "packetsequence.h" + +#include +#include + +class PcapTxThread: public QThread +{ +public: + PcapTxThread(const char *device); + ~PcapTxThread(); + + bool setRateAccuracy(AbstractPort::Accuracy accuracy); + + void clearPacketList(); + void loopNextPacketSet(qint64 size, qint64 repeats, + long repeatDelaySec, long repeatDelayNsec); + bool appendToPacketList(long sec, long usec, const uchar *packet, + int length); + void setPacketListLoopMode(bool loop, quint64 secDelay, quint64 nsecDelay); + + void setHandle(pcap_t *handle); + void useExternalStats(AbstractPort::PortStats *stats); + + void run(); + + void start(); + void stop(); + bool isRunning(); +private: + enum State + { + kNotStarted, + kRunning, + kFinished + }; + + static void udelay(unsigned long usec); + int sendQueueTransmit(pcap_t *p, pcap_send_queue *queue, long &overHead, + int sync); + + QList packetSequenceList_; + PacketSequence *currentPacketSequence_; + int repeatSequenceStart_; + quint64 repeatSize_; + quint64 packetCount_; + + int returnToQIdx_; + quint64 loopDelay_; + + void (*udelayFn_)(unsigned long); + + bool usingInternalStats_; + AbstractPort::PortStats *stats_; + bool usingInternalHandle_; + pcap_t *handle_; + volatile bool stop_; + volatile State state_; +}; + +#endif + diff --git a/server/timestamp.h b/server/timestamp.h new file mode 100644 index 0000000..b6a8bc7 --- /dev/null +++ b/server/timestamp.h @@ -0,0 +1,72 @@ +/* +Copyright (C) 2010-2016 Srivats P. + +This file is part of "Ostinato" + +This is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +#ifndef _TIMESTAMP_H +#define _TIMESTAMP_H + +#include + +#if defined(Q_OS_LINUX) +typedef struct timeval TimeStamp; +static void inline getTimeStamp(TimeStamp *stamp) +{ + gettimeofday(stamp, NULL); +} + +// Returns time diff in usecs between end and start +static long inline udiffTimeStamp(const TimeStamp *start, const TimeStamp *end) +{ + struct timeval diff; + long usecs; + + timersub(end, start, &diff); + + usecs = diff.tv_usec; + if (diff.tv_sec) + usecs += diff.tv_sec*1e6; + + return usecs; +} +#elif defined(Q_OS_WIN32) +static quint64 gTicksFreq; +typedef LARGE_INTEGER TimeStamp; +static void inline getTimeStamp(TimeStamp* stamp) +{ + QueryPerformanceCounter(stamp); +} + +static long inline udiffTimeStamp(const TimeStamp *start, const TimeStamp *end) +{ + if (end->QuadPart >= start->QuadPart) + return (end->QuadPart - start->QuadPart)*long(1e6)/gTicksFreq; + else + { + // FIXME: incorrect! what's the max value for this counter before + // it rolls over? + return (start->QuadPart)*long(1e6)/gTicksFreq; + } +} +#else +typedef int TimeStamp; +static void inline getTimeStamp(TimeStamp*) {} +static long inline udiffTimeStamp(const TimeStamp*, const TimeStamp*) { return 0; } +#endif + +#endif +