From 955be7108236b0d4788dfc2c26fe1d2070de2f94 Mon Sep 17 00:00:00 2001 From: "Srivats P." Date: Sun, 19 Jun 2011 16:26:35 +0530 Subject: [PATCH] Changes to improve pps performance for Linux. Changed implementation of stats collection for Linux - pps/Bps stats are now supported Fixes Issue 26 --- server/abstractport.h | 1 + server/drone.pro | 1 + server/linuxport.cpp | 275 +++++++++++++++++++++++++++++++++++++++++ server/linuxport.h | 56 +++++++++ server/pcapport.cpp | 96 +++++++++++--- server/pcapport.h | 1 + server/portmanager.cpp | 9 +- 7 files changed, 417 insertions(+), 22 deletions(-) create mode 100644 server/linuxport.cpp create mode 100644 server/linuxport.h diff --git a/server/abstractport.h b/server/abstractport.h index c4f798d..954014f 100644 --- a/server/abstractport.h +++ b/server/abstractport.h @@ -52,6 +52,7 @@ public: virtual void init(); int id() { return data_.port_id().id(); } + const char* name() { return data_.name().c_str(); } void protoDataCopyInto(OstProto::Port *port) { port->CopyFrom(data_); } bool modify(const OstProto::Port &port); diff --git a/server/drone.pro b/server/drone.pro index 160973b..a317414 100644 --- a/server/drone.pro +++ b/server/drone.pro @@ -35,6 +35,7 @@ SOURCES += \ portmanager.cpp \ abstractport.cpp \ pcapport.cpp \ + linuxport.cpp \ winpcapport.cpp SOURCES += myservice.cpp SOURCES += pcapextra.cpp diff --git a/server/linuxport.cpp b/server/linuxport.cpp new file mode 100644 index 0000000..8d52cc9 --- /dev/null +++ b/server/linuxport.cpp @@ -0,0 +1,275 @@ +/* +Copyright (C) 2011 Srivats P. + +This file is part of "Ostinato" + +This is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +#include "linuxport.h" + +#ifdef Q_OS_LINUX + +#include + +#include +#include +#include + +QList LinuxPort::allPorts_; +LinuxPort::StatsMonitor *LinuxPort::monitor_; + +LinuxPort::LinuxPort(int id, const char *device) + : PcapPort(id, device) +{ + // We don't need per port Rx/Tx monitors for Linux + delete monitorRx_; + delete monitorTx_; + + // We have one monitor for both Rx/Tx of all ports + if (!monitor_) + monitor_ = new StatsMonitor(); + + data_.set_is_exclusive_control(hasExclusiveControl()); + + qDebug("adding dev to all ports list <%s>", device); + allPorts_.append(this); +} + +LinuxPort::~LinuxPort() +{ +} + +void LinuxPort::init() +{ + // TODO: Update Notes with Promisc/Non-Promisc + + if (!monitor_->isRunning()) + monitor_->start(); +} + +OstProto::LinkState LinuxPort::linkState() +{ + // TODO + return linkState_; +} + +bool LinuxPort::hasExclusiveControl() +{ + // TODO + return false; +} + +bool LinuxPort::setExclusiveControl(bool /*exclusive*/) +{ + // TODO + return false; +} + +LinuxPort::StatsMonitor::StatsMonitor() + : QThread() +{ +} + +void LinuxPort::StatsMonitor::run() +{ + PortStats **portStats; + int fd; + QByteArray buf; + int len; + char *p, *end; + int count, index; + const char* fmtopt[] = { + "%llu%llu%u%u%u%u%u%u%llu%llu%u%u%u%u%u%u\n", + "%llu%llu%u%u%u%u%n%n%llu%llu%u%u%u%u%u%n\n", + }; + const char *fmt; + + // + // We first setup stuff before we start polling for stats + // + fd = open("/proc/net/dev", O_RDONLY); + if (fd < 0) + { + qWarning("Unable to open /proc/net/dev - no stats will be available"); + return; + } + + buf.fill('\0', 8192); + len = read(fd, (void*) buf.data(), buf.size()); + if (len < 0) + { + qWarning("initial buffer size is too small. no stats will be available"); + return; + } + + p = buf.data(); + end = p + len; + + // Select scanf format + if (strstr(buf, "compressed")) + fmt = fmtopt[0]; + else + fmt = fmtopt[1]; + + // Count number of lines - number of ports is 2 less than number of lines + count = 0; + while (p < end) + { + if (*p == '\n') + count++; + p++; + } + count -= 2; + + if (count <= 0) + { + qWarning("no ports in /proc/dev/net - no stats will be available"); + return; + } + + portStats = (PortStats**) calloc(count, sizeof(PortStats)); + Q_ASSERT(portStats != NULL); + + // + // Populate the port stats array + // + p = buf.data(); + + // Skip first two lines + while (*p != '\n') + p++; + p++; + while (*p != '\n') + p++; + p++; + + index = 0; + while (p < end) + { + char* q; + + // Skip whitespace + while ((p < end) && (*p == ' ')) + p++; + + q = p; + + // Get interface name + while ((q < end) && (*q != ':') && (*q != '\n')) + q++; + + if ((q < end) && (*q == ':')) + { + foreach(LinuxPort* port, allPorts_) + { + if (strncmp(port->name(), p, int(q-p)) == 0) + { + portStats[index] = &(port->stats_); + break; + } + } + } + index++; + + // Skip till newline + p = q; + while (*p != '\n') + p++; + p++; + } + Q_ASSERT(index == count); + + qDebug("stats for %d ports setup", count); + + // + // We are all set - Let's start polling for stats! + // + while (1) + { + lseek(fd, 0, SEEK_SET); + len = read(fd, (void*) buf.data(), buf.size()); + if (len < 0) + { + if (buf.size() > 1*1024*1024) + { + qWarning("buffer size hit limit. no more stats"); + return; + } + qDebug("doubling buffer size. curr = %d", buf.size()); + buf.resize(buf.size() * 2); + continue; + } + + p = buf.data(); + end = p + len; + + // Skip first two lines + while (*p != '\n') + p++; + p++; + while (*p != '\n') + p++; + p++; + + index = 0; + while (p < end) + { + uint dummy; + quint64 rxBytes, rxPkts; + quint64 txBytes, txPkts; + + // Skip interface name - we assume the number and order of ports + // won't change since we parsed the output before we started polling + while ((p < end) && (*p != ':') && (*p != '\n')) + p++; + if (p >= end) + break; + if (*p == '\n') + { + index++; + continue; + } + p++; + + sscanf(p, fmt, + &rxBytes, &rxPkts, &dummy, &dummy, &dummy, &dummy, &dummy, &dummy, + &txBytes, &txPkts, &dummy, &dummy, &dummy, &dummy, &dummy, &dummy); + + if (index < count) + { + AbstractPort::PortStats *stats = portStats[index]; + if (stats) + { + stats->rxPps = (rxPkts - stats->rxPkts)/kRefreshFreq_; + stats->rxBps = (rxBytes - stats->rxBytes)/kRefreshFreq_; + stats->rxPkts = rxPkts; + stats->rxBytes = rxBytes; + stats->txPps = (txPkts - stats->txPkts)/kRefreshFreq_; + stats->txBps = (txBytes - stats->txBytes)/kRefreshFreq_; + stats->txPkts = txPkts; + stats->txBytes = txBytes; + } + } + + while (*p != '\n') + p++; + p++; + index++; + } + QThread::sleep(kRefreshFreq_); + } +} + +#endif diff --git a/server/linuxport.h b/server/linuxport.h new file mode 100644 index 0000000..8fbedf5 --- /dev/null +++ b/server/linuxport.h @@ -0,0 +1,56 @@ +/* +Copyright (C) 2011 Srivats P. + +This file is part of "Ostinato" + +This is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +#ifndef _SERVER_LINUX_PORT_H +#define _SERVER_LINUX_PORT_H + +#include + +#ifdef Q_OS_LINUX + +#include "pcapport.h" + +class LinuxPort : public PcapPort +{ +public: + LinuxPort(int id, const char *device); + ~LinuxPort(); + + void init(); + + virtual OstProto::LinkState linkState(); + virtual bool hasExclusiveControl(); + virtual bool setExclusiveControl(bool exclusive); + +protected: + class StatsMonitor: public QThread + { + public: + StatsMonitor(); + void run(); + private: + static const int kRefreshFreq_ = 1; // in seconds + }; + + static QList allPorts_; + static StatsMonitor *monitor_; // rx/tx stats for ALL ports +}; +#endif + +#endif diff --git a/server/pcapport.cpp b/server/pcapport.cpp index ebb159a..0777543 100644 --- a/server/pcapport.cpp +++ b/server/pcapport.cpp @@ -27,6 +27,34 @@ along with this program. If not, see pcap_if_t *PcapPort::deviceList_ = NULL; + +#if defined(Q_OS_LINUX) +typedef struct timeval TimeStamp; +static void inline getTimeStamp(TimeStamp *stamp) +{ + gettimeofday(stamp, NULL); +} + +// Returns time diff in usecs between end and start +static long inline udiffTimeStamp(const TimeStamp *start, const TimeStamp *end) +{ + struct timeval diff; + long usecs; + + timersub(end, start, &diff); + + usecs = diff.tv_usec; + if (diff.tv_sec) + usecs += diff.tv_sec*1e6; + + return usecs; +} +#else +typedef int TimeStamp; +static void inline getTimeStamp(TimeStamp*) {} +static long inline udiffTimeStamp(const TimeStamp*, const TimeStamp*) { return 0; } +#endif + PcapPort::PcapPort(int id, const char *device) : AbstractPort(id, device) { @@ -169,6 +197,11 @@ _open_error: qDebug("%s: Error opening port %s: %s\n", __FUNCTION__, device, errbuf); } +PcapPort::PortMonitor::~PortMonitor() +{ + pcap_close(handle_); +} + void PcapPort::PortMonitor::run() { while (1) @@ -279,8 +312,15 @@ bool PcapPort::PortTransmitter::appendToPacketList(long sec, long usec, sendQ = sendQueueList_.isEmpty() ? NULL : sendQueueList_.last(); if ((sendQ == NULL) || - (sendQ->len + sizeof(pcap_pkthdr) + length) > sendQ->maxlen) + (sendQ->len + 2*sizeof(pcap_pkthdr) + length) > sendQ->maxlen) { + // Add a zero len packet at end of sendQ for inter-sendQ timing + if (sendQ) + { + pcap_pkthdr hdr = pktHdr; + hdr.caplen = 0; + pcap_sendqueue_queue(sendQ, &hdr, (u_char*)packet); + } //! \todo (LOW): calculate sendqueue size sendQ = pcap_sendqueue_alloc(1*1024*1024); sendQueueList_.append(sendQ); @@ -352,7 +392,8 @@ _restart: { i = returnToQIdx_; - udelay(loopDelay_); + if (loopDelay_) + udelay(loopDelay_); goto _restart; } } @@ -366,20 +407,32 @@ void PcapPort::PortTransmitter::stop() int PcapPort::PortTransmitter::sendQueueTransmit(pcap_t *p, pcap_send_queue *queue, int sync) { + TimeStamp ovrStart, ovrEnd; struct timeval ts; struct pcap_pkthdr *hdr = (struct pcap_pkthdr*) queue->buffer; char *end = queue->buffer + queue->len; ts = hdr->ts; - while (1) + getTimeStamp(&ovrStart); + while((char*) hdr < end) { uchar *pkt = (uchar*)hdr + sizeof(*hdr); int pktLen = hdr->caplen; - if (stop_) + if (sync) { - return -2; + long usec = (hdr->ts.tv_sec - ts.tv_sec) * 1000000 + + (hdr->ts.tv_usec - ts.tv_usec); + + getTimeStamp(&ovrEnd); + + usec -= udiffTimeStamp(&ovrStart, &ovrEnd); + if (usec > 0) + udelay(usec); + + ts = hdr->ts; + getTimeStamp(&ovrStart); } // A pktLen of size 0 is used at the end of a sendQueue and before @@ -392,30 +445,21 @@ int PcapPort::PortTransmitter::sendQueueTransmit(pcap_t *p, } // Step to the next packet in the buffer - hdr = (struct pcap_pkthdr*) ((uchar*)hdr + sizeof(*hdr) + pktLen); + hdr = (struct pcap_pkthdr*) (pkt + pktLen); pkt = (uchar*) ((uchar*)hdr + sizeof(*hdr)); - // Check if the end of the user buffer has been reached - if((char*) hdr >= end) - return 0; - - if (sync) + if (stop_) { - long usec = (hdr->ts.tv_sec - ts.tv_sec) * 1000000 + - (hdr->ts.tv_usec - ts.tv_usec); - - if (usec) - { - udelay(usec); - ts = hdr->ts; - } + return -2; } } + + return 0; } void PcapPort::PortTransmitter::udelay(long usec) { -#ifdef Q_OS_WIN32 +#if defined(Q_OS_WIN32) LARGE_INTEGER tgtTicks; LARGE_INTEGER curTicks; @@ -424,6 +468,18 @@ void PcapPort::PortTransmitter::udelay(long usec) while (curTicks.QuadPart < tgtTicks.QuadPart) QueryPerformanceCounter(&curTicks); +#elif defined(Q_OS_LINUX) + struct timeval delay, target, now; + + delay.tv_sec = 0; + delay.tv_usec = usec; + + gettimeofday(&now, NULL); + timeradd(&now, &delay, &target); + + do { + gettimeofday(&now, NULL); + } while (timercmp(&now, &target, <)); #else QThread::usleep(usec); #endif diff --git a/server/pcapport.h b/server/pcapport.h index e82589b..ab5b24a 100644 --- a/server/pcapport.h +++ b/server/pcapport.h @@ -74,6 +74,7 @@ protected: public: PortMonitor(const char *device, Direction direction, AbstractPort::PortStats *stats); + ~PortMonitor(); void run(); pcap_t* handle() { return handle_; } Direction direction() { return direction_; } diff --git a/server/portmanager.cpp b/server/portmanager.cpp index 081d5db..9a47ae7 100644 --- a/server/portmanager.cpp +++ b/server/portmanager.cpp @@ -22,6 +22,7 @@ along with this program. If not, see #include #include +#include "linuxport.h" #include "pcapport.h" #include "winpcapport.h" @@ -47,8 +48,10 @@ PortManager::PortManager() if (device->description) qDebug(" (%s)\n", device->description); -#ifdef Q_OS_WIN32 +#if defined(Q_OS_WIN32) port = new WinPcapPort(i, device->name); +#elif defined(Q_OS_LINUX) + port = new LinuxPort(i, device->name); #else port = new PcapPort(i, device->name); #endif @@ -62,11 +65,13 @@ PortManager::PortManager() continue; } - port->init(); portList_.append(port); } pcap_freealldevs(deviceList); + + foreach(AbstractPort *port, portList_) + port->init(); return; }