c70811eaa4
The problem happens for bidirectional flows. The sequence of events is as follows when you start Tx on Ports p1, p2 with the current code - 1. Clear stream stats on p1 2. Start tx on p1 3. Clear stream stats on p2 4. Start tx on p2 By the time #3 is executed, it may have already rx packets from p1 which are being incorrectly cleared, this will cause these number of packets to show up as dropped instead - incorrectly. The fix is to change the order like this - 1. Clear stream stats on p1 2. Clear stream stats on p2 3. Start tx on p1 4. Start tx on p2 Unidirectional flows will not see this problem - as long as startTx is done only on the Tx port and not the Rx port. This bug is a regression caused due to the code changes introduced for the stream stats rates feature implemented in 1.2.0
1354 lines
39 KiB
C++
1354 lines
39 KiB
C++
/*
|
|
Copyright (C) 2010-2016 Srivats P.
|
|
|
|
This file is part of "Ostinato"
|
|
|
|
This is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>
|
|
*/
|
|
|
|
|
|
#include "myservice.h"
|
|
|
|
#include "drone.h"
|
|
|
|
#if 0
|
|
#include <qglobal.h>
|
|
#include <qendian.h>
|
|
#include "qdebug.h"
|
|
|
|
#include "../common/protocollistiterator.h"
|
|
#include "../common/abstractprotocol.h"
|
|
#endif
|
|
|
|
#include "../common/framevalueattrib.h"
|
|
#include "../common/streambase.h"
|
|
#include "../rpc/pbrpccontroller.h"
|
|
#include "device.h"
|
|
#include "devicemanager.h"
|
|
#include "portmanager.h"
|
|
|
|
#include <QStringList>
|
|
#include <QThread>
|
|
|
|
|
|
extern Drone *drone;
|
|
extern char *version;
|
|
|
|
MyService::MyService()
|
|
{
|
|
PortManager *portManager = PortManager::instance();
|
|
int n = portManager->portCount();
|
|
|
|
for (int i = 0; i < n; i++) {
|
|
portInfo.append(portManager->port(i));
|
|
#if QT_VERSION >= 0x040400
|
|
portLock.append(new QReadWriteLock(QReadWriteLock::Recursive));
|
|
#else
|
|
portLock.append(new QReadWriteLock());
|
|
#endif
|
|
}
|
|
}
|
|
|
|
MyService::~MyService()
|
|
{
|
|
while (!portLock.isEmpty())
|
|
delete portLock.takeFirst();
|
|
//! \todo Use a singleton destroyer instead
|
|
// http://www.research.ibm.com/designpatterns/pubs/ph-jun96.txt
|
|
delete PortManager::instance();
|
|
}
|
|
|
|
void MyService::getPortIdList(::google::protobuf::RpcController* /*controller*/,
|
|
const ::OstProto::Void* /*request*/,
|
|
::OstProto::PortIdList* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
// No locks are needed here because the list does not change
|
|
// and neither does the port_id
|
|
|
|
for (int i = 0; i < portInfo.size(); i++)
|
|
{
|
|
::OstProto::PortId *p;
|
|
|
|
p = response->add_port_id();
|
|
p->set_id(portInfo[i]->id());
|
|
}
|
|
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::getPortConfig(::google::protobuf::RpcController* /*controller*/,
|
|
const ::OstProto::PortIdList* request,
|
|
::OstProto::PortConfigList* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
for (int i = 0; i < request->port_id_size(); i++)
|
|
{
|
|
int id;
|
|
|
|
id = request->port_id(i).id();
|
|
if (id < portInfo.size())
|
|
{
|
|
OstProto::Port *p;
|
|
|
|
p = response->add_port();
|
|
portLock[id]->lockForRead();
|
|
portInfo[id]->protoDataCopyInto(p);
|
|
portLock[id]->unlock();
|
|
}
|
|
// XXX: no way to inform RPC caller of an invalid port!
|
|
}
|
|
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::modifyPort(::google::protobuf::RpcController* /*controller*/,
|
|
const ::OstProto::PortConfigList* request,
|
|
::OstProto::Ack *response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
bool error = false;
|
|
QString notes;
|
|
|
|
// notification needs to be on heap because signal/slot is across threads!
|
|
OstProto::Notification *notif = new OstProto::Notification;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
for (int i = 0; i < request->port_size(); i++)
|
|
{
|
|
OstProto::Port port;
|
|
int id;
|
|
|
|
port = request->port(i);
|
|
id = port.port_id().id();
|
|
if (id < portInfo.size())
|
|
{
|
|
bool dirty;
|
|
|
|
if (!portInfo[id]->canModify(port, &dirty)) {
|
|
qDebug("Port %d cannot be modified - stop tx and retry", id);
|
|
error = true;
|
|
notes += QString("Port %1 modify: operation disallowed on "
|
|
"transmitting port\n").arg(id);
|
|
continue;
|
|
}
|
|
|
|
portLock[id]->lockForWrite();
|
|
portInfo[id]->modify(port);
|
|
portLock[id]->unlock();
|
|
notif->mutable_port_id_list()->add_port_id()->set_id(id);
|
|
}
|
|
else {
|
|
error = true;
|
|
notes += QString("Port %1 modify: invalid port\n").arg(id);
|
|
}
|
|
}
|
|
|
|
if (error) {
|
|
response->set_status(OstProto::Ack::kRpcError);
|
|
response->set_notes(notes.toStdString());
|
|
}
|
|
else
|
|
response->set_status(OstProto::Ack::kRpcSuccess);
|
|
done->Run();
|
|
|
|
if (notif->port_id_list().port_id_size()) {
|
|
notif->set_notif_type(OstProto::portConfigChanged);
|
|
emit notification(notif->notif_type(), SharedProtobufMessage(notif));
|
|
}
|
|
else
|
|
delete notif;
|
|
}
|
|
|
|
void MyService::getStreamIdList(::google::protobuf::RpcController* controller,
|
|
const ::OstProto::PortId* request,
|
|
::OstProto::StreamIdList* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
int portId;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
portId = request->id();
|
|
if ((portId < 0) || (portId >= portInfo.size()))
|
|
goto _invalid_port;
|
|
|
|
response->mutable_port_id()->set_id(portId);
|
|
portLock[portId]->lockForRead();
|
|
for (int i = 0; i < portInfo[portId]->streamCount(); i++)
|
|
{
|
|
OstProto::StreamId *s;
|
|
|
|
s = response->add_stream_id();
|
|
s->set_id(portInfo[portId]->streamAtIndex(i)->id());
|
|
}
|
|
portLock[portId]->unlock();
|
|
|
|
done->Run();
|
|
return;
|
|
|
|
_invalid_port:
|
|
controller->SetFailed(QString("Port %1 get stream id list: invalid port")
|
|
.arg(portId).toStdString());
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::getStreamConfig(::google::protobuf::RpcController* controller,
|
|
const ::OstProto::StreamIdList* request,
|
|
::OstProto::StreamConfigList* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
int portId;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
portId = request->port_id().id();
|
|
if ((portId < 0) || (portId >= portInfo.size()))
|
|
goto _invalid_port;
|
|
|
|
response->mutable_port_id()->set_id(portId);
|
|
portLock[portId]->lockForRead();
|
|
for (int i = 0; i < request->stream_id_size(); i++)
|
|
{
|
|
StreamBase *stream;
|
|
OstProto::Stream *s;
|
|
|
|
stream = portInfo[portId]->stream(request->stream_id(i).id());
|
|
if (!stream)
|
|
continue; //! XXX: no way to inform RPC caller of invalid stream id
|
|
|
|
s = response->add_stream();
|
|
stream->protoDataCopyInto(*s);
|
|
}
|
|
portLock[portId]->unlock();
|
|
|
|
done->Run();
|
|
return;
|
|
|
|
_invalid_port:
|
|
controller->SetFailed(QString("Port %1 get stream config: invalid port")
|
|
.arg(portId).toStdString());
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::addStream(::google::protobuf::RpcController* controller,
|
|
const ::OstProto::StreamIdList* request,
|
|
::OstProto::Ack* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
bool error = false;
|
|
QString notes;
|
|
int portId;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
portId = request->port_id().id();
|
|
if ((portId < 0) || (portId >= portInfo.size()))
|
|
goto _invalid_port;
|
|
|
|
if (portInfo[portId]->isTransmitOn())
|
|
goto _port_busy;
|
|
|
|
portLock[portId]->lockForWrite();
|
|
for (int i = 0; i < request->stream_id_size(); i++)
|
|
{
|
|
StreamBase *stream;
|
|
|
|
// If stream with same id as in request exists already ==> error!!
|
|
stream = portInfo[portId]->stream(request->stream_id(i).id());
|
|
if (stream) {
|
|
error = true;
|
|
notes += QString("Port %1 Stream %2 add stream: "
|
|
"stream already exists\n")
|
|
.arg(portId).arg(request->stream_id(i).id());
|
|
continue;
|
|
}
|
|
|
|
// Append a new "default" stream - actual contents of the new stream is
|
|
// expected in a subsequent "modifyStream" request - set the stream id
|
|
// now itself however!!!
|
|
stream = new StreamBase(portId);
|
|
stream->setId(request->stream_id(i).id());
|
|
portInfo[portId]->addStream(stream);
|
|
}
|
|
portLock[portId]->unlock();
|
|
|
|
if (error) {
|
|
response->set_status(OstProto::Ack::kRpcError);
|
|
response->set_notes(notes.toStdString());
|
|
}
|
|
else
|
|
response->set_status(OstProto::Ack::kRpcSuccess);
|
|
done->Run();
|
|
return;
|
|
|
|
_port_busy:
|
|
controller->SetFailed(QString("Port %1 add stream: operation disallowed on "
|
|
"transmitting port")
|
|
.arg(portId).toStdString());
|
|
goto _exit;
|
|
|
|
_invalid_port:
|
|
controller->SetFailed(QString("Port %1 add stream: invalid port")
|
|
.arg(portId).toStdString());
|
|
_exit:
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::deleteStream(::google::protobuf::RpcController* controller,
|
|
const ::OstProto::StreamIdList* request,
|
|
::OstProto::Ack* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
bool error = false;
|
|
QString notes;
|
|
int portId;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
portId = request->port_id().id();
|
|
if ((portId < 0) || (portId >= portInfo.size()))
|
|
goto _invalid_port;
|
|
|
|
if (portInfo[portId]->isTransmitOn())
|
|
goto _port_busy;
|
|
|
|
portLock[portId]->lockForWrite();
|
|
for (int i = 0; i < request->stream_id_size(); i++) {
|
|
if (!portInfo[portId]->deleteStream(request->stream_id(i).id())) {
|
|
error = true;
|
|
notes += QString("Port %1 Stream %2 stream delete: "
|
|
"stream not found\n")
|
|
.arg(portId).arg(request->stream_id(i).id());
|
|
}
|
|
}
|
|
portLock[portId]->unlock();
|
|
|
|
if (error) {
|
|
response->set_status(OstProto::Ack::kRpcError);
|
|
response->set_notes(notes.toStdString());
|
|
}
|
|
else
|
|
response->set_status(OstProto::Ack::kRpcSuccess);
|
|
done->Run();
|
|
return;
|
|
|
|
_port_busy:
|
|
controller->SetFailed(QString("Port %1 delete stream: operation disallowed " "on transmitting port")
|
|
.arg(portId).toStdString());
|
|
goto _exit;
|
|
_invalid_port:
|
|
controller->SetFailed(QString("Port %1 delete stream: invalid port")
|
|
.arg(portId).toStdString());
|
|
_exit:
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::modifyStream(::google::protobuf::RpcController* controller,
|
|
const ::OstProto::StreamConfigList* request,
|
|
::OstProto::Ack* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
bool error = false;
|
|
QString notes;
|
|
int portId;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
portId = request->port_id().id();
|
|
if ((portId < 0) || (portId >= portInfo.size()))
|
|
goto _invalid_port;
|
|
|
|
if (portInfo[portId]->isTransmitOn())
|
|
goto _port_busy;
|
|
|
|
portLock[portId]->lockForWrite();
|
|
for (int i = 0; i < request->stream_size(); i++)
|
|
{
|
|
quint32 sid = request->stream(i).stream_id().id();
|
|
StreamBase *stream = portInfo[portId]->stream(sid);
|
|
if (stream)
|
|
{
|
|
stream->protoDataCopyFrom(request->stream(i));
|
|
portInfo[portId]->setDirty();
|
|
}
|
|
else {
|
|
error = true;
|
|
notes += QString("Port %1 Stream %2 modify stream: "
|
|
"stream not found\n").arg(portId).arg(sid);
|
|
}
|
|
}
|
|
portLock[portId]->unlock();
|
|
|
|
if (error) {
|
|
response->set_status(OstProto::Ack::kRpcError);
|
|
response->set_notes(notes.toStdString());
|
|
}
|
|
else
|
|
response->set_status(OstProto::Ack::kRpcSuccess);
|
|
done->Run();
|
|
return;
|
|
|
|
_port_busy:
|
|
controller->SetFailed(QString("Port %1 modify stream: operation disallowed " "on transmitting port")
|
|
.arg(portId).toStdString());
|
|
goto _exit;
|
|
_invalid_port:
|
|
controller->SetFailed(QString("Port %1 modify stream: invalid port")
|
|
.arg(portId).toStdString());
|
|
_exit:
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::startTransmit(::google::protobuf::RpcController* /*controller*/,
|
|
const ::OstProto::PortIdList* request,
|
|
::OstProto::Ack* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
bool error = false;
|
|
QString notes;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
// XXX: stream stats uses port tx duration to calculate per stream
|
|
// rates; tx duration is for the last tx run only - so stream stats
|
|
// should also correspond to the last run only.
|
|
// Hence clear stream stats before Tx.
|
|
// XXX: clear stream stats on ALL ports in user provided list before
|
|
// starting Tx on any of them
|
|
for (int i = 0; i < request->port_id_size(); i++)
|
|
{
|
|
int portId = request->port_id(i).id();
|
|
if ((portId < 0) || (portId >= portInfo.size())) {
|
|
error = true;
|
|
notes += QString("Port %1 start transmit: invalid port\n")
|
|
.arg(portId);
|
|
continue;
|
|
}
|
|
|
|
portLock[portId]->lockForWrite();
|
|
portInfo[portId]->resetStreamStatsAll();
|
|
portLock[portId]->unlock();
|
|
}
|
|
|
|
for (int i = 0; i < request->port_id_size(); i++)
|
|
{
|
|
int frameError = 0;
|
|
int portId = request->port_id(i).id();
|
|
|
|
if ((portId < 0) || (portId >= portInfo.size())) {
|
|
continue;
|
|
}
|
|
|
|
portLock[portId]->lockForWrite();
|
|
if (portInfo[portId]->isDirty())
|
|
frameError = portInfo[portId]->updatePacketList();
|
|
portInfo[portId]->startTransmit();
|
|
portLock[portId]->unlock();
|
|
if (frameError) {
|
|
error = true;
|
|
notes += frameValueErrorNotes(portId, frameError);
|
|
}
|
|
}
|
|
|
|
if (error) {
|
|
response->set_status(OstProto::Ack::kRpcError);
|
|
response->set_notes(notes.toStdString());
|
|
}
|
|
else
|
|
response->set_status(OstProto::Ack::kRpcSuccess);
|
|
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::stopTransmit(::google::protobuf::RpcController* /*controller*/,
|
|
const ::OstProto::PortIdList* request,
|
|
::OstProto::Ack* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
bool error = false;
|
|
QString notes;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
for (int i = 0; i < request->port_id_size(); i++)
|
|
{
|
|
int portId;
|
|
|
|
portId = request->port_id(i).id();
|
|
if ((portId < 0) || (portId >= portInfo.size())) {
|
|
error = true;
|
|
notes += QString("Port %1 stop transmit: invalid port\n")
|
|
.arg(portId);
|
|
continue;
|
|
}
|
|
|
|
portLock[portId]->lockForWrite();
|
|
portInfo[portId]->stopTransmit();
|
|
portLock[portId]->unlock();
|
|
}
|
|
|
|
if (error) {
|
|
response->set_status(OstProto::Ack::kRpcError);
|
|
response->set_notes(notes.toStdString());
|
|
}
|
|
else
|
|
response->set_status(OstProto::Ack::kRpcSuccess);
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::startCapture(::google::protobuf::RpcController* /*controller*/,
|
|
const ::OstProto::PortIdList* request,
|
|
::OstProto::Ack* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
bool error = false;
|
|
QString notes;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
for (int i = 0; i < request->port_id_size(); i++)
|
|
{
|
|
int portId;
|
|
|
|
portId = request->port_id(i).id();
|
|
if ((portId < 0) || (portId >= portInfo.size())) {
|
|
error = true;
|
|
notes += QString("Port %1 start capture: invalid port\n")
|
|
.arg(portId);
|
|
continue;
|
|
}
|
|
|
|
portLock[portId]->lockForWrite();
|
|
portInfo[portId]->startCapture();
|
|
portLock[portId]->unlock();
|
|
}
|
|
|
|
if (error) {
|
|
response->set_status(OstProto::Ack::kRpcError);
|
|
response->set_notes(notes.toStdString());
|
|
}
|
|
else
|
|
response->set_status(OstProto::Ack::kRpcSuccess);
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::stopCapture(::google::protobuf::RpcController* /*controller*/,
|
|
const ::OstProto::PortIdList* request,
|
|
::OstProto::Ack* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
bool error = false;
|
|
QString notes;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
for (int i=0; i < request->port_id_size(); i++)
|
|
{
|
|
int portId;
|
|
|
|
portId = request->port_id(i).id();
|
|
if ((portId < 0) || (portId >= portInfo.size())) {
|
|
error = true;
|
|
notes += QString("Port %1 stop capture: invalid port\n")
|
|
.arg(portId);
|
|
continue;
|
|
}
|
|
|
|
portLock[portId]->lockForWrite();
|
|
portInfo[portId]->stopCapture();
|
|
portLock[portId]->unlock();
|
|
}
|
|
|
|
if (error) {
|
|
response->set_status(OstProto::Ack::kRpcError);
|
|
response->set_notes(notes.toStdString());
|
|
}
|
|
else
|
|
response->set_status(OstProto::Ack::kRpcSuccess);
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::getCaptureBuffer(::google::protobuf::RpcController* controller,
|
|
const ::OstProto::PortId* request,
|
|
::OstProto::CaptureBuffer* /*response*/,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
int portId;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
portId = request->id();
|
|
if ((portId < 0) || (portId >= portInfo.size()))
|
|
goto _invalid_port;
|
|
|
|
portLock[portId]->lockForWrite();
|
|
portInfo[portId]->stopCapture();
|
|
static_cast<PbRpcController*>(controller)->setBinaryBlob(
|
|
portInfo[portId]->captureData());
|
|
portLock[portId]->unlock();
|
|
|
|
done->Run();
|
|
return;
|
|
|
|
_invalid_port:
|
|
controller->SetFailed("invalid portid");
|
|
controller->SetFailed(QString("Port %1 get capture buffer: invalid port")
|
|
.arg(portId).toStdString());
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::getStats(::google::protobuf::RpcController* /*controller*/,
|
|
const ::OstProto::PortIdList* request,
|
|
::OstProto::PortStatsList* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
//qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
for (int i = 0; i < request->port_id_size(); i++)
|
|
{
|
|
int portId;
|
|
AbstractPort::PortStats stats;
|
|
OstProto::PortStats *s;
|
|
OstProto::PortState *st;
|
|
|
|
portId = request->port_id(i).id();
|
|
if ((portId < 0) || (portId >= portInfo.size()))
|
|
continue; // XXX: no way to inform RPC caller of invalid port
|
|
|
|
s = response->add_port_stats();
|
|
s->mutable_port_id()->set_id(request->port_id(i).id());
|
|
|
|
st = s->mutable_state();
|
|
portLock[portId]->lockForRead();
|
|
st->set_link_state(portInfo[portId]->linkState());
|
|
st->set_is_transmit_on(portInfo[portId]->isTransmitOn());
|
|
st->set_is_capture_on(portInfo[portId]->isCaptureOn());
|
|
|
|
portInfo[portId]->stats(&stats);
|
|
portLock[portId]->unlock();
|
|
|
|
#if 0
|
|
if (portId == 2)
|
|
qDebug(">%llu", stats.rxPkts);
|
|
#endif
|
|
|
|
s->set_rx_pkts(stats.rxPkts);
|
|
s->set_rx_bytes(stats.rxBytes);
|
|
s->set_rx_pps(stats.rxPps);
|
|
s->set_rx_bps(stats.rxBps);
|
|
|
|
s->set_tx_pkts(stats.txPkts);
|
|
s->set_tx_bytes(stats.txBytes);
|
|
s->set_tx_pps(stats.txPps);
|
|
s->set_tx_bps(stats.txBps);
|
|
|
|
s->set_rx_drops(stats.rxDrops);
|
|
s->set_rx_errors(stats.rxErrors);
|
|
s->set_rx_fifo_errors(stats.rxFifoErrors);
|
|
s->set_rx_frame_errors(stats.rxFrameErrors);
|
|
}
|
|
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::clearStats(::google::protobuf::RpcController* /*controller*/,
|
|
const ::OstProto::PortIdList* request,
|
|
::OstProto::Ack* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
bool error = false;
|
|
QString notes;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
for (int i = 0; i < request->port_id_size(); i++)
|
|
{
|
|
int portId;
|
|
|
|
portId = request->port_id(i).id();
|
|
if ((portId < 0) || (portId >= portInfo.size())) {
|
|
error = true;
|
|
notes += QString("Port %1 clear statistics: invalid port\n")
|
|
.arg(portId);
|
|
continue;
|
|
}
|
|
|
|
portLock[portId]->lockForWrite();
|
|
portInfo[portId]->resetStats();
|
|
portLock[portId]->unlock();
|
|
}
|
|
|
|
if (error) {
|
|
response->set_status(OstProto::Ack::kRpcError);
|
|
response->set_notes(notes.toStdString());
|
|
}
|
|
else
|
|
response->set_status(OstProto::Ack::kRpcSuccess);
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::getStreamStats(
|
|
::google::protobuf::RpcController* /*controller*/,
|
|
const ::OstProto::StreamGuidList* request,
|
|
::OstProto::StreamStatsList* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
for (int i = 0; i < request->port_id_list().port_id_size(); i++)
|
|
{
|
|
int portId;
|
|
|
|
portId = request->port_id_list().port_id(i).id();
|
|
if ((portId < 0) || (portId >= portInfo.size()))
|
|
continue; // XXX: no way to inform RPC caller of invalid port
|
|
|
|
portLock[portId]->lockForRead();
|
|
if (request->stream_guid_size())
|
|
for (int j = 0; j < request->stream_guid_size(); j++)
|
|
portInfo[portId]->streamStats(request->stream_guid(j).id(),
|
|
response);
|
|
else
|
|
portInfo[portId]->streamStatsAll(response);
|
|
portLock[portId]->unlock();
|
|
}
|
|
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::clearStreamStats(
|
|
::google::protobuf::RpcController* /*controller*/,
|
|
const ::OstProto::StreamGuidList* request,
|
|
::OstProto::Ack* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
bool error = false;
|
|
QString notes;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
for (int i = 0; i < request->port_id_list().port_id_size(); i++)
|
|
{
|
|
int portId;
|
|
|
|
portId = request->port_id_list().port_id(i).id();
|
|
if ((portId < 0) || (portId >= portInfo.size())) {
|
|
error = true;
|
|
notes += QString("Port %1 clear stream statistics: invalid port\n")
|
|
.arg(portId);
|
|
continue;
|
|
}
|
|
|
|
portLock[portId]->lockForWrite();
|
|
if (request->stream_guid_size())
|
|
for (int j = 0; j < request->stream_guid_size(); j++)
|
|
portInfo[portId]->resetStreamStats(
|
|
request->stream_guid(j).id());
|
|
else
|
|
portInfo[portId]->resetStreamStatsAll();
|
|
portLock[portId]->unlock();
|
|
}
|
|
|
|
if (error) {
|
|
response->set_status(OstProto::Ack::kRpcError);
|
|
response->set_notes(notes.toStdString());
|
|
}
|
|
else
|
|
response->set_status(OstProto::Ack::kRpcSuccess);
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::checkVersion(::google::protobuf::RpcController* controller,
|
|
const ::OstProto::VersionInfo* request,
|
|
::OstProto::VersionCompatibility* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
QString myVersion(version);
|
|
QString clientVersion;
|
|
QStringList my, client;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
my = myVersion.split('.');
|
|
|
|
Q_ASSERT(my.size() >= 2);
|
|
|
|
clientVersion = QString::fromStdString(request->version());
|
|
client = clientVersion.split('.');
|
|
|
|
qDebug("client = %s, my = %s",
|
|
qPrintable(clientVersion), qPrintable(myVersion));
|
|
|
|
if (client.size() < 2)
|
|
goto _invalid_version;
|
|
|
|
// Compare only major and minor numbers
|
|
if (client[0] == my[0] && client[1] == my[1]) {
|
|
response->set_result(OstProto::VersionCompatibility::kCompatible);
|
|
static_cast<PbRpcController*>(controller)->EnableNotif(
|
|
request->client_name() == "python-ostinato" ? false : true);
|
|
}
|
|
else {
|
|
response->set_result(OstProto::VersionCompatibility::kIncompatible);
|
|
response->set_notes(QString("Drone needs controller version %1.%2.x")
|
|
.arg(my[0], my[1]).toStdString());
|
|
static_cast<PbRpcController*>(controller)->TriggerDisconnect();
|
|
}
|
|
|
|
done->Run();
|
|
return;
|
|
|
|
_invalid_version:
|
|
controller->SetFailed("invalid version information");
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::build(::google::protobuf::RpcController* controller,
|
|
const ::OstProto::BuildConfig* request,
|
|
::OstProto::Ack* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
QString notes;
|
|
int portId;
|
|
int frameError = 0;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
portId = request->port_id().id();
|
|
if ((portId < 0) || (portId >= portInfo.size()))
|
|
goto _invalid_port;
|
|
|
|
if (portInfo[portId]->isTransmitOn())
|
|
goto _port_busy;
|
|
|
|
portLock[portId]->lockForWrite();
|
|
if (portInfo[portId]->isDirty())
|
|
frameError = portInfo[portId]->updatePacketList();
|
|
portLock[portId]->unlock();
|
|
|
|
if (frameError) {
|
|
notes += frameValueErrorNotes(portId, frameError);
|
|
response->set_status(OstProto::Ack::kRpcError);
|
|
response->set_notes(notes.toStdString());
|
|
}
|
|
else
|
|
response->set_status(OstProto::Ack::kRpcSuccess);
|
|
done->Run();
|
|
return;
|
|
|
|
_port_busy:
|
|
controller->SetFailed(QString("Port %1 build: operation disallowed " "on transmitting port")
|
|
.arg(portId).toStdString());
|
|
goto _exit;
|
|
_invalid_port:
|
|
controller->SetFailed(QString("Port %1 build: invalid port")
|
|
.arg(portId).toStdString());
|
|
_exit:
|
|
done->Run();
|
|
}
|
|
|
|
/*
|
|
* ===================================================================
|
|
* Device Emulation
|
|
* ===================================================================
|
|
* XXX: Streams and Devices are largely non-overlapping from a RPC
|
|
* point of view but they *do* intersect e.g. when a stream is trying to
|
|
* find its associated device and info from that device such as src/dst
|
|
* mac addresses. For this reason, both set of RPCs share the common
|
|
* port level locking
|
|
* ===================================================================
|
|
*/
|
|
void MyService::getDeviceGroupIdList(
|
|
::google::protobuf::RpcController* controller,
|
|
const ::OstProto::PortId* request,
|
|
::OstProto::DeviceGroupIdList* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
DeviceManager *devMgr;
|
|
int portId;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
portId = request->id();
|
|
if ((portId < 0) || (portId >= portInfo.size()))
|
|
goto _invalid_port;
|
|
|
|
devMgr = portInfo[portId]->deviceManager();
|
|
|
|
response->mutable_port_id()->set_id(portId);
|
|
portLock[portId]->lockForRead();
|
|
for (int i = 0; i < devMgr->deviceGroupCount(); i++)
|
|
{
|
|
OstProto::DeviceGroupId *dgid;
|
|
|
|
dgid = response->add_device_group_id();
|
|
dgid->CopyFrom(devMgr->deviceGroupAtIndex(i)->device_group_id());
|
|
}
|
|
portLock[portId]->unlock();
|
|
|
|
done->Run();
|
|
return;
|
|
|
|
_invalid_port:
|
|
controller->SetFailed(QString("Port %1 get device group id list: "
|
|
"invalid port").arg(portId).toStdString());
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::getDeviceGroupConfig(
|
|
::google::protobuf::RpcController* controller,
|
|
const ::OstProto::DeviceGroupIdList* request,
|
|
::OstProto::DeviceGroupConfigList* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
DeviceManager *devMgr;
|
|
int portId;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
portId = request->port_id().id();
|
|
if ((portId < 0) || (portId >= portInfo.size()))
|
|
goto _invalid_port;
|
|
|
|
devMgr = portInfo[portId]->deviceManager();
|
|
|
|
response->mutable_port_id()->set_id(portId);
|
|
portLock[portId]->lockForRead();
|
|
for (int i = 0; i < request->device_group_id_size(); i++)
|
|
{
|
|
const OstProto::DeviceGroup *dg;
|
|
|
|
dg = devMgr->deviceGroup(request->device_group_id(i).id());
|
|
if (!dg)
|
|
continue; // XXX: no way to inform RPC caller of invalid dgid
|
|
|
|
response->add_device_group()->CopyFrom(*dg);
|
|
}
|
|
portLock[portId]->unlock();
|
|
|
|
done->Run();
|
|
return;
|
|
|
|
_invalid_port:
|
|
controller->SetFailed(QString("Port %1 get device group config: "
|
|
"invalid port").arg(portId).toStdString());
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::addDeviceGroup(
|
|
::google::protobuf::RpcController* controller,
|
|
const ::OstProto::DeviceGroupIdList* request,
|
|
::OstProto::Ack* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
bool error = false;
|
|
QString notes;
|
|
DeviceManager *devMgr;
|
|
int portId;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
portId = request->port_id().id();
|
|
if ((portId < 0) || (portId >= portInfo.size()))
|
|
goto _invalid_port;
|
|
|
|
devMgr = portInfo[portId]->deviceManager();
|
|
|
|
if (portInfo[portId]->isTransmitOn())
|
|
goto _port_busy;
|
|
|
|
portLock[portId]->lockForWrite();
|
|
for (int i = 0; i < request->device_group_id_size(); i++)
|
|
{
|
|
quint32 id = request->device_group_id(i).id();
|
|
const OstProto::DeviceGroup *dg = devMgr->deviceGroup(id);
|
|
|
|
// If device group with same id as in request exists already ==> error!
|
|
if (dg) {
|
|
error = true;
|
|
notes += QString("Port %1 DeviceGroup %2 add device group: "
|
|
" device group already exists\n")
|
|
.arg(portId).arg(id);
|
|
continue;
|
|
}
|
|
|
|
devMgr->addDeviceGroup(id);
|
|
}
|
|
portLock[portId]->unlock();
|
|
|
|
if (error) {
|
|
response->set_status(OstProto::Ack::kRpcError);
|
|
response->set_notes(notes.toStdString());
|
|
}
|
|
else
|
|
response->set_status(OstProto::Ack::kRpcSuccess);
|
|
done->Run();
|
|
return;
|
|
|
|
_port_busy:
|
|
controller->SetFailed(QString("Port %1 add device group: "
|
|
"operation disallowed on transmitting port")
|
|
.arg(portId).toStdString());
|
|
goto _exit;
|
|
|
|
_invalid_port:
|
|
controller->SetFailed(QString("Port %1 add device group: "
|
|
"invalid port")
|
|
.arg(portId).toStdString());
|
|
_exit:
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::deleteDeviceGroup(
|
|
::google::protobuf::RpcController* controller,
|
|
const ::OstProto::DeviceGroupIdList* request,
|
|
::OstProto::Ack* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
bool error = false;
|
|
QString notes;
|
|
DeviceManager *devMgr;
|
|
int portId;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
portId = request->port_id().id();
|
|
if ((portId < 0) || (portId >= portInfo.size()))
|
|
goto _invalid_port;
|
|
|
|
devMgr = portInfo[portId]->deviceManager();
|
|
|
|
if (portInfo[portId]->isTransmitOn())
|
|
goto _port_busy;
|
|
|
|
portLock[portId]->lockForWrite();
|
|
for (int i = 0; i < request->device_group_id_size(); i++) {
|
|
quint32 id = request->device_group_id(i).id();
|
|
if (!devMgr->deleteDeviceGroup(id)) {
|
|
error = true;
|
|
notes += QString("Port %1 DeviceGroup %2 delete device group: "
|
|
"device group not found\n").arg(portId).arg(id);
|
|
}
|
|
}
|
|
portLock[portId]->unlock();
|
|
|
|
if (error) {
|
|
response->set_status(OstProto::Ack::kRpcError);
|
|
response->set_notes(notes.toStdString());
|
|
}
|
|
else
|
|
response->set_status(OstProto::Ack::kRpcSuccess);
|
|
done->Run();
|
|
return;
|
|
|
|
_port_busy:
|
|
controller->SetFailed(QString("Port %1 delete device group: "
|
|
"operation disallowed on transmitting port")
|
|
.arg(portId).toStdString());
|
|
goto _exit;
|
|
_invalid_port:
|
|
controller->SetFailed(QString("Port %1 delete device group: "
|
|
"invalid port").arg(portId).toStdString());
|
|
_exit:
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::modifyDeviceGroup(
|
|
::google::protobuf::RpcController* controller,
|
|
const ::OstProto::DeviceGroupConfigList* request,
|
|
::OstProto::Ack* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
bool error = false;
|
|
QString notes;
|
|
DeviceManager *devMgr;
|
|
int portId;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
portId = request->port_id().id();
|
|
if ((portId < 0) || (portId >= portInfo.size()))
|
|
goto _invalid_port;
|
|
|
|
devMgr = portInfo[portId]->deviceManager();
|
|
|
|
if (portInfo[portId]->isTransmitOn())
|
|
goto _port_busy;
|
|
|
|
portLock[portId]->lockForWrite();
|
|
for (int i = 0; i < request->device_group_size(); i++) {
|
|
quint32 id = request->device_group(i).device_group_id().id();
|
|
if (!devMgr->modifyDeviceGroup(&request->device_group(i))) {
|
|
error = true;
|
|
notes += QString("Port %1 DeviceGroup %2 modify device group: "
|
|
"device group not found\n").arg(portId).arg(id);
|
|
}
|
|
}
|
|
portLock[portId]->unlock();
|
|
|
|
if (error) {
|
|
response->set_status(OstProto::Ack::kRpcError);
|
|
response->set_notes(notes.toStdString());
|
|
}
|
|
else
|
|
response->set_status(OstProto::Ack::kRpcSuccess);
|
|
done->Run();
|
|
return;
|
|
|
|
_port_busy:
|
|
controller->SetFailed(QString("Port %1 modify device group: "
|
|
"operation disallowed on transmitting port")
|
|
.arg(portId).toStdString());
|
|
goto _exit;
|
|
_invalid_port:
|
|
controller->SetFailed(QString("Port %1 modify device group: "
|
|
"invalid port").arg(portId).toStdString());
|
|
_exit:
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::getDeviceList(
|
|
::google::protobuf::RpcController* controller,
|
|
const ::OstProto::PortId* request,
|
|
::OstProto::PortDeviceList* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
DeviceManager *devMgr;
|
|
int portId;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
portId = request->id();
|
|
if ((portId < 0) || (portId >= portInfo.size()))
|
|
goto _invalid_port;
|
|
|
|
devMgr = portInfo[portId]->deviceManager();
|
|
|
|
response->mutable_port_id()->set_id(portId);
|
|
portLock[portId]->lockForRead();
|
|
devMgr->getDeviceList(response);
|
|
portLock[portId]->unlock();
|
|
|
|
done->Run();
|
|
return;
|
|
|
|
_invalid_port:
|
|
controller->SetFailed(QString("Port %1 get device list: "
|
|
"invalid port").arg(portId).toStdString());
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::resolveDeviceNeighbors(
|
|
::google::protobuf::RpcController* /*controller*/,
|
|
const ::OstProto::PortIdList* request,
|
|
::OstProto::Ack* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
bool error = false;
|
|
QString notes;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
for (int i = 0; i < request->port_id_size(); i++)
|
|
{
|
|
int portId;
|
|
|
|
portId = request->port_id(i).id();
|
|
if ((portId < 0) || (portId >= portInfo.size())) {
|
|
error = true;
|
|
notes += QString("Port %1 resolve device neighbors: "
|
|
"invalid port\n").arg(portId);
|
|
continue;
|
|
}
|
|
|
|
portLock[portId]->lockForWrite();
|
|
portInfo[portId]->resolveDeviceNeighbors();
|
|
portLock[portId]->unlock();
|
|
}
|
|
|
|
if (error) {
|
|
response->set_status(OstProto::Ack::kRpcError);
|
|
response->set_notes(notes.toStdString());
|
|
}
|
|
else {
|
|
// XXX: allow time for ARP/ND to finish; if more time is required,
|
|
// the client should wait/check before invoking build()
|
|
QThread::msleep(500);
|
|
response->set_status(OstProto::Ack::kRpcSuccess);
|
|
}
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::clearDeviceNeighbors(
|
|
::google::protobuf::RpcController* /*controller*/,
|
|
const ::OstProto::PortIdList* request,
|
|
::OstProto::Ack* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
bool error = false;
|
|
QString notes;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
for (int i = 0; i < request->port_id_size(); i++)
|
|
{
|
|
int portId;
|
|
|
|
portId = request->port_id(i).id();
|
|
if ((portId < 0) || (portId >= portInfo.size())) {
|
|
error = true;
|
|
notes += QString("Port %1 clear device neighbors: "
|
|
"invalid port\n").arg(portId);
|
|
continue;
|
|
}
|
|
|
|
portLock[portId]->lockForWrite();
|
|
portInfo[portId]->clearDeviceNeighbors();
|
|
portLock[portId]->unlock();
|
|
}
|
|
|
|
if (error) {
|
|
response->set_status(OstProto::Ack::kRpcError);
|
|
response->set_notes(notes.toStdString());
|
|
}
|
|
else
|
|
response->set_status(OstProto::Ack::kRpcSuccess);
|
|
done->Run();
|
|
}
|
|
|
|
void MyService::getDeviceNeighbors(
|
|
::google::protobuf::RpcController* controller,
|
|
const ::OstProto::PortId* request,
|
|
::OstProto::PortNeighborList* response,
|
|
::google::protobuf::Closure* done)
|
|
{
|
|
DeviceManager *devMgr;
|
|
int portId;
|
|
|
|
qDebug("In %s", __PRETTY_FUNCTION__);
|
|
|
|
portId = request->id();
|
|
if ((portId < 0) || (portId >= portInfo.size()))
|
|
goto _invalid_port;
|
|
|
|
devMgr = portInfo[portId]->deviceManager();
|
|
|
|
response->mutable_port_id()->set_id(portId);
|
|
portLock[portId]->lockForRead();
|
|
devMgr->getDeviceNeighbors(response);
|
|
portLock[portId]->unlock();
|
|
|
|
done->Run();
|
|
return;
|
|
|
|
_invalid_port:
|
|
controller->SetFailed(QString("Port %1 get device neighbors: "
|
|
"invalid port").arg(portId).toStdString());
|
|
done->Run();
|
|
}
|
|
|
|
QString MyService::frameValueErrorNotes(int portId, int error)
|
|
{
|
|
if (!error)
|
|
return QString();
|
|
|
|
QString pfx = QString("Port %1: ").arg(portId);
|
|
auto errorFlags = static_cast<FrameValueAttrib::ErrorFlags>(error);
|
|
|
|
if (errorFlags & FrameValueAttrib::OutOfMemoryError)
|
|
return pfx + "Error building packet buffers - out of buffer memory\n";
|
|
|
|
// If smac resolve fails, dmac will always fail - so check that first
|
|
// and report only that so as not to confuse users (they may not realize
|
|
// that without a source device, we have no ARP table to lookup for dmac)
|
|
|
|
if (errorFlags & FrameValueAttrib::UnresolvedSrcMacError)
|
|
return pfx + "Source mac resolve failed for one or more "
|
|
"streams - Device matching stream's source IP not found\n";
|
|
|
|
if (errorFlags & FrameValueAttrib::UnresolvedDstMacError)
|
|
return pfx + "Destination mac resolve failed for one or more "
|
|
"streams - possible ARP/ND failure\n";
|
|
|
|
return QString();
|
|
}
|
|
|
|
/*
|
|
* ===================================================================
|
|
* Friends
|
|
* TODO: Encap these global functions into a DeviceBroker singleton?
|
|
* ===================================================================
|
|
*/
|
|
quint64 getDeviceMacAddress(int portId, int streamId, int frameIndex)
|
|
{
|
|
MyService *service = drone->rpcService();
|
|
DeviceManager *devMgr = NULL;
|
|
quint64 mac;
|
|
|
|
if (!service)
|
|
return 0;
|
|
|
|
if ((portId >= 0) && (portId < service->portInfo.size()))
|
|
devMgr = service->portInfo[portId]->deviceManager();
|
|
|
|
if (!devMgr || !devMgr->deviceCount())
|
|
return 0;
|
|
|
|
/*
|
|
* FIXME: We don't need lockForWrite, only lockForRead here.
|
|
* However, this function is called in the following sequence
|
|
* modifyPort() --> updatePacketList() --> frameValue()
|
|
* where modifyPort has already taken a write lock. Qt allows
|
|
* recursive locks, but not of a different type, so we are
|
|
* forced to use lockForWrite here - till we find a different
|
|
* solution.
|
|
*/
|
|
service->portLock[portId]->lockForWrite();
|
|
mac = service->portInfo[portId]->deviceMacAddress(streamId, frameIndex);
|
|
service->portLock[portId]->unlock();
|
|
|
|
return mac;
|
|
}
|
|
|
|
quint64 getNeighborMacAddress(int portId, int streamId, int frameIndex)
|
|
{
|
|
MyService *service = drone->rpcService();
|
|
DeviceManager *devMgr = NULL;
|
|
quint64 mac;
|
|
|
|
if (!service)
|
|
return 0;
|
|
|
|
if ((portId >= 0) && (portId < service->portInfo.size()))
|
|
devMgr = service->portInfo[portId]->deviceManager();
|
|
|
|
if (!devMgr || !devMgr->deviceCount())
|
|
return 0;
|
|
|
|
/*
|
|
* FIXME: We don't need lockForWrite, only lockForRead here.
|
|
* See comment in getDeviceMacAddress() for more
|
|
*/
|
|
service->portLock[portId]->lockForWrite();
|
|
mac = service->portInfo[portId]->neighborMacAddress(streamId, frameIndex);
|
|
service->portLock[portId]->unlock();
|
|
|
|
return mac;
|
|
}
|