diff --git a/binding/rpc.py b/binding/rpc.py index aea0a53..2078bb3 100644 --- a/binding/rpc.py +++ b/binding/rpc.py @@ -74,6 +74,7 @@ class OstinatoRpcChannel(RpcChannel): MSG_TYPE_REQUEST = 1 MSG_TYPE_RESPONSE = 2 MSG_TYPE_BLOB = 3 + MSG_TYPE_ERROR = 4 error = '' try: @@ -118,6 +119,8 @@ class OstinatoRpcChannel(RpcChannel): self.log.debug('parsed response %s', response) elif msg_type == MSG_TYPE_BLOB: response = resp + elif msg_type == MSG_TYPE_ERROR: + raise RpcError(unicode(resp, 'utf-8')) else: raise RpcError('unknown RPC msg type %d' % msg_type) @@ -152,7 +155,7 @@ class OstinatoRpcChannel(RpcChannel): self.log.exception(error) raise except RpcError as e: - error = 'ERROR: Unknown reply received for RPC %s() (%s) ' % ( + error = 'ERROR: error received for RPC %s() (%s) ' % ( method.name, e) self.log.exception(error) raise diff --git a/rpc/pbrpcchannel.cpp b/rpc/pbrpcchannel.cpp index 5ef1003..f1fec25 100644 --- a/rpc/pbrpcchannel.cpp +++ b/rpc/pbrpcchannel.cpp @@ -22,6 +22,8 @@ along with this program. If not, see #include +static uchar msgBuf[4096]; + PbRpcChannel::PbRpcChannel(QHostAddress ip, quint16 port) { isPending = false; @@ -94,8 +96,7 @@ void PbRpcChannel::CallMethod( ::google::protobuf::Message *response, ::google::protobuf::Closure* done) { - char msgBuf[PB_HDR_SIZE]; - char* const msg = &msgBuf[0]; + char* const msg = (char*) &msgBuf[0]; int len; bool ret; @@ -161,8 +162,7 @@ void PbRpcChannel::CallMethod( void PbRpcChannel::on_mpSocket_readyRead() { - uchar msg[PB_HDR_SIZE]; - uchar *p = (uchar*) &msg; + uchar *msg = (uchar*) &msgBuf; int msgLen; static bool parsing = false; static quint16 type, method; @@ -183,9 +183,9 @@ void PbRpcChannel::on_mpSocket_readyRead() Q_ASSERT(msgLen == PB_HDR_SIZE); - type = qFromBigEndian(p+0); - method = qFromBigEndian(p+2); - len = qFromBigEndian(p+4); + type = qFromBigEndian(msg+0); + method = qFromBigEndian(msg+2); + len = qFromBigEndian(msg+4); //BUFDUMP(msg, PB_HDR_SIZE); //qDebug("type = %hu, method = %hu, len = %u", type, method, len); @@ -207,8 +207,8 @@ void PbRpcChannel::on_mpSocket_readyRead() { int l; - l = mpSocket->read((char*)msg, sizeof(msg)); - blob->write((char*)msg, l); + l = mpSocket->read((char*)msgBuf, sizeof(msgBuf)); + blob->write((char*)msgBuf, l); cumLen += l; } @@ -221,13 +221,13 @@ void PbRpcChannel::on_mpSocket_readyRead() if (!isPending) { - qDebug("not waiting for response"); + qWarning("not waiting for response"); goto _error_exit2; } if (pendingMethodId != method) { - qDebug("invalid method id %d (expected = %d)", method, + qWarning("invalid method id %d (expected = %d)", method, pendingMethodId); goto _error_exit2; } @@ -241,13 +241,13 @@ void PbRpcChannel::on_mpSocket_readyRead() if (!isPending) { - qDebug("not waiting for response"); + qWarning("not waiting for response"); goto _error_exit; } if (pendingMethodId != method) { - qDebug("invalid method id %d (expected = %d)", method, + qWarning("invalid method id %d (expected = %d)", method, pendingMethodId); goto _error_exit; } @@ -274,6 +274,47 @@ void PbRpcChannel::on_mpSocket_readyRead() } break; + case PB_MSG_TYPE_ERROR: + { + static quint32 cumLen = 0; + static QByteArray error; + + while ((cumLen < len) && mpSocket->bytesAvailable()) + { + int l; + + l = mpSocket->read((char*)msgBuf, sizeof(msgBuf)); + error.append(QByteArray((char*)msgBuf,l)); + cumLen += l; + } + + qDebug("%s: error rcvd %d/%d", __PRETTY_FUNCTION__, cumLen, len); + + if (cumLen < len) + return; + + static_cast(controller)->SetFailed( + QString::fromUtf8(error, len)); + + cumLen = 0; + error.resize(0); + + if (!isPending) + { + qWarning("not waiting for response"); + goto _error_exit2; + } + + if (pendingMethodId != method) + { + qWarning("invalid method id %d (expected = %d)", method, + pendingMethodId); + goto _error_exit2; + } + + break; + } + default: qFatal("%s: unexpected type %d", __PRETTY_FUNCTION__, type); goto _error_exit; diff --git a/rpc/pbrpccommon.h b/rpc/pbrpccommon.h index e1fbdf9..07c8013 100644 --- a/rpc/pbrpccommon.h +++ b/rpc/pbrpccommon.h @@ -35,5 +35,6 @@ along with this program. If not, see #define PB_MSG_TYPE_REQUEST 1 #define PB_MSG_TYPE_RESPONSE 2 #define PB_MSG_TYPE_BINBLOB 3 +#define PB_MSG_TYPE_ERROR 4 #endif diff --git a/rpc/pbrpccontroller.h b/rpc/pbrpccontroller.h index 88782e7..af9c292 100644 --- a/rpc/pbrpccontroller.h +++ b/rpc/pbrpccontroller.h @@ -44,14 +44,17 @@ public: ::google::protobuf::Message* response() { return response_; } // Client Side Methods - void Reset() { failed = false; blob = NULL; } + void Reset() { failed = false; blob = NULL; errStr = ""; } bool Failed() const { return failed; } void StartCancel() { /*! \todo (MED) */} - std::string ErrorText() const { return errStr; } + std::string ErrorText() const { return errStr.toStdString(); } // Server Side Methods + void SetFailed(const QString &reason) + { failed = true; errStr = reason; qWarning(qPrintable(errStr)); } void SetFailed(const std::string &reason) - { failed = true; errStr = reason; } + { SetFailed(QString::fromStdString(reason)); } + QString ErrorString() const { return errStr; } bool IsCanceled() const { return false; }; void NotifyOnCancel(::google::protobuf::Closure* /* callback */) { /*! \todo (MED) */ @@ -64,7 +67,7 @@ public: private: bool failed; QIODevice *blob; - std::string errStr; + QString errStr; ::google::protobuf::Message *request_; ::google::protobuf::Message *response_; diff --git a/rpc/rpcconn.cpp b/rpc/rpcconn.cpp index 9cdbd16..7273df7 100644 --- a/rpc/rpcconn.cpp +++ b/rpc/rpcconn.cpp @@ -101,6 +101,14 @@ void RpcConnection::start() this, SLOT(on_clientSock_error(QAbstractSocket::SocketError))); } +void RpcConnection::writeHeader(char* header, quint16 type, quint16 method, + quint32 length) +{ + *((quint16*)(header+0)) = qToBigEndian(type); + *((quint16*)(header+2)) = qToBigEndian(method); + *((quint32*)(header+4)) = qToBigEndian(length); +} + void RpcConnection::sendRpcReply(PbRpcController *controller) { google::protobuf::Message *response = controller->response(); @@ -111,7 +119,14 @@ void RpcConnection::sendRpcReply(PbRpcController *controller) if (controller->Failed()) { - qDebug("rpc failed"); + QByteArray err = controller->ErrorString().toUtf8(); + + qWarning("rpc failed (%s)", qPrintable(controller->ErrorString())); + len = err.size(); + writeHeader(msg, PB_MSG_TYPE_ERROR, pendingMethodId, len); + clientSock->write(msg, PB_HDR_SIZE); + clientSock->write(err.constData(), len); + goto _exit; } @@ -121,10 +136,7 @@ void RpcConnection::sendRpcReply(PbRpcController *controller) len = blob->size(); qDebug("is binary blob of len %d", len); - *((quint16*)(msg+0)) = qToBigEndian(quint16(PB_MSG_TYPE_BINBLOB)); // type - *((quint16*)(msg+2)) = qToBigEndian(quint16(pendingMethodId)); // method - (*(quint32*)(msg+4)) = qToBigEndian(quint32(len)); // len - + writeHeader(msg, PB_MSG_TYPE_BINBLOB, pendingMethodId, len); clientSock->write(msg, PB_HDR_SIZE); blob->seek(0); @@ -152,10 +164,7 @@ void RpcConnection::sendRpcReply(PbRpcController *controller) } len = response->ByteSize(); - - *((quint16*)(msg+0)) = qToBigEndian(quint16(PB_MSG_TYPE_RESPONSE)); // type - *((quint16*)(msg+2)) = qToBigEndian(quint16(pendingMethodId)); // method - *((quint32*)(msg+4)) = qToBigEndian(quint32(len)); // len + writeHeader(msg, PB_MSG_TYPE_RESPONSE, pendingMethodId, len); // Avoid printing stats since it happens once every couple of seconds if (pendingMethodId != 13) diff --git a/rpc/rpcconn.h b/rpc/rpcconn.h index fb23bb8..7ea581e 100644 --- a/rpc/rpcconn.h +++ b/rpc/rpcconn.h @@ -45,6 +45,8 @@ public: static void connIdMsgHandler(QtMsgType type, const char* msg); private: + void writeHeader(char* header, quint16 type, quint16 method, + quint32 length); void sendRpcReply(PbRpcController *controller); signals: diff --git a/server/myservice.cpp b/server/myservice.cpp index 5a591aa..1696cba 100644 --- a/server/myservice.cpp +++ b/server/myservice.cpp @@ -208,6 +208,9 @@ void MyService::addStream(::google::protobuf::RpcController* controller, if ((portId < 0) || (portId >= portInfo.size())) goto _invalid_port; + if (portInfo[portId]->isTransmitOn()) + goto _port_busy; + portLock[portId]->lockForWrite(); for (int i = 0; i < request->stream_id_size(); i++) { @@ -232,8 +235,13 @@ void MyService::addStream(::google::protobuf::RpcController* controller, done->Run(); return; +_port_busy: + controller->SetFailed("Port Busy"); + goto _exit; + _invalid_port: controller->SetFailed("invalid portid"); +_exit: done->Run(); } @@ -250,6 +258,9 @@ void MyService::deleteStream(::google::protobuf::RpcController* controller, if ((portId < 0) || (portId >= portInfo.size())) goto _invalid_port; + if (portInfo[portId]->isTransmitOn()) + goto _port_busy; + portLock[portId]->lockForWrite(); for (int i = 0; i < request->stream_id_size(); i++) portInfo[portId]->deleteStream(request->stream_id(i).id()); @@ -260,8 +271,12 @@ void MyService::deleteStream(::google::protobuf::RpcController* controller, done->Run(); return; +_port_busy: + controller->SetFailed("Port Busy"); + goto _exit; _invalid_port: controller->SetFailed("invalid portid"); +_exit: done->Run(); } @@ -278,6 +293,9 @@ void MyService::modifyStream(::google::protobuf::RpcController* controller, if ((portId < 0) || (portId >= portInfo.size())) goto _invalid_port; + if (portInfo[portId]->isTransmitOn()) + goto _port_busy; + portLock[portId]->lockForWrite(); for (int i = 0; i < request->stream_size(); i++) { @@ -300,8 +318,12 @@ void MyService::modifyStream(::google::protobuf::RpcController* controller, done->Run(); return; +_port_busy: + controller->SetFailed("Port Busy"); + goto _exit; _invalid_port: controller->SetFailed("invalid portid"); +_exit: done->Run(); } diff --git a/server/pcapport.cpp b/server/pcapport.cpp index 1105401..8b500f2 100644 --- a/server/pcapport.cpp +++ b/server/pcapport.cpp @@ -597,8 +597,10 @@ _exit: void PcapPort::PortTransmitter::start() { // FIXME: return error - if (state_ == kRunning) + if (state_ == kRunning) { + qWarning("Transmit start requested but is already running!"); return; + } state_ = kNotStarted; QThread::start(); @@ -614,6 +616,16 @@ void PcapPort::PortTransmitter::stop() while (state_ == kRunning) QThread::msleep(10); } + else { + // FIXME: return error + qWarning("Transmit stop requested but is not running!"); + return; + } +} + +bool PcapPort::PortTransmitter::isRunning() +{ + return (state_ == kRunning); } int PcapPort::PortTransmitter::sendQueueTransmit(pcap_t *p, @@ -808,8 +820,10 @@ _exit: void PcapPort::PortCapturer::start() { // FIXME: return error - if (state_ == kRunning) + if (state_ == kRunning) { + qWarning("Capture start requested but is already running!"); return; + } state_ = kNotStarted; QThread::start(); @@ -825,6 +839,16 @@ void PcapPort::PortCapturer::stop() while (state_ == kRunning) QThread::msleep(10); } + else { + // FIXME: return error + qWarning("Capture stop requested but is not running!"); + return; + } +} + +bool PcapPort::PortCapturer::isRunning() +{ + return (state_ == kRunning); } QFile* PcapPort::PortCapturer::captureFile() diff --git a/server/pcapport.h b/server/pcapport.h index 5883c43..b25ab5c 100644 --- a/server/pcapport.h +++ b/server/pcapport.h @@ -116,6 +116,7 @@ protected: void run(); void start(); void stop(); + bool isRunning(); private: enum State { @@ -201,6 +202,7 @@ protected: void run(); void start(); void stop(); + bool isRunning(); QFile* captureFile(); private: diff --git a/test/rpctest.py b/test/rpctest.py new file mode 100644 index 0000000..759004c --- /dev/null +++ b/test/rpctest.py @@ -0,0 +1,412 @@ +#! /usr/bin/env python + +# standard modules +import logging +import os +import subprocess +import sys +import time + +sys.path.append('../binding') +from core import ost_pb, DroneProxy +from rpc import RpcError +from protocols.mac_pb2 import mac +from protocols.ip4_pb2 import ip4, Ip4 + +class Test: + pass + +class TestSuite: + def __init__(self): + self.results = [] + self.total = 0 + self.passed = 0 + self.completed = False + + def test_begin(self, name): + test = Test() + test.name = name + test.passed = False + self.running = test + print('-----------------------------------------------------------') + print('@@TEST: %s' % name) + print('-----------------------------------------------------------') + + def test_end(self, result): + if self.running: + self.running.passed = result + self.results.append(self.running) + self.total = self.total + 1 + if result: + self.passed = self.passed + 1 + self.running = None + print('@@RESULT: %s' % ('PASS' if result else 'FAIL')) + else: + raise Exception('Test end without a test begin') + + def report(self): + print('===========================================================') + print('TEST REPORT') + print('===========================================================') + for test in self.results: + print('%s: %d' % (test.name, test.passed)) + print('Passed: %d/%d' % (self.passed, self.total)) + print('Completed: %d' % (self.completed)) + + def complete(self): + self.completed = True + + def passed(self): + return passed == total and self.completed + +# initialize defaults +host_name = '127.0.0.1' +tx_port_number = -1 +rx_port_number = -1 + +if sys.platform == 'win32': + tshark = r'C:\Program Files\Wireshark\tshark.exe' +else: + tshark = 'tshark' + + +# setup logging +log = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + +print('') +print('This test uses the following topology -') +print('') +print(' +-------+ ') +print(' | |Tx--->----+') +print(' | Drone | |') +print(' | |Rx---<----+') +print(' +-------+ ') +print('') +print('A loopback port is used as both the Tx and Rx ports') +print('') + +suite = TestSuite() +drone = DroneProxy(host_name) + +try: + # ----------------------------------------------------------------- # + # Baseline Configuration + # ----------------------------------------------------------------- # + + # connect to drone + log.info('connecting to drone(%s:%d)' + % (drone.hostName(), drone.portNumber())) + drone.connect() + + # retreive port id list + log.info('retreiving port list') + port_id_list = drone.getPortIdList() + + # retreive port config list + log.info('retreiving port config for all ports') + port_config_list = drone.getPortConfig(port_id_list) + + if len(port_config_list.port) == 0: + log.warning('drone has no ports!') + sys.exit(1) + + # iterate port list to find a loopback port to use as the tx/rx port id + print('Port List') + print('---------') + for port in port_config_list.port: + print('%d.%s (%s)' % (port.port_id.id, port.name, port.description)) + # use a loopback port as default tx/rx port + if ('lo' in port.name or 'loopback' in port.description.lower()): + tx_port_number = port.port_id.id + rx_port_number = port.port_id.id + + if tx_port_number < 0 or rx_port_number < 0: + log.warning('loopback port not found') + sys.exit(1) + + print('Using port %d as tx/rx port(s)') + + tx_port = ost_pb.PortIdList() + tx_port.port_id.add().id = tx_port_number; + + rx_port = ost_pb.PortIdList() + rx_port.port_id.add().id = rx_port_number; + + # add a stream + stream_id = ost_pb.StreamIdList() + stream_id.port_id.CopyFrom(tx_port.port_id[0]) + stream_id.stream_id.add().id = 1 + log.info('adding tx_stream %d' % stream_id.stream_id[0].id) + drone.addStream(stream_id) + + # configure the stream + stream_cfg = ost_pb.StreamConfigList() + stream_cfg.port_id.CopyFrom(tx_port.port_id[0]) + s = stream_cfg.stream.add() + s.stream_id.id = stream_id.stream_id[0].id + s.core.is_enabled = True + s.control.num_packets = 10 + + # setup stream protocols as mac:eth2:ip4:udp:payload + p = s.protocol.add() + p.protocol_id.id = ost_pb.Protocol.kMacFieldNumber + p.Extensions[mac].dst_mac = 0x001122334455 + p.Extensions[mac].src_mac = 0x00aabbccddee + + p = s.protocol.add() + p.protocol_id.id = ost_pb.Protocol.kEth2FieldNumber + + p = s.protocol.add() + p.protocol_id.id = ost_pb.Protocol.kIp4FieldNumber + # reduce typing by creating a shorter reference to p.Extensions[ip4] + ip = p.Extensions[ip4] + ip.src_ip = 0x01020304 + ip.dst_ip = 0x05060708 + ip.dst_ip_mode = Ip4.e_im_inc_host + + s.protocol.add().protocol_id.id = ost_pb.Protocol.kUdpFieldNumber + s.protocol.add().protocol_id.id = ost_pb.Protocol.kPayloadFieldNumber + + log.info('configuring tx_stream %d' % stream_id.stream_id[0].id) + drone.modifyStream(stream_cfg) + + # clear tx/rx stats + log.info('clearing tx/rx stats') + drone.clearStats(tx_port) + drone.clearStats(rx_port) + + # ----------------------------------------------------------------- # + # TESTCASE: Verify invoking addStream() during transmit fails + # TESTCASE: Verify invoking modifyStream() during transmit fails + # TESTCASE: Verify invoking deleteStream() during transmit fails + # ----------------------------------------------------------------- # + sid = ost_pb.StreamIdList() + sid.port_id.CopyFrom(tx_port.port_id[0]) + sid.stream_id.add().id = 2 + + passed = False + suite.test_begin('addStreamDuringTransmitFails') + drone.startTx(tx_port) + try: + log.info('adding tx_stream %d' % sid.stream_id[0].id) + drone.addStream(sid) + except RpcError as e: + if ('Port Busy' in str(e)): + passed = True + else: + raise + finally: + drone.stopTx(tx_port) + suite.test_end(passed) + + passed = False + suite.test_begin('modifyStreamDuringTransmitFails') + scfg = ost_pb.StreamConfigList() + scfg.port_id.CopyFrom(tx_port.port_id[0]) + s = scfg.stream.add() + s.stream_id.id = sid.stream_id[0].id + s.protocol.add().protocol_id.id = ost_pb.Protocol.kMacFieldNumber + s.protocol.add().protocol_id.id = ost_pb.Protocol.kArpFieldNumber + s.protocol.add().protocol_id.id = ost_pb.Protocol.kPayloadFieldNumber + drone.startTx(tx_port) + try: + log.info('configuring tx_stream %d' % sid.stream_id[0].id) + drone.modifyStream(scfg) + except RpcError as e: + if ('Port Busy' in str(e)): + passed = True + else: + raise + finally: + drone.stopTx(tx_port) + suite.test_end(passed) + + passed = False + suite.test_begin('deleteStreamDuringTransmitFails') + drone.startTx(tx_port) + try: + log.info('deleting tx_stream %d' % sid.stream_id[0].id) + drone.deleteStream(sid) + except RpcError as e: + if ('Port Busy' in str(e)): + passed = True + else: + raise + finally: + drone.stopTx(tx_port) + suite.test_end(passed) + + + # ----------------------------------------------------------------- # + # TESTCASE: Verify invoking startTx() during transmit is a NOP, + # not a restart + # ----------------------------------------------------------------- # + passed = False + suite.test_begin('startTxDuringTransmitIsNopNotRestart') + drone.startCapture(rx_port) + drone.startTx(tx_port) + try: + log.info('sleeping for 4s ...') + time.sleep(4) + log.info('starting transmit multiple times') + drone.startTx(tx_port) + time.sleep(1) + drone.startTx(tx_port) + time.sleep(1) + drone.startTx(tx_port) + time.sleep(1) + log.info('waiting for transmit to finish ...') + time.sleep(5) + drone.stopTx(tx_port) + drone.stopCapture(rx_port) + + buff = drone.getCaptureBuffer(rx_port.port_id[0]) + drone.saveCaptureBuffer(buff, 'capture.pcap') + log.info('dumping Rx capture buffer') + cap_pkts = subprocess.check_output([tshark, '-r', 'capture.pcap']) + print(cap_pkts) + if '5.6.7.8' in cap_pkts: + passed = True + os.remove('capture.pcap') + except RpcError as e: + raise + finally: + drone.stopTx(tx_port) + suite.test_end(passed) + + + # ----------------------------------------------------------------- # + # TESTCASE: Verify invoking startCapture() during capture is a NOP, + # not a restart + # ----------------------------------------------------------------- # + passed = False + suite.test_begin('startCaptureDuringTransmitIsNopNotRestart') + try: + drone.startCapture(rx_port) + drone.startTx(tx_port) + log.info('sleeping for 4s ...') + time.sleep(4) + log.info('starting capture multiple times') + drone.startCapture(rx_port) + time.sleep(1) + drone.startCapture(rx_port) + time.sleep(1) + drone.startCapture(rx_port) + time.sleep(1) + log.info('waiting for transmit to finish ...') + time.sleep(5) + drone.stopTx(tx_port) + drone.stopCapture(rx_port) + + buff = drone.getCaptureBuffer(rx_port.port_id[0]) + drone.saveCaptureBuffer(buff, 'capture.pcap') + log.info('dumping Rx capture buffer') + cap_pkts = subprocess.check_output([tshark, '-r', 'capture.pcap']) + print(cap_pkts) + if '5.6.7.8' in cap_pkts: + passed = True + os.remove('capture.pcap') + except RpcError as e: + raise + finally: + drone.stopTx(tx_port) + suite.test_end(passed) + + # ----------------------------------------------------------------- # + # TESTCASE: Verify invoking stopTx() when transmit is not running + # is a NOP + # ----------------------------------------------------------------- # + passed = False + suite.test_begin('stopTxWhenTransmitNotRunningIsNop') + try: + tx_stats = drone.getStats(tx_port) + log.info('--> (tx_stats)' + tx_stats.__str__()) + if tx_stats.port_stats[0].state.is_transmit_on: + raise Exception('Unexpected transmit ON state') + log.info('stopping transmit multiple times') + drone.stopTx(tx_port) + time.sleep(1) + drone.stopTx(tx_port) + time.sleep(1) + drone.stopTx(tx_port) + + # if we reached here, that means there was no exception + passed = True + except RpcError as e: + raise + finally: + suite.test_end(passed) + + # ----------------------------------------------------------------- # + # TESTCASE: Verify invoking stopCapture() when capture is not running + # is a NOP + # ----------------------------------------------------------------- # + passed = False + suite.test_begin('stopCaptureWhenCaptureNotRunningIsNop') + try: + rx_stats = drone.getStats(rx_port) + log.info('--> (rx_stats)' + rx_stats.__str__()) + if rx_stats.port_stats[0].state.is_capture_on: + raise Exception('Unexpected capture ON state') + log.info('stopping capture multiple times') + drone.stopCapture(rx_port) + time.sleep(1) + drone.stopCapture(rx_port) + time.sleep(1) + drone.stopCapture(rx_port) + + # if we reached here, that means there was no exception + passed = True + except RpcError as e: + raise + finally: + suite.test_end(passed) + + # ----------------------------------------------------------------- # + # TESTCASE: Verify startCapture(), startTx() sequence captures the + # first packet + # TESTCASE: Verify stopTx(), stopCapture() sequence captures the + # last packet + # ----------------------------------------------------------------- # + passed = False + suite.test_begin('startStopTransmitCaptureOrderCapturesAllPackets') + try: + drone.startCapture(rx_port) + drone.startTx(tx_port) + log.info('waiting for transmit to finish ...') + time.sleep(12) + drone.stopTx(tx_port) + drone.stopCapture(rx_port) + + log.info('getting Rx capture buffer') + buff = drone.getCaptureBuffer(rx_port.port_id[0]) + drone.saveCaptureBuffer(buff, 'capture.pcap') + log.info('dumping Rx capture buffer') + cap_pkts = subprocess.check_output([tshark, '-r', 'capture.pcap']) + print(cap_pkts) + if '5.6.7.8' in cap_pkts and '5.6.7.17' in cap_pkts: + passed = True + os.remove('capture.pcap') + except RpcError as e: + raise + finally: + drone.stopTx(tx_port) + suite.test_end(passed) + + suite.complete() + + # delete streams + log.info('deleting tx_stream %d' % stream_id.stream_id[0].id) + drone.deleteStream(stream_id) + + # bye for now + drone.disconnect() + +except Exception as ex: + log.exception(ex) + +finally: + suite.report() + if not suite.passed: + sys.exit(2);