Refactoring

- Major code reorganization of the server code across several classes with fewer 'friends'
	- New server classes - AbstractPort, PcapPort, WinPcapPort, PortManager
	- With this reorg classes have more focus than earlier and will be hopefully easy to extend

Fixes
	- Ostinato client is now able to successfully reconnect and talk to the Ostinato server after a disconnect - earlier, if a method had been pending during the disconnect, the communication was not up after a reconnect; pending methods are cleaned up at disconnect now
This commit is contained in:
Srivats P. 2009-12-26 17:33:27 +00:00
parent bb6a9235c3
commit a1ae3e7e6c
16 changed files with 1469 additions and 1562 deletions

View File

@ -290,6 +290,15 @@ void PbRpcChannel::on_mpSocket_connected()
void PbRpcChannel::on_mpSocket_disconnected()
{
qDebug("In %s", __FUNCTION__);
pendingMethodId = -1;
controller = NULL;
response = NULL;
isPending = false;
// \todo convert parsing from static to data member
//parsing = false
pendingCallList.clear();
emit disconnected();
}

View File

@ -3,6 +3,8 @@
#include <google/protobuf/service.h>
class QIODevice;
class PbRpcController : public ::google::protobuf::RpcController
{
bool failed;

214
server/abstractport.cpp Normal file
View File

@ -0,0 +1,214 @@
#include "abstractport.h"
#include <QString>
#include <QIODevice>
#include "../common/streambase.h"
AbstractPort::AbstractPort(int id, const char *device)
{
data_.mutable_port_id()->set_id(id);
data_.set_name(QString("if%1 ").arg(id).toStdString());
//! \todo (LOW) admin enable/disable of port
data_.set_is_enabled(true);
//! \todo (HIGH) port exclusive control
data_.set_is_exclusive_control(false);
isSendQueueDirty_ = true;
linkState_ = OstProto::LinkStateUnknown;
memset((void*) &stats_, 0, sizeof(stats_));
resetStats();
}
void AbstractPort::init()
{
}
AbstractPort::~AbstractPort()
{
}
StreamBase* AbstractPort::stream(int streamId)
{
for (int i = 0; i < streamList_.size(); i++)
{
if (streamId == streamList_.at(i)->id())
return streamList_.at(i);
}
return NULL;
}
bool AbstractPort::addStream(StreamBase *stream)
{
streamList_.append(stream);
isSendQueueDirty_ = true;
return true;
}
bool AbstractPort::deleteStream(int streamId)
{
for (int i = 0; i < streamList_.size(); i++)
{
StreamBase *stream;
if (streamId == streamList_.at(i)->id())
{
stream = streamList_.takeAt(i);
delete stream;
isSendQueueDirty_ = true;
return true;
}
}
return false;
}
void AbstractPort::updatePacketList()
{
int len;
bool isVariable;
uchar pktBuf[2000];
long sec;
long usec;
qDebug("In %s", __FUNCTION__);
clearPacketList();
//returnToQIdx = -1;
// First sort the streams by ordinalValue
qSort(streamList_);
sec = 0;
usec = 0;
for (int i = 0; i < streamList_.size(); i++)
{
//_restart:
if (streamList_[i]->isEnabled())
{
long numPackets, numBursts;
long ibg, ipg;
switch (streamList_[i]->sendUnit())
{
case OstProto::StreamControl::e_su_bursts:
numBursts = streamList_[i]->numBursts();
numPackets = streamList_[i]->burstSize();
ibg = 1000000/streamList_[i]->burstRate();
ipg = 0;
break;
case OstProto::StreamControl::e_su_packets:
numBursts = 1;
numPackets = streamList_[i]->numPackets();
ibg = 0;
ipg = 1000000/streamList_[i]->packetRate();
break;
default:
qWarning("Unhandled stream control unit %d",
streamList_[i]->sendUnit());
continue;
}
qDebug("numBursts = %ld, numPackets = %ld\n",
numBursts, numPackets);
qDebug("ibg = %ld, ipg = %ld\n", ibg, ipg);
if (streamList_[i]->isFrameVariable())
{
isVariable = true;
}
else
{
isVariable = false;
len = streamList_[i]->frameValue(pktBuf, sizeof(pktBuf), 0);
}
for (int j = 0; j < numBursts; j++)
{
for (int k = 0; k < numPackets; k++)
{
if (isVariable)
{
len = streamList_[i]->frameValue(pktBuf,
sizeof(pktBuf), j * numPackets + k);
}
if (len <= 0)
continue;
usec += ipg;
if (usec > 1000000)
{
sec++;
usec -= 1000000;
}
qDebug("q(%d, %d, %d) sec = %lu usec = %lu",
i, j, k, sec, usec);
appendToPacketList(sec, usec, pktBuf, len);
} // for (numPackets)
usec += ibg;
if (usec > 1000000)
{
sec++;
usec -= 1000000;
}
} // for (numBursts)
switch(streamList_[i]->nextWhat())
{
case ::OstProto::StreamControl::e_nw_stop:
goto _stop_no_more_pkts;
case ::OstProto::StreamControl::e_nw_goto_id:
/*! \todo (MED): define and use
streamList_[i].d.control().goto_stream_id(); */
/*! \todo (MED): assumes goto Id is less than current!!!!
To support goto to any id, do
if goto_id > curr_id then
i = goto_id;
goto restart;
else
returnToQIdx = 0;
*/
setPacketListLoopMode(true);
goto _stop_no_more_pkts;
case ::OstProto::StreamControl::e_nw_goto_next:
break;
default:
qFatal("---------- %s: Unhandled case (%d) -----------",
__FUNCTION__, streamList_[i]->nextWhat() );
break;
}
} // if (stream is enabled)
} // for (numStreams)
_stop_no_more_pkts:
isSendQueueDirty_ = false;
}
void AbstractPort::stats(PortStats *stats)
{
stats->rxPkts = stats_.rxPkts - epochStats_.rxPkts;
stats->rxBytes = stats_.rxBytes - epochStats_.rxBytes;
stats->rxPps = stats_.rxPps;
stats->rxBps = stats_.rxBps;
stats->txPkts = stats_.txPkts - epochStats_.txPkts;
stats->txBytes = stats_.txBytes - epochStats_.txBytes;
stats->txPps = stats_.txPps;
stats->txBps = stats_.txBps;
}
/* vim: set shiftwidth=4 tabstop=8 softtabstop=4 expandtab: */

82
server/abstractport.h Normal file
View File

@ -0,0 +1,82 @@
#ifndef _SERVER_ABSTRACT_PORT_H
#define _SERVER_ABSTRACT_PORT_H
#include <QList>
#include <QtGlobal>
#include "../common/protocol.pb.h"
class StreamBase;
class QIODevice;
class AbstractPort
{
public:
struct PortStats
{
quint64 rxPkts;
quint64 rxBytes;
quint64 rxPps;
quint64 rxBps;
quint64 txPkts;
quint64 txBytes;
quint64 txPps;
quint64 txBps;
};
AbstractPort(int id, const char *device);
virtual ~AbstractPort();
virtual void init();
int id() { return data_.port_id().id(); }
void protoDataCopyInto(OstProto::Port *port) { port->CopyFrom(data_); }
int streamCount() { return streamList_.size(); }
StreamBase* stream(int streamId);
bool addStream(StreamBase *stream);
bool deleteStream(int streamId);
bool isDirty() { return isSendQueueDirty_; }
void setDirty() { isSendQueueDirty_ = true; }
virtual OstProto::LinkState linkState() { return linkState_; }
virtual void clearPacketList() = 0;
virtual bool appendToPacketList(long sec, long usec, const uchar *packet,
int length) = 0;
virtual void setPacketListLoopMode(bool loop) = 0;
void updatePacketList();
virtual void startTransmit() = 0;
virtual void stopTransmit() = 0;
virtual bool isTransmitOn() = 0;
virtual void startCapture() = 0;
virtual void stopCapture() = 0;
virtual bool isCaptureOn() = 0;
virtual QIODevice* captureData() = 0;
void stats(PortStats *stats);
void resetStats() { epochStats_ = stats_; }
protected:
OstProto::Port data_;
OstProto::LinkState linkState_;
struct PortStats stats_;
//! \todo Need lock for stats access/update
private:
bool isSendQueueDirty_;
/*! \note StreamBase::id() and index into streamList[] are NOT same! */
QList<StreamBase*> streamList_;
struct PortStats epochStats_;
};
#endif
/* vim: set shiftwidth=4 tabstop=8 softtabstop=4 expandtab: */

View File

@ -3,18 +3,24 @@ CONFIG += qt debug
QT += network script
DEFINES += HAVE_REMOTE WPCAP
INCLUDEPATH += "../rpc"
LIBS += -lprotobuf
win32:LIBS += -lwpcap -lpacket
unix:LIBS += -lpcap
win32:LIBS += -L"../common/debug" -lostproto
unix:LIBS += -L"../common" -lostproto
win32:LIBS += -L"../rpc/debug" -lpbrpc
unix:LIBS += -L"../rpc" -lpbrpc
LIBS += -lprotobuf
POST_TARGETDEPS += "../common/debug/libostproto.a" "../rpc/debug/libpbrpc.a"
RESOURCES += drone.qrc
HEADERS += drone.h
FORMS += drone.ui
SOURCES += drone_main.cpp drone.cpp
SOURCES += \
drone_main.cpp \
drone.cpp \
portmanager.cpp \
abstractport.cpp \
pcapport.cpp \
winpcapport.cpp
SOURCES += myservice.cpp
SOURCES += pcapextra.cpp

5
server/drone.qrc Normal file
View File

@ -0,0 +1,5 @@
<RCC>
<qresource prefix="/" >
<file>icons/portgroup.png</file>
</qresource>
</RCC>

File diff suppressed because it is too large Load Diff

View File

@ -1,264 +1,85 @@
#ifndef _MY_SERVICE_H
#define _MY_SERVICE_H
#if 0
#include <google/protobuf/message.h>
#include <google/protobuf/service.h>
#endif
#include "../common/protocol.pb.h"
#include "../common/streambase.h"
#include <pcap.h>
#include <QtGlobal>
#include <QList>
#include <QThread>
#include <QTemporaryFile>
#include "../rpc/pbhelper.h"
#include "pcapextra.h"
#ifndef _MY_SERVICE_H
#define _MY_SERVICE_H
#ifdef Q_OS_WIN32
#include <packet32.h>
#endif
#include <QList>
#ifdef Q_OS_WIN32
#define OID_GEN_MEDIA_CONNECT_STATUS 0x00010114
#endif
#define MAX_PKT_HDR_SIZE 1536
#define MAX_STREAM_NAME_SIZE 64
//! 7 byte Preamble + 1 byte SFD + 4 byte FCS
#define ETH_FRAME_HDR_SIZE 12
#include "../common/protocol.pb.h"
class MyService;
class StreamInfo : public StreamBase
{
friend class MyService;
friend class PortInfo;
OstProto::StreamId mStreamId;
#define MAX_PKT_HDR_SIZE 1536
#define MAX_STREAM_NAME_SIZE 64
class AbstractPort;
class MyService: public OstProto::OstService
{
public:
StreamInfo();
~StreamInfo();
};
class PortInfo
{
friend class MyService;
class PortMonitorRx: public QThread
{
friend class PortInfo;
PortInfo *port;
#ifdef Q_OS_WIN32
PPACKET_OID_DATA oidData;
#endif
public:
PortMonitorRx(PortInfo *port);
static void callbackRx(u_char *state,
const struct pcap_pkthdr *header, const u_char *pkt_data);
void run();
};
MyService();
virtual ~MyService();
class PortMonitorTx: public QThread
{
friend class PortInfo;
PortInfo *port;
#ifdef Q_OS_WIN32
PPACKET_OID_DATA oidData;
#endif
public:
PortMonitorTx(PortInfo *port);
static void callbackTx(u_char *state,
const struct pcap_pkthdr *header, const u_char *pkt_data);
void run();
};
/* Methods provided by the service */
virtual void getPortIdList(::google::protobuf::RpcController* controller,
const ::OstProto::Void* request,
::OstProto::PortIdList* response,
::google::protobuf::Closure* done);
virtual void getPortConfig(::google::protobuf::RpcController* controller,
const ::OstProto::PortIdList* request,
::OstProto::PortConfigList* response,
::google::protobuf::Closure* done);
virtual void getStreamIdList(::google::protobuf::RpcController* controller,
const ::OstProto::PortId* request,
::OstProto::StreamIdList* response,
::google::protobuf::Closure* done);
virtual void getStreamConfig(::google::protobuf::RpcController* controller,
const ::OstProto::StreamIdList* request,
::OstProto::StreamConfigList* response,
::google::protobuf::Closure* done);
virtual void addStream(::google::protobuf::RpcController* controller,
const ::OstProto::StreamIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done);
virtual void deleteStream(::google::protobuf::RpcController* controller,
const ::OstProto::StreamIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done);
virtual void modifyStream(::google::protobuf::RpcController* controller,
const ::OstProto::StreamConfigList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done);
virtual void startTx(::google::protobuf::RpcController* controller,
const ::OstProto::PortIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done);
virtual void stopTx(::google::protobuf::RpcController* controller,
const ::OstProto::PortIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done);
virtual void startCapture(::google::protobuf::RpcController* controller,
const ::OstProto::PortIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done);
virtual void stopCapture(::google::protobuf::RpcController* controller,
const ::OstProto::PortIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done);
virtual void getCaptureBuffer(::google::protobuf::RpcController* controller,
const ::OstProto::PortId* request,
::OstProto::CaptureBuffer* response,
::google::protobuf::Closure* done);
virtual void getStats(::google::protobuf::RpcController* controller,
const ::OstProto::PortIdList* request,
::OstProto::PortStatsList* response,
::google::protobuf::Closure* done);
virtual void clearStats(::google::protobuf::RpcController* controller,
const ::OstProto::PortIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done);
class PortTransmitter: public QThread
{
friend class PortInfo;
private:
/*! AbstractPort::id() and index into portInfo[] are same! */
QList<AbstractPort*> portInfo;
PortInfo *port;
int m_stop;
};
public:
PortTransmitter(PortInfo *port);
void run();
void stop();
};
#endif
class PortCapture: public QThread
{
friend class PortInfo;
PortInfo *port;
int m_stop;
pcap_t *capHandle;
pcap_dumper_t *dumpHandle;
QTemporaryFile capFile;
public:
PortCapture(PortInfo *port);
~PortCapture();
void run();
void stop();
QFile* captureFile();
};
#ifdef Q_OS_WIN32
LPADAPTER adapter;
PPACKET_OID_DATA oidData;
#endif
OstProto::Port d;
OstProto::LinkState linkState;
struct PortStats
{
quint64 rxPkts;
quint64 rxBytes;
quint64 rxPktsNic;
quint64 rxBytesNic;
quint64 rxPps;
quint64 rxBps;
quint64 txPkts;
quint64 txBytes;
quint64 txPktsNic;
quint64 txBytesNic;
quint64 txPps;
quint64 txBps;
};
//! \todo Need lock for stats access/update
//! Stuff we need to maintain since PCAP doesn't as of now. As and when
// PCAP supports it, we'll remove from here
struct PcapExtra
{
//! PCAP doesn't do any tx stats
quint64 txPkts;
quint64 txBytes;
};
pcap_if_t *dev;
pcap_t *devHandleRx;
pcap_t *devHandleTx;
QList<ost_pcap_send_queue> sendQueueList;
int returnToQIdx; // FIXME(MED): combine with sendQList
bool isSendQueueDirty;
PcapExtra pcapExtra;
PortMonitorRx monitorRx;
PortMonitorTx monitorTx;
PortTransmitter transmitter;
PortCapture capturer;
struct PortStats epochStats;
struct PortStats stats;
struct timeval lastTsRx; //! used for Rate Stats calculations
struct timeval lastTsTx; //! used for Rate Stats calculations
/*! StreamInfo::d::stream_id and index into streamList[] are NOT same! */
QList<StreamInfo*> streamList;
public:
PortInfo(uint id, pcap_if_t *dev);
uint id() { return d.port_id().id(); }
void updateLinkState();
bool isDirty() { return isSendQueueDirty; }
void setDirty(bool dirty) { isSendQueueDirty = dirty; }
void update();
void startTransmit();
void stopTransmit();
void startCapture();
void stopCapture();
QFile* captureFile();
void resetStats();
};
class MyService: public OstProto::OstService
{
uint numPorts;
/*! PortInfo::d::port_id and index into portInfo[] are same! */
QList<PortInfo*> portInfo;
pcap_if_t *alldevs;
int getStreamIndex(unsigned int portIdx,unsigned int streamId);
public:
MyService();
virtual ~MyService();
/* Methods provided by the service */
virtual void getPortIdList(::google::protobuf::RpcController* controller,
const ::OstProto::Void* request,
::OstProto::PortIdList* response,
::google::protobuf::Closure* done);
virtual void getPortConfig(::google::protobuf::RpcController* controller,
const ::OstProto::PortIdList* request,
::OstProto::PortConfigList* response,
::google::protobuf::Closure* done);
virtual void getStreamIdList(::google::protobuf::RpcController* controller,
const ::OstProto::PortId* request,
::OstProto::StreamIdList* response,
::google::protobuf::Closure* done);
virtual void getStreamConfig(::google::protobuf::RpcController* controller,
const ::OstProto::StreamIdList* request,
::OstProto::StreamConfigList* response,
::google::protobuf::Closure* done);
virtual void addStream(::google::protobuf::RpcController* controller,
const ::OstProto::StreamIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done);
virtual void deleteStream(::google::protobuf::RpcController* controller,
const ::OstProto::StreamIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done);
virtual void modifyStream(::google::protobuf::RpcController* controller,
const ::OstProto::StreamConfigList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done);
virtual void startTx(::google::protobuf::RpcController* controller,
const ::OstProto::PortIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done);
virtual void stopTx(::google::protobuf::RpcController* controller,
const ::OstProto::PortIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done);
virtual void startCapture(::google::protobuf::RpcController* controller,
const ::OstProto::PortIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done);
virtual void stopCapture(::google::protobuf::RpcController* controller,
const ::OstProto::PortIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done);
virtual void getCaptureBuffer(::google::protobuf::RpcController* controller,
const ::OstProto::PortId* request,
::OstProto::CaptureBuffer* response,
::google::protobuf::Closure* done);
virtual void getStats(::google::protobuf::RpcController* controller,
const ::OstProto::PortIdList* request,
::OstProto::PortStatsList* response,
::google::protobuf::Closure* done);
virtual void clearStats(::google::protobuf::RpcController* controller,
const ::OstProto::PortIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done);
};
#endif
/* vim: set shiftwidth=4 tabstop=8 softtabstop=4 expandtab: */

View File

@ -1,16 +1,11 @@
#include "pcapextra.h"
#include <string.h> // memcpy()
#include <stdlib.h> // malloc(), free()
#include "pcapextra.h"
/* NOTE: All code borrowed from WinPcap */
#ifndef Q_OS_WIN32
int pcap_setmode(pcap_t *p, int mode)
{
// no STAT mode in libpcap, so just return 0 to indicate success
return 0;
}
pcap_send_queue* pcap_sendqueue_alloc (u_int memsize)
{
pcap_send_queue *tqueue;
@ -61,111 +56,4 @@ int pcap_sendqueue_queue (pcap_send_queue *queue,
}
#endif
u_int ost_pcap_sendqueue_list_transmit(pcap_t *p,
QList<ost_pcap_send_queue> sendQueueList, int returnToQIdx, int sync,
int *p_stop, quint64* p_pkts, quint64* p_bytes,
void (*pf_usleep)(ulong))
{
uint i, ret = 0;
ost_pcap_send_queue sq;
for(i = 0; i < sendQueueList.size(); i++)
{
_restart:
sq = sendQueueList.at(i);
ret += ost_pcap_sendqueue_transmit(p, sq.sendQueue, sync,
p_stop, p_pkts, p_bytes, pf_usleep);
if (*p_stop)
return ret;
//! \todo (HIGH): Timing between subsequent sendQueues
}
if (returnToQIdx >= 0)
{
i = returnToQIdx;
//! \todo (HIGH) 1s fixed; Change this to ipg of last stream
(*pf_usleep)(1000000);
goto _restart;
}
return ret;
}
u_int ost_pcap_sendqueue_transmit(pcap_t *p,
pcap_send_queue *queue, int sync,
int *p_stop, quint64* p_pkts, quint64* p_bytes,
void (*pf_usleep)(ulong))
{
char* PacketBuff = queue->buffer;
int Size = queue->len;
struct pcap_pkthdr *winpcap_hdr;
struct timeval ts;
char* EndOfUserBuff = (char *)PacketBuff + Size;
int ret;
// Start from the first packet
winpcap_hdr = (struct pcap_pkthdr*)PacketBuff;
if((char*)winpcap_hdr + winpcap_hdr->caplen + sizeof(struct pcap_pkthdr) >
EndOfUserBuff )
{
// Malformed buffer
return 0;
}
if (sync)
ts = winpcap_hdr->ts;
while( true ){
if (*p_stop)
return (char*)winpcap_hdr - (char*)PacketBuff;
if(winpcap_hdr->caplen ==0 || winpcap_hdr->caplen > 65536)
{
// Malformed header
return 0;
}
// Send the packet
ret = pcap_sendpacket(p,
(unsigned char*)winpcap_hdr + sizeof(struct pcap_pkthdr),
winpcap_hdr->caplen);
if(ret < 0){
// Error sending the packet
return (char*)winpcap_hdr - (char*)PacketBuff;
}
if (p_pkts) (*p_pkts)++;
if (p_bytes) (*p_bytes) += winpcap_hdr->caplen;
// Step to the next packet in the buffer
//(char*)winpcap_hdr += winpcap_hdr->caplen + sizeof(struct pcap_pkthdr);
winpcap_hdr = (struct pcap_pkthdr*) ((char*)winpcap_hdr +
winpcap_hdr->caplen + sizeof(struct pcap_pkthdr));
// Check if the end of the user buffer has been reached
if( (char*)winpcap_hdr >= EndOfUserBuff )
{
return (char*)winpcap_hdr - (char*)PacketBuff;
}
if (sync)
{
long usec = (winpcap_hdr->ts.tv_sec-ts.tv_sec)*1000000 +
(winpcap_hdr->ts.tv_usec - ts.tv_usec);
if (usec)
{
(*pf_usleep)(usec);
ts = winpcap_hdr->ts;
}
}
}
}

View File

@ -1,36 +1,12 @@
#ifndef _PCAP_EXTRA_H
#define _PCAP_EXTRA_H
#include <Qt>
#include <QList>
#include "pcap.h"
struct ost_pcap_send_queue
{
pcap_send_queue *sendQueue;
//! Used to track num of packets (and their sizes) in the
// send queue. Also used to find out actual num of pkts sent
// in case of partial send in pcap_sendqueue_transmit()
QList<uint> sendQueueCumLen;
};
// Common for all OS - *nix or Win32
u_int ost_pcap_sendqueue_list_transmit(pcap_t *p,
QList<ost_pcap_send_queue> sendQueueList, int returnToQIdx, int sync,
int *p_stop, quint64* p_pkts, quint64* p_bytes,
void (*pf_usleep)(ulong));
u_int ost_pcap_sendqueue_transmit (pcap_t *p,
pcap_send_queue *queue, int sync,
int *p_stop, quint64* p_pkts, quint64* p_bytes,
void (*pf_usleep)(ulong));
#include <pcap.h>
#include <QtGlobal>
#ifndef Q_OS_WIN32
// Only for non Win32
#define PCAP_OPENFLAG_PROMISCUOUS 1
//#define PCAP_OPENFLAG_PROMISCUOUS 1
struct pcap_send_queue
{
@ -39,15 +15,11 @@ struct pcap_send_queue
char *buffer;
};
int pcap_setmode(pcap_t *p, int mode);
#define MODE_STAT 1
pcap_send_queue* pcap_sendqueue_alloc (u_int memsize);
void pcap_sendqueue_destroy (pcap_send_queue *queue);
int pcap_sendqueue_queue (pcap_send_queue *queue,
const struct pcap_pkthdr *pkt_header, const u_char *pkt_data);
#endif
#endif

407
server/pcapport.cpp Normal file
View File

@ -0,0 +1,407 @@
#include "pcapport.h"
pcap_if_t *PcapPort::deviceList_ = NULL;
PcapPort::PcapPort(int id, const char *device)
: AbstractPort(id, device)
{
monitorRx_ = new PortMonitor(device, kDirectionRx, &stats_);
monitorTx_ = new PortMonitor(device, kDirectionTx, &stats_);
transmitter_ = new PortTransmitter(device);
capturer_ = new PortCapturer(device);
if (!deviceList_)
{
char errbuf[PCAP_ERRBUF_SIZE];
if (pcap_findalldevs(&deviceList_, errbuf) == -1)
qDebug("Error in pcap_findalldevs_ex: %s\n", errbuf);
}
for (pcap_if_t *dev = deviceList_; dev != NULL; dev = dev->next)
{
if (strcmp(device, dev->name) == 0)
{
if (dev->description)
data_.set_description(dev->description);
//! \todo set port IP addr also
}
}
}
void PcapPort::init()
{
if (!monitorTx_->isDirectional())
transmitter_->useExternalStats(&stats_);
transmitter_->setHandle(monitorRx_->handle());
monitorRx_->start();
monitorTx_->start();
}
PcapPort::~PcapPort()
{
delete capturer_;
delete transmitter_;
delete monitorTx_;
delete monitorRx_;
}
PcapPort::PortMonitor::PortMonitor(const char *device, Direction direction,
AbstractPort::PortStats *stats)
{
int ret;
char errbuf[PCAP_ERRBUF_SIZE];
direction_ = direction;
isDirectional_ = true;
stats_ = stats;
handle_ = pcap_open_live(device, 64 /* FIXME */, PCAP_OPENFLAG_PROMISCUOUS,
1000 /* ms */, errbuf);
if (handle_ == NULL)
goto _open_error;
switch (direction_)
{
case kDirectionRx:
ret = pcap_setdirection(handle_, PCAP_D_IN);
break;
case kDirectionTx:
ret = pcap_setdirection(handle_, PCAP_D_OUT);
break;
default:
Q_ASSERT(false);
}
if (ret < 0)
goto _set_direction_error;
return;
_set_direction_error:
qDebug("Error setting direction(%d) %s: %s\n", direction, device,
pcap_geterr(handle_));
isDirectional_ = false;
return;
_open_error:
qDebug("Error opening port %s: %s\n", device, pcap_geterr(handle_));
}
void PcapPort::PortMonitor::run()
{
while (1)
{
int ret;
struct pcap_pkthdr *hdr;
const uchar *data;
ret = pcap_next_ex(handle_, &hdr, &data);
switch (ret)
{
case 1:
switch (direction_)
{
case kDirectionRx:
stats_->rxPkts++;
stats_->rxBytes += hdr->len;
break;
case kDirectionTx:
if (isDirectional_)
{
stats_->txPkts++;
stats_->txBytes += hdr->len;
}
break;
default:
Q_ASSERT(false);
}
//! \todo TODO pkt/bit rates
break;
case 0:
//qDebug("%s: timeout. continuing ...", __PRETTY_FUNCTION__);
continue;
case -1:
qWarning("%s: error reading packet (%d): %s",
__PRETTY_FUNCTION__, ret, pcap_geterr(handle_));
break;
case -2:
default:
qFatal("%s: Unexpected return value %d", __PRETTY_FUNCTION__, ret);
}
}
}
PcapPort::PortTransmitter::PortTransmitter(const char *device)
{
char errbuf[PCAP_ERRBUF_SIZE];
returnToQIdx_ = -1;
stop_ = false;
stats_ = new AbstractPort::PortStats;
usingInternalStats_ = true;
handle_ = pcap_open_live(device, 64 /* FIXME */, PCAP_OPENFLAG_PROMISCUOUS,
1000 /* ms */, errbuf);
if (handle_ == NULL)
goto _open_error;
usingInternalHandle_ = true;
return;
_open_error:
qDebug("Error opening port %s: %s\n", device, pcap_geterr(handle_));
usingInternalHandle_ = false;
}
void PcapPort::PortTransmitter::clearPacketList()
{
Q_ASSERT(!isRunning());
// \todo lock for sendQueueList
while(sendQueueList_.size())
{
pcap_send_queue *sq = sendQueueList_.takeFirst();
pcap_sendqueue_destroy(sq);
}
}
bool PcapPort::PortTransmitter::appendToPacketList(long sec, long usec,
const uchar *packet, int length)
{
bool op = true;
pcap_pkthdr pktHdr;
pcap_send_queue *sendQ;
pktHdr.caplen = pktHdr.len = length;
pktHdr.ts.tv_sec = sec;
pktHdr.ts.tv_usec = usec;
if (sendQueueList_.size())
sendQ = sendQueueList_.last();
else
sendQ = pcap_sendqueue_alloc(1*1024*1024);
// Not enough space? Alloc another one!
if ((sendQ->len + length + sizeof(pcap_pkthdr)) > sendQ->maxlen)
{
sendQueueList_.append(sendQ);
//! \todo (LOW): calculate sendqueue size
sendQ = pcap_sendqueue_alloc(1*1024*1024);
}
if (pcap_sendqueue_queue(sendQ, &pktHdr, (u_char*) packet) < 0)
op = false;
sendQueueList_.append(sendQ);
return op;
}
void PcapPort::PortTransmitter::setHandle(pcap_t *handle)
{
if (usingInternalHandle_)
pcap_close(handle_);
handle_ = handle;
usingInternalStats_ = false;
}
void PcapPort::PortTransmitter::useExternalStats(AbstractPort::PortStats *stats)
{
if (usingInternalStats_);
delete stats_;
stats_ = stats;
usingInternalStats_ = false;
}
void PcapPort::PortTransmitter::run()
{
//! \todo (MED) Stream Mode - continuous: define before implement
// NOTE1: We can't use pcap_sendqueue_transmit() directly even on Win32
// 'coz of 2 reasons - there's no way of stopping it before all packets
// in the sendQueue are sent out and secondly, stats are available only
// when all packets have been sent - no periodic updates
//
// NOTE2: Transmit on the Rx Handle so that we can receive it back
// on the Tx Handle to do stats
//
// NOTE3: Update pcapExtra counters - port TxStats will be updated in the
// 'stats callback' function so that both Rx and Tx stats are updated
// together
const int kSyncTransmit = 1;
int i;
for(i = 0; i < sendQueueList_.size(); i++)
{
int ret;
_restart:
ret = sendQueueTransmit(handle_, sendQueueList_.at(i), kSyncTransmit);
if (ret < 0)
return;
//! \todo (HIGH): Timing between subsequent sendQueues
}
if (returnToQIdx_ >= 0)
{
i = returnToQIdx_;
//! \todo (HIGH) 1s fixed; Change this to ipg of last stream
QThread::usleep(1000000);
goto _restart;
}
}
void PcapPort::PortTransmitter::stop()
{
stop_ = true;
}
int PcapPort::PortTransmitter::sendQueueTransmit(pcap_t *p,
pcap_send_queue *queue, int sync)
{
struct timeval ts;
struct pcap_pkthdr *hdr = (struct pcap_pkthdr*) queue->buffer;
char *end = queue->buffer + queue->len;
if (sync)
ts = hdr->ts;
while (1)
{
uchar *pkt = (uchar*)hdr + sizeof(*hdr);
int pktLen = hdr->caplen;
if (stop_)
{
stop_ = false;
return -2;
}
if(pktLen > 0)
pcap_sendpacket(p, pkt, pktLen);
stats_->txPkts++;
stats_->txBytes += pktLen;
// Step to the next packet in the buffer
hdr = (struct pcap_pkthdr*) ((uchar*)hdr + sizeof(*hdr) + pktLen);
pkt = (uchar*) ((uchar*)hdr + sizeof(*hdr));
// Check if the end of the user buffer has been reached
if((char*) hdr >= end)
return 0;
if (sync)
{
long usec = (hdr->ts.tv_sec - ts.tv_sec) * 1000000 +
(hdr->ts.tv_usec - ts.tv_usec);
if (usec)
{
QThread::usleep(usec);
ts = hdr->ts;
}
}
}
}
PcapPort::PortCapturer::PortCapturer(const char *device)
{
device_ = QString::fromAscii(device);
stop_ = false;
if (!capFile_.open())
qWarning("Unable to open temp cap file");
qDebug("cap file = %s", capFile_.fileName().toAscii().constData());
dumpHandle_ = NULL;
handle_ = NULL;
}
PcapPort::PortCapturer::~PortCapturer()
{
capFile_.close();
}
void PcapPort::PortCapturer::run()
{
char errbuf[PCAP_ERRBUF_SIZE];
qDebug("In %s", __PRETTY_FUNCTION__);
if (!capFile_.isOpen())
{
qWarning("temp cap file is not open");
return;
}
handle_ = pcap_open_live(device_.toAscii().constData(), 65535,
PCAP_OPENFLAG_PROMISCUOUS, 1000 /* ms */, errbuf);
if (handle_ == NULL)
{
qDebug("Error opening port %s: %s\n",
device_.toAscii().constData(), pcap_geterr(handle_));
return;
}
dumpHandle_ = pcap_dump_open(handle_,
capFile_.fileName().toAscii().constData());
while (1)
{
int ret;
struct pcap_pkthdr *hdr;
const uchar *data;
ret = pcap_next_ex(handle_, &hdr, &data);
switch (ret)
{
case 1:
pcap_dump((uchar*) dumpHandle_, hdr, data);
break;
case 0:
// timeout: just go back to the loop
break;
case -1:
qWarning("%s: error reading packet (%d): %s",
__PRETTY_FUNCTION__, ret, pcap_geterr(handle_));
break;
case -2:
default:
qFatal("%s: Unexpected return value %d", __PRETTY_FUNCTION__, ret);
}
if (stop_)
{
stop_ = false;
break;
}
}
pcap_dump_close(dumpHandle_);
pcap_close(handle_);
dumpHandle_ = NULL;
handle_ = NULL;
}
void PcapPort::PortCapturer::stop()
{
stop_ = true;
}
QFile* PcapPort::PortCapturer::captureFile()
{
return &capFile_;
}
/* vim: set shiftwidth=4 tabstop=8 softtabstop=4 expandtab: */

121
server/pcapport.h Normal file
View File

@ -0,0 +1,121 @@
#ifndef _SERVER_PCAP_PORT_H
#define _SERVER_PCAP_PORT_H
#include <QTemporaryFile>
#include <QThread>
#include <pcap.h>
#include "abstractport.h"
class PcapPort : public AbstractPort
{
public:
PcapPort(int id, const char *device);
~PcapPort();
void init();
virtual void clearPacketList() {
transmitter_->clearPacketList();
setPacketListLoopMode(false);
}
virtual bool appendToPacketList(long sec, long usec, const uchar *packet,
int length) {
return transmitter_->appendToPacketList(sec, usec, packet, length);
}
virtual void setPacketListLoopMode(bool loop) {
transmitter_->setPacketListLoopMode(loop);
}
virtual void startTransmit() {
if (isDirty())
updatePacketList();
transmitter_->start();
}
virtual void stopTransmit() { transmitter_->stop(); }
virtual bool isTransmitOn() { return transmitter_->isRunning(); }
virtual void startCapture() { capturer_->start(); }
virtual void stopCapture() { capturer_->stop(); }
virtual bool isCaptureOn() { return capturer_->isRunning(); }
virtual QIODevice* captureData() { return capturer_->captureFile(); }
protected:
enum Direction
{
kDirectionRx,
kDirectionTx
};
class PortMonitor: public QThread
{
public:
PortMonitor(const char *device, Direction direction,
AbstractPort::PortStats *stats);
void run();
pcap_t* handle() { return handle_; }
Direction direction() { return direction_; }
bool isDirectional() { return isDirectional_; }
protected:
AbstractPort::PortStats *stats_;
private:
pcap_t *handle_;
Direction direction_;
bool isDirectional_;
};
class PortTransmitter: public QThread
{
public:
PortTransmitter(const char *device);
void clearPacketList();
bool appendToPacketList(long sec, long usec, const uchar *packet,
int length);
void setPacketListLoopMode(bool loop) {
returnToQIdx_ = loop ? 0 : -1;
}
void setHandle(pcap_t *handle);
void useExternalStats(AbstractPort::PortStats *stats);
void run();
void stop();
private:
int sendQueueTransmit(pcap_t *p, pcap_send_queue *queue, int sync);
QList<pcap_send_queue*> sendQueueList_;
int returnToQIdx_;
bool usingInternalStats_;
AbstractPort::PortStats *stats_;
bool usingInternalHandle_;
pcap_t *handle_;
bool stop_;
};
class PortCapturer: public QThread
{
public:
PortCapturer(const char *device);
~PortCapturer();
void run();
void stop();
QFile* captureFile();
private:
QString device_;
bool stop_;
QTemporaryFile capFile_;
pcap_t *handle_;
pcap_dumper_t *dumpHandle_;
};
PortMonitor *monitorRx_;
PortMonitor *monitorTx_;
private:
PortTransmitter *transmitter_;
PortCapturer *capturer_;
static pcap_if_t *deviceList_;
};
#endif
/* vim: set shiftwidth=4 tabstop=8 softtabstop=4 expandtab: */

57
server/portmanager.cpp Normal file
View File

@ -0,0 +1,57 @@
#include "portmanager.h"
#include <pcap.h>
#include "winpcapport.h"
PortManager *PortManager::instance_ = NULL;
PortManager::PortManager()
{
int i;
pcap_if_t *deviceList;
pcap_if_t *device;
char errbuf[PCAP_ERRBUF_SIZE];
qDebug("Retrieving the device list from the local machine\n");
if (pcap_findalldevs(&deviceList, errbuf) == -1)
qDebug("Error in pcap_findalldevs_ex: %s\n", errbuf);
for(device = deviceList, i = 0; device != NULL; device = device->next, i++)
{
AbstractPort *port;
#ifdef Q_OS_WIN32
port = new WinPcapPort(i, device->name);
#else
port = new PcapPort(i, device->name);
#endif
port->init();
portList_.append(port);
qDebug("%d. %s", i, device->name);
if (device->description)
qDebug(" (%s)\n", device->description);
}
pcap_freealldevs(deviceList);
return;
}
PortManager::~PortManager()
{
}
PortManager* PortManager::instance()
{
if (!instance_)
instance_ = new PortManager;
return instance_;
}
/* vim: set shiftwidth=4 tabstop=8 softtabstop=4 expandtab: */

26
server/portmanager.h Normal file
View File

@ -0,0 +1,26 @@
#ifndef _SERVER_PORT_MANAGER_H
#define _SERVER_PORT_MANAGER_H
#include <QList>
#include "abstractport.h"
class PortManager
{
public:
PortManager();
~PortManager();
int portCount() { return portList_.size(); }
AbstractPort* port(int id) { return portList_[id]; }
static PortManager* instance();
private:
QList<AbstractPort*> portList_;
static PortManager *instance_;
};
#endif
/* vim: set shiftwidth=4 tabstop=8 softtabstop=4 expandtab: */

144
server/winpcapport.cpp Normal file
View File

@ -0,0 +1,144 @@
#include "winpcapport.h"
const uint OID_GEN_MEDIA_CONNECT_STATUS = 0x00010114;
WinPcapPort::WinPcapPort(int id, const char *device)
: PcapPort(id, device)
{
delete monitorRx_;
delete monitorTx_;
monitorRx_ = new PortMonitor(device, kDirectionRx, &stats_);
monitorTx_ = new PortMonitor(device, kDirectionTx, &stats_);
adapter_ = PacketOpenAdapter((CHAR*)device);
if (!adapter_)
qFatal("Unable to open adapter %s", device);
linkStateOid_ = (PPACKET_OID_DATA) malloc(sizeof(PACKET_OID_DATA) +
sizeof(uint));
if (!linkStateOid_)
qFatal("failed to alloc oidData");
}
WinPcapPort::~WinPcapPort()
{
}
OstProto::LinkState WinPcapPort::linkState()
{
memset(linkStateOid_, 0, sizeof(PACKET_OID_DATA) + sizeof(uint));
linkStateOid_->Oid = OID_GEN_MEDIA_CONNECT_STATUS;
linkStateOid_->Length = sizeof(uint);
if (PacketRequest(adapter_, 0, linkStateOid_))
{
uint state;
if (linkStateOid_->Length == sizeof(state))
{
memcpy((void*)&state, (void*)linkStateOid_->Data,
linkStateOid_->Length);
if (state == 0)
linkState_ = OstProto::LinkStateUp;
else if (state == 1)
linkState_ = OstProto::LinkStateDown;
}
}
return linkState_;
}
WinPcapPort::PortMonitor::PortMonitor(const char *device, Direction direction,
AbstractPort::PortStats *stats)
: PcapPort::PortMonitor(device, direction, stats)
{
pcap_setmode(handle(), MODE_STAT);
}
void WinPcapPort::PortMonitor::run()
{
struct timeval lastTs;
quint64 lastTxPkts = 0;
quint64 lastTxBytes = 0;
qWarning("in %s", __PRETTY_FUNCTION__);
lastTs.tv_sec = 0;
lastTs.tv_usec = 0;
while (1)
{
int ret;
struct pcap_pkthdr *hdr;
const uchar *data;
ret = pcap_next_ex(handle(), &hdr, &data);
switch (ret)
{
case 1:
{
quint64 pkts = *((quint64*)(data + 0));
quint64 bytes = *((quint64*)(data + 8));
// TODO: is it 12 or 16?
bytes -= pkts * 12;
uint usec = (hdr->ts.tv_sec - lastTs.tv_sec) * 1000000 +
(hdr->ts.tv_usec - lastTs.tv_usec);
switch (direction())
{
case kDirectionRx:
stats_->rxPkts += pkts;
stats_->rxBytes += bytes;
stats_->rxPps = (pkts * 1000000) / usec;
stats_->rxBps = (bytes * 1000000) / usec;
break;
case kDirectionTx:
if (isDirectional())
{
stats_->txPkts += pkts;
stats_->txBytes += bytes;
}
else
{
// Assuming stats_->txXXX are updated externally
quint64 txPkts = stats_->txPkts;
quint64 txBytes = stats_->txBytes;
pkts = txPkts - lastTxPkts;
bytes = txBytes - lastTxBytes;
lastTxPkts = txPkts;
lastTxBytes = txBytes;
}
stats_->txPps = (pkts * 1000000) / usec;
stats_->txBps = (bytes * 1000000) / usec;
break;
default:
Q_ASSERT(false);
}
break;
}
case 0:
//qDebug("%s: timeout. continuing ...", __PRETTY_FUNCTION__);
continue;
case -1:
qWarning("%s: error reading packet (%d): %s",
__PRETTY_FUNCTION__, ret, pcap_geterr(handle()));
break;
case -2:
default:
qFatal("%s: Unexpected return value %d", __PRETTY_FUNCTION__, ret);
}
lastTs.tv_sec = hdr->ts.tv_sec;
lastTs.tv_usec = hdr->ts.tv_usec;
QThread::msleep(1000);
}
}
/* vim: set shiftwidth=4 tabstop=8 softtabstop=4 expandtab: */

31
server/winpcapport.h Normal file
View File

@ -0,0 +1,31 @@
#ifndef _SERVER_WIN_PCAP_PORT_H
#define _SERVER_WIN_PCAP_PORT_H
#include "pcapport.h"
#include <packet32.h>
class WinPcapPort : public PcapPort
{
public:
WinPcapPort(int id, const char *device);
~WinPcapPort();
virtual OstProto::LinkState linkState();
protected:
class PortMonitor: public PcapPort::PortMonitor
{
public:
PortMonitor(const char *device, Direction direction,
AbstractPort::PortStats *stats);
void run();
};
private:
LPADAPTER adapter_;
PPACKET_OID_DATA linkStateOid_ ;
};
#endif
/* vim: set shiftwidth=4 tabstop=8 softtabstop=4 expandtab: */