ostinato/server/myservice.cpp
Srivats P c70811eaa4 Fix spurious stream stats drops
The problem happens for bidirectional flows. The sequence of events is
as follows when you start Tx on Ports p1, p2 with the current code -

1. Clear stream stats on p1
2. Start tx on p1
3. Clear stream stats on p2
4. Start tx on p2

By the time #3 is executed, it may have already rx packets from p1 which
are being incorrectly cleared, this will cause these number of packets
to show up as dropped instead - incorrectly.

The fix is to change the order like this -

1. Clear stream stats on p1
2. Clear stream stats on p2
3. Start tx on p1
4. Start tx on p2

Unidirectional flows will not see this problem - as long as startTx is
done only on the Tx port and not the Rx port.

This bug is a regression caused due to the code changes introduced for the
stream stats rates feature implemented in 1.2.0
2023-02-08 16:34:03 +05:30

1354 lines
39 KiB
C++

/*
Copyright (C) 2010-2016 Srivats P.
This file is part of "Ostinato"
This is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
#include "myservice.h"
#include "drone.h"
#if 0
#include <qglobal.h>
#include <qendian.h>
#include "qdebug.h"
#include "../common/protocollistiterator.h"
#include "../common/abstractprotocol.h"
#endif
#include "../common/framevalueattrib.h"
#include "../common/streambase.h"
#include "../rpc/pbrpccontroller.h"
#include "device.h"
#include "devicemanager.h"
#include "portmanager.h"
#include <QStringList>
#include <QThread>
extern Drone *drone;
extern char *version;
MyService::MyService()
{
PortManager *portManager = PortManager::instance();
int n = portManager->portCount();
for (int i = 0; i < n; i++) {
portInfo.append(portManager->port(i));
#if QT_VERSION >= 0x040400
portLock.append(new QReadWriteLock(QReadWriteLock::Recursive));
#else
portLock.append(new QReadWriteLock());
#endif
}
}
MyService::~MyService()
{
while (!portLock.isEmpty())
delete portLock.takeFirst();
//! \todo Use a singleton destroyer instead
// http://www.research.ibm.com/designpatterns/pubs/ph-jun96.txt
delete PortManager::instance();
}
void MyService::getPortIdList(::google::protobuf::RpcController* /*controller*/,
const ::OstProto::Void* /*request*/,
::OstProto::PortIdList* response,
::google::protobuf::Closure* done)
{
qDebug("In %s", __PRETTY_FUNCTION__);
// No locks are needed here because the list does not change
// and neither does the port_id
for (int i = 0; i < portInfo.size(); i++)
{
::OstProto::PortId *p;
p = response->add_port_id();
p->set_id(portInfo[i]->id());
}
done->Run();
}
void MyService::getPortConfig(::google::protobuf::RpcController* /*controller*/,
const ::OstProto::PortIdList* request,
::OstProto::PortConfigList* response,
::google::protobuf::Closure* done)
{
qDebug("In %s", __PRETTY_FUNCTION__);
for (int i = 0; i < request->port_id_size(); i++)
{
int id;
id = request->port_id(i).id();
if (id < portInfo.size())
{
OstProto::Port *p;
p = response->add_port();
portLock[id]->lockForRead();
portInfo[id]->protoDataCopyInto(p);
portLock[id]->unlock();
}
// XXX: no way to inform RPC caller of an invalid port!
}
done->Run();
}
void MyService::modifyPort(::google::protobuf::RpcController* /*controller*/,
const ::OstProto::PortConfigList* request,
::OstProto::Ack *response,
::google::protobuf::Closure* done)
{
bool error = false;
QString notes;
// notification needs to be on heap because signal/slot is across threads!
OstProto::Notification *notif = new OstProto::Notification;
qDebug("In %s", __PRETTY_FUNCTION__);
for (int i = 0; i < request->port_size(); i++)
{
OstProto::Port port;
int id;
port = request->port(i);
id = port.port_id().id();
if (id < portInfo.size())
{
bool dirty;
if (!portInfo[id]->canModify(port, &dirty)) {
qDebug("Port %d cannot be modified - stop tx and retry", id);
error = true;
notes += QString("Port %1 modify: operation disallowed on "
"transmitting port\n").arg(id);
continue;
}
portLock[id]->lockForWrite();
portInfo[id]->modify(port);
portLock[id]->unlock();
notif->mutable_port_id_list()->add_port_id()->set_id(id);
}
else {
error = true;
notes += QString("Port %1 modify: invalid port\n").arg(id);
}
}
if (error) {
response->set_status(OstProto::Ack::kRpcError);
response->set_notes(notes.toStdString());
}
else
response->set_status(OstProto::Ack::kRpcSuccess);
done->Run();
if (notif->port_id_list().port_id_size()) {
notif->set_notif_type(OstProto::portConfigChanged);
emit notification(notif->notif_type(), SharedProtobufMessage(notif));
}
else
delete notif;
}
void MyService::getStreamIdList(::google::protobuf::RpcController* controller,
const ::OstProto::PortId* request,
::OstProto::StreamIdList* response,
::google::protobuf::Closure* done)
{
int portId;
qDebug("In %s", __PRETTY_FUNCTION__);
portId = request->id();
if ((portId < 0) || (portId >= portInfo.size()))
goto _invalid_port;
response->mutable_port_id()->set_id(portId);
portLock[portId]->lockForRead();
for (int i = 0; i < portInfo[portId]->streamCount(); i++)
{
OstProto::StreamId *s;
s = response->add_stream_id();
s->set_id(portInfo[portId]->streamAtIndex(i)->id());
}
portLock[portId]->unlock();
done->Run();
return;
_invalid_port:
controller->SetFailed(QString("Port %1 get stream id list: invalid port")
.arg(portId).toStdString());
done->Run();
}
void MyService::getStreamConfig(::google::protobuf::RpcController* controller,
const ::OstProto::StreamIdList* request,
::OstProto::StreamConfigList* response,
::google::protobuf::Closure* done)
{
int portId;
qDebug("In %s", __PRETTY_FUNCTION__);
portId = request->port_id().id();
if ((portId < 0) || (portId >= portInfo.size()))
goto _invalid_port;
response->mutable_port_id()->set_id(portId);
portLock[portId]->lockForRead();
for (int i = 0; i < request->stream_id_size(); i++)
{
StreamBase *stream;
OstProto::Stream *s;
stream = portInfo[portId]->stream(request->stream_id(i).id());
if (!stream)
continue; //! XXX: no way to inform RPC caller of invalid stream id
s = response->add_stream();
stream->protoDataCopyInto(*s);
}
portLock[portId]->unlock();
done->Run();
return;
_invalid_port:
controller->SetFailed(QString("Port %1 get stream config: invalid port")
.arg(portId).toStdString());
done->Run();
}
void MyService::addStream(::google::protobuf::RpcController* controller,
const ::OstProto::StreamIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done)
{
bool error = false;
QString notes;
int portId;
qDebug("In %s", __PRETTY_FUNCTION__);
portId = request->port_id().id();
if ((portId < 0) || (portId >= portInfo.size()))
goto _invalid_port;
if (portInfo[portId]->isTransmitOn())
goto _port_busy;
portLock[portId]->lockForWrite();
for (int i = 0; i < request->stream_id_size(); i++)
{
StreamBase *stream;
// If stream with same id as in request exists already ==> error!!
stream = portInfo[portId]->stream(request->stream_id(i).id());
if (stream) {
error = true;
notes += QString("Port %1 Stream %2 add stream: "
"stream already exists\n")
.arg(portId).arg(request->stream_id(i).id());
continue;
}
// Append a new "default" stream - actual contents of the new stream is
// expected in a subsequent "modifyStream" request - set the stream id
// now itself however!!!
stream = new StreamBase(portId);
stream->setId(request->stream_id(i).id());
portInfo[portId]->addStream(stream);
}
portLock[portId]->unlock();
if (error) {
response->set_status(OstProto::Ack::kRpcError);
response->set_notes(notes.toStdString());
}
else
response->set_status(OstProto::Ack::kRpcSuccess);
done->Run();
return;
_port_busy:
controller->SetFailed(QString("Port %1 add stream: operation disallowed on "
"transmitting port")
.arg(portId).toStdString());
goto _exit;
_invalid_port:
controller->SetFailed(QString("Port %1 add stream: invalid port")
.arg(portId).toStdString());
_exit:
done->Run();
}
void MyService::deleteStream(::google::protobuf::RpcController* controller,
const ::OstProto::StreamIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done)
{
bool error = false;
QString notes;
int portId;
qDebug("In %s", __PRETTY_FUNCTION__);
portId = request->port_id().id();
if ((portId < 0) || (portId >= portInfo.size()))
goto _invalid_port;
if (portInfo[portId]->isTransmitOn())
goto _port_busy;
portLock[portId]->lockForWrite();
for (int i = 0; i < request->stream_id_size(); i++) {
if (!portInfo[portId]->deleteStream(request->stream_id(i).id())) {
error = true;
notes += QString("Port %1 Stream %2 stream delete: "
"stream not found\n")
.arg(portId).arg(request->stream_id(i).id());
}
}
portLock[portId]->unlock();
if (error) {
response->set_status(OstProto::Ack::kRpcError);
response->set_notes(notes.toStdString());
}
else
response->set_status(OstProto::Ack::kRpcSuccess);
done->Run();
return;
_port_busy:
controller->SetFailed(QString("Port %1 delete stream: operation disallowed " "on transmitting port")
.arg(portId).toStdString());
goto _exit;
_invalid_port:
controller->SetFailed(QString("Port %1 delete stream: invalid port")
.arg(portId).toStdString());
_exit:
done->Run();
}
void MyService::modifyStream(::google::protobuf::RpcController* controller,
const ::OstProto::StreamConfigList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done)
{
bool error = false;
QString notes;
int portId;
qDebug("In %s", __PRETTY_FUNCTION__);
portId = request->port_id().id();
if ((portId < 0) || (portId >= portInfo.size()))
goto _invalid_port;
if (portInfo[portId]->isTransmitOn())
goto _port_busy;
portLock[portId]->lockForWrite();
for (int i = 0; i < request->stream_size(); i++)
{
quint32 sid = request->stream(i).stream_id().id();
StreamBase *stream = portInfo[portId]->stream(sid);
if (stream)
{
stream->protoDataCopyFrom(request->stream(i));
portInfo[portId]->setDirty();
}
else {
error = true;
notes += QString("Port %1 Stream %2 modify stream: "
"stream not found\n").arg(portId).arg(sid);
}
}
portLock[portId]->unlock();
if (error) {
response->set_status(OstProto::Ack::kRpcError);
response->set_notes(notes.toStdString());
}
else
response->set_status(OstProto::Ack::kRpcSuccess);
done->Run();
return;
_port_busy:
controller->SetFailed(QString("Port %1 modify stream: operation disallowed " "on transmitting port")
.arg(portId).toStdString());
goto _exit;
_invalid_port:
controller->SetFailed(QString("Port %1 modify stream: invalid port")
.arg(portId).toStdString());
_exit:
done->Run();
}
void MyService::startTransmit(::google::protobuf::RpcController* /*controller*/,
const ::OstProto::PortIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done)
{
bool error = false;
QString notes;
qDebug("In %s", __PRETTY_FUNCTION__);
// XXX: stream stats uses port tx duration to calculate per stream
// rates; tx duration is for the last tx run only - so stream stats
// should also correspond to the last run only.
// Hence clear stream stats before Tx.
// XXX: clear stream stats on ALL ports in user provided list before
// starting Tx on any of them
for (int i = 0; i < request->port_id_size(); i++)
{
int portId = request->port_id(i).id();
if ((portId < 0) || (portId >= portInfo.size())) {
error = true;
notes += QString("Port %1 start transmit: invalid port\n")
.arg(portId);
continue;
}
portLock[portId]->lockForWrite();
portInfo[portId]->resetStreamStatsAll();
portLock[portId]->unlock();
}
for (int i = 0; i < request->port_id_size(); i++)
{
int frameError = 0;
int portId = request->port_id(i).id();
if ((portId < 0) || (portId >= portInfo.size())) {
continue;
}
portLock[portId]->lockForWrite();
if (portInfo[portId]->isDirty())
frameError = portInfo[portId]->updatePacketList();
portInfo[portId]->startTransmit();
portLock[portId]->unlock();
if (frameError) {
error = true;
notes += frameValueErrorNotes(portId, frameError);
}
}
if (error) {
response->set_status(OstProto::Ack::kRpcError);
response->set_notes(notes.toStdString());
}
else
response->set_status(OstProto::Ack::kRpcSuccess);
done->Run();
}
void MyService::stopTransmit(::google::protobuf::RpcController* /*controller*/,
const ::OstProto::PortIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done)
{
bool error = false;
QString notes;
qDebug("In %s", __PRETTY_FUNCTION__);
for (int i = 0; i < request->port_id_size(); i++)
{
int portId;
portId = request->port_id(i).id();
if ((portId < 0) || (portId >= portInfo.size())) {
error = true;
notes += QString("Port %1 stop transmit: invalid port\n")
.arg(portId);
continue;
}
portLock[portId]->lockForWrite();
portInfo[portId]->stopTransmit();
portLock[portId]->unlock();
}
if (error) {
response->set_status(OstProto::Ack::kRpcError);
response->set_notes(notes.toStdString());
}
else
response->set_status(OstProto::Ack::kRpcSuccess);
done->Run();
}
void MyService::startCapture(::google::protobuf::RpcController* /*controller*/,
const ::OstProto::PortIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done)
{
bool error = false;
QString notes;
qDebug("In %s", __PRETTY_FUNCTION__);
for (int i = 0; i < request->port_id_size(); i++)
{
int portId;
portId = request->port_id(i).id();
if ((portId < 0) || (portId >= portInfo.size())) {
error = true;
notes += QString("Port %1 start capture: invalid port\n")
.arg(portId);
continue;
}
portLock[portId]->lockForWrite();
portInfo[portId]->startCapture();
portLock[portId]->unlock();
}
if (error) {
response->set_status(OstProto::Ack::kRpcError);
response->set_notes(notes.toStdString());
}
else
response->set_status(OstProto::Ack::kRpcSuccess);
done->Run();
}
void MyService::stopCapture(::google::protobuf::RpcController* /*controller*/,
const ::OstProto::PortIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done)
{
bool error = false;
QString notes;
qDebug("In %s", __PRETTY_FUNCTION__);
for (int i=0; i < request->port_id_size(); i++)
{
int portId;
portId = request->port_id(i).id();
if ((portId < 0) || (portId >= portInfo.size())) {
error = true;
notes += QString("Port %1 stop capture: invalid port\n")
.arg(portId);
continue;
}
portLock[portId]->lockForWrite();
portInfo[portId]->stopCapture();
portLock[portId]->unlock();
}
if (error) {
response->set_status(OstProto::Ack::kRpcError);
response->set_notes(notes.toStdString());
}
else
response->set_status(OstProto::Ack::kRpcSuccess);
done->Run();
}
void MyService::getCaptureBuffer(::google::protobuf::RpcController* controller,
const ::OstProto::PortId* request,
::OstProto::CaptureBuffer* /*response*/,
::google::protobuf::Closure* done)
{
int portId;
qDebug("In %s", __PRETTY_FUNCTION__);
portId = request->id();
if ((portId < 0) || (portId >= portInfo.size()))
goto _invalid_port;
portLock[portId]->lockForWrite();
portInfo[portId]->stopCapture();
static_cast<PbRpcController*>(controller)->setBinaryBlob(
portInfo[portId]->captureData());
portLock[portId]->unlock();
done->Run();
return;
_invalid_port:
controller->SetFailed("invalid portid");
controller->SetFailed(QString("Port %1 get capture buffer: invalid port")
.arg(portId).toStdString());
done->Run();
}
void MyService::getStats(::google::protobuf::RpcController* /*controller*/,
const ::OstProto::PortIdList* request,
::OstProto::PortStatsList* response,
::google::protobuf::Closure* done)
{
//qDebug("In %s", __PRETTY_FUNCTION__);
for (int i = 0; i < request->port_id_size(); i++)
{
int portId;
AbstractPort::PortStats stats;
OstProto::PortStats *s;
OstProto::PortState *st;
portId = request->port_id(i).id();
if ((portId < 0) || (portId >= portInfo.size()))
continue; // XXX: no way to inform RPC caller of invalid port
s = response->add_port_stats();
s->mutable_port_id()->set_id(request->port_id(i).id());
st = s->mutable_state();
portLock[portId]->lockForRead();
st->set_link_state(portInfo[portId]->linkState());
st->set_is_transmit_on(portInfo[portId]->isTransmitOn());
st->set_is_capture_on(portInfo[portId]->isCaptureOn());
portInfo[portId]->stats(&stats);
portLock[portId]->unlock();
#if 0
if (portId == 2)
qDebug(">%llu", stats.rxPkts);
#endif
s->set_rx_pkts(stats.rxPkts);
s->set_rx_bytes(stats.rxBytes);
s->set_rx_pps(stats.rxPps);
s->set_rx_bps(stats.rxBps);
s->set_tx_pkts(stats.txPkts);
s->set_tx_bytes(stats.txBytes);
s->set_tx_pps(stats.txPps);
s->set_tx_bps(stats.txBps);
s->set_rx_drops(stats.rxDrops);
s->set_rx_errors(stats.rxErrors);
s->set_rx_fifo_errors(stats.rxFifoErrors);
s->set_rx_frame_errors(stats.rxFrameErrors);
}
done->Run();
}
void MyService::clearStats(::google::protobuf::RpcController* /*controller*/,
const ::OstProto::PortIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done)
{
bool error = false;
QString notes;
qDebug("In %s", __PRETTY_FUNCTION__);
for (int i = 0; i < request->port_id_size(); i++)
{
int portId;
portId = request->port_id(i).id();
if ((portId < 0) || (portId >= portInfo.size())) {
error = true;
notes += QString("Port %1 clear statistics: invalid port\n")
.arg(portId);
continue;
}
portLock[portId]->lockForWrite();
portInfo[portId]->resetStats();
portLock[portId]->unlock();
}
if (error) {
response->set_status(OstProto::Ack::kRpcError);
response->set_notes(notes.toStdString());
}
else
response->set_status(OstProto::Ack::kRpcSuccess);
done->Run();
}
void MyService::getStreamStats(
::google::protobuf::RpcController* /*controller*/,
const ::OstProto::StreamGuidList* request,
::OstProto::StreamStatsList* response,
::google::protobuf::Closure* done)
{
qDebug("In %s", __PRETTY_FUNCTION__);
for (int i = 0; i < request->port_id_list().port_id_size(); i++)
{
int portId;
portId = request->port_id_list().port_id(i).id();
if ((portId < 0) || (portId >= portInfo.size()))
continue; // XXX: no way to inform RPC caller of invalid port
portLock[portId]->lockForRead();
if (request->stream_guid_size())
for (int j = 0; j < request->stream_guid_size(); j++)
portInfo[portId]->streamStats(request->stream_guid(j).id(),
response);
else
portInfo[portId]->streamStatsAll(response);
portLock[portId]->unlock();
}
done->Run();
}
void MyService::clearStreamStats(
::google::protobuf::RpcController* /*controller*/,
const ::OstProto::StreamGuidList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done)
{
bool error = false;
QString notes;
qDebug("In %s", __PRETTY_FUNCTION__);
for (int i = 0; i < request->port_id_list().port_id_size(); i++)
{
int portId;
portId = request->port_id_list().port_id(i).id();
if ((portId < 0) || (portId >= portInfo.size())) {
error = true;
notes += QString("Port %1 clear stream statistics: invalid port\n")
.arg(portId);
continue;
}
portLock[portId]->lockForWrite();
if (request->stream_guid_size())
for (int j = 0; j < request->stream_guid_size(); j++)
portInfo[portId]->resetStreamStats(
request->stream_guid(j).id());
else
portInfo[portId]->resetStreamStatsAll();
portLock[portId]->unlock();
}
if (error) {
response->set_status(OstProto::Ack::kRpcError);
response->set_notes(notes.toStdString());
}
else
response->set_status(OstProto::Ack::kRpcSuccess);
done->Run();
}
void MyService::checkVersion(::google::protobuf::RpcController* controller,
const ::OstProto::VersionInfo* request,
::OstProto::VersionCompatibility* response,
::google::protobuf::Closure* done)
{
QString myVersion(version);
QString clientVersion;
QStringList my, client;
qDebug("In %s", __PRETTY_FUNCTION__);
my = myVersion.split('.');
Q_ASSERT(my.size() >= 2);
clientVersion = QString::fromStdString(request->version());
client = clientVersion.split('.');
qDebug("client = %s, my = %s",
qPrintable(clientVersion), qPrintable(myVersion));
if (client.size() < 2)
goto _invalid_version;
// Compare only major and minor numbers
if (client[0] == my[0] && client[1] == my[1]) {
response->set_result(OstProto::VersionCompatibility::kCompatible);
static_cast<PbRpcController*>(controller)->EnableNotif(
request->client_name() == "python-ostinato" ? false : true);
}
else {
response->set_result(OstProto::VersionCompatibility::kIncompatible);
response->set_notes(QString("Drone needs controller version %1.%2.x")
.arg(my[0], my[1]).toStdString());
static_cast<PbRpcController*>(controller)->TriggerDisconnect();
}
done->Run();
return;
_invalid_version:
controller->SetFailed("invalid version information");
done->Run();
}
void MyService::build(::google::protobuf::RpcController* controller,
const ::OstProto::BuildConfig* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done)
{
QString notes;
int portId;
int frameError = 0;
qDebug("In %s", __PRETTY_FUNCTION__);
portId = request->port_id().id();
if ((portId < 0) || (portId >= portInfo.size()))
goto _invalid_port;
if (portInfo[portId]->isTransmitOn())
goto _port_busy;
portLock[portId]->lockForWrite();
if (portInfo[portId]->isDirty())
frameError = portInfo[portId]->updatePacketList();
portLock[portId]->unlock();
if (frameError) {
notes += frameValueErrorNotes(portId, frameError);
response->set_status(OstProto::Ack::kRpcError);
response->set_notes(notes.toStdString());
}
else
response->set_status(OstProto::Ack::kRpcSuccess);
done->Run();
return;
_port_busy:
controller->SetFailed(QString("Port %1 build: operation disallowed " "on transmitting port")
.arg(portId).toStdString());
goto _exit;
_invalid_port:
controller->SetFailed(QString("Port %1 build: invalid port")
.arg(portId).toStdString());
_exit:
done->Run();
}
/*
* ===================================================================
* Device Emulation
* ===================================================================
* XXX: Streams and Devices are largely non-overlapping from a RPC
* point of view but they *do* intersect e.g. when a stream is trying to
* find its associated device and info from that device such as src/dst
* mac addresses. For this reason, both set of RPCs share the common
* port level locking
* ===================================================================
*/
void MyService::getDeviceGroupIdList(
::google::protobuf::RpcController* controller,
const ::OstProto::PortId* request,
::OstProto::DeviceGroupIdList* response,
::google::protobuf::Closure* done)
{
DeviceManager *devMgr;
int portId;
qDebug("In %s", __PRETTY_FUNCTION__);
portId = request->id();
if ((portId < 0) || (portId >= portInfo.size()))
goto _invalid_port;
devMgr = portInfo[portId]->deviceManager();
response->mutable_port_id()->set_id(portId);
portLock[portId]->lockForRead();
for (int i = 0; i < devMgr->deviceGroupCount(); i++)
{
OstProto::DeviceGroupId *dgid;
dgid = response->add_device_group_id();
dgid->CopyFrom(devMgr->deviceGroupAtIndex(i)->device_group_id());
}
portLock[portId]->unlock();
done->Run();
return;
_invalid_port:
controller->SetFailed(QString("Port %1 get device group id list: "
"invalid port").arg(portId).toStdString());
done->Run();
}
void MyService::getDeviceGroupConfig(
::google::protobuf::RpcController* controller,
const ::OstProto::DeviceGroupIdList* request,
::OstProto::DeviceGroupConfigList* response,
::google::protobuf::Closure* done)
{
DeviceManager *devMgr;
int portId;
qDebug("In %s", __PRETTY_FUNCTION__);
portId = request->port_id().id();
if ((portId < 0) || (portId >= portInfo.size()))
goto _invalid_port;
devMgr = portInfo[portId]->deviceManager();
response->mutable_port_id()->set_id(portId);
portLock[portId]->lockForRead();
for (int i = 0; i < request->device_group_id_size(); i++)
{
const OstProto::DeviceGroup *dg;
dg = devMgr->deviceGroup(request->device_group_id(i).id());
if (!dg)
continue; // XXX: no way to inform RPC caller of invalid dgid
response->add_device_group()->CopyFrom(*dg);
}
portLock[portId]->unlock();
done->Run();
return;
_invalid_port:
controller->SetFailed(QString("Port %1 get device group config: "
"invalid port").arg(portId).toStdString());
done->Run();
}
void MyService::addDeviceGroup(
::google::protobuf::RpcController* controller,
const ::OstProto::DeviceGroupIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done)
{
bool error = false;
QString notes;
DeviceManager *devMgr;
int portId;
qDebug("In %s", __PRETTY_FUNCTION__);
portId = request->port_id().id();
if ((portId < 0) || (portId >= portInfo.size()))
goto _invalid_port;
devMgr = portInfo[portId]->deviceManager();
if (portInfo[portId]->isTransmitOn())
goto _port_busy;
portLock[portId]->lockForWrite();
for (int i = 0; i < request->device_group_id_size(); i++)
{
quint32 id = request->device_group_id(i).id();
const OstProto::DeviceGroup *dg = devMgr->deviceGroup(id);
// If device group with same id as in request exists already ==> error!
if (dg) {
error = true;
notes += QString("Port %1 DeviceGroup %2 add device group: "
" device group already exists\n")
.arg(portId).arg(id);
continue;
}
devMgr->addDeviceGroup(id);
}
portLock[portId]->unlock();
if (error) {
response->set_status(OstProto::Ack::kRpcError);
response->set_notes(notes.toStdString());
}
else
response->set_status(OstProto::Ack::kRpcSuccess);
done->Run();
return;
_port_busy:
controller->SetFailed(QString("Port %1 add device group: "
"operation disallowed on transmitting port")
.arg(portId).toStdString());
goto _exit;
_invalid_port:
controller->SetFailed(QString("Port %1 add device group: "
"invalid port")
.arg(portId).toStdString());
_exit:
done->Run();
}
void MyService::deleteDeviceGroup(
::google::protobuf::RpcController* controller,
const ::OstProto::DeviceGroupIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done)
{
bool error = false;
QString notes;
DeviceManager *devMgr;
int portId;
qDebug("In %s", __PRETTY_FUNCTION__);
portId = request->port_id().id();
if ((portId < 0) || (portId >= portInfo.size()))
goto _invalid_port;
devMgr = portInfo[portId]->deviceManager();
if (portInfo[portId]->isTransmitOn())
goto _port_busy;
portLock[portId]->lockForWrite();
for (int i = 0; i < request->device_group_id_size(); i++) {
quint32 id = request->device_group_id(i).id();
if (!devMgr->deleteDeviceGroup(id)) {
error = true;
notes += QString("Port %1 DeviceGroup %2 delete device group: "
"device group not found\n").arg(portId).arg(id);
}
}
portLock[portId]->unlock();
if (error) {
response->set_status(OstProto::Ack::kRpcError);
response->set_notes(notes.toStdString());
}
else
response->set_status(OstProto::Ack::kRpcSuccess);
done->Run();
return;
_port_busy:
controller->SetFailed(QString("Port %1 delete device group: "
"operation disallowed on transmitting port")
.arg(portId).toStdString());
goto _exit;
_invalid_port:
controller->SetFailed(QString("Port %1 delete device group: "
"invalid port").arg(portId).toStdString());
_exit:
done->Run();
}
void MyService::modifyDeviceGroup(
::google::protobuf::RpcController* controller,
const ::OstProto::DeviceGroupConfigList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done)
{
bool error = false;
QString notes;
DeviceManager *devMgr;
int portId;
qDebug("In %s", __PRETTY_FUNCTION__);
portId = request->port_id().id();
if ((portId < 0) || (portId >= portInfo.size()))
goto _invalid_port;
devMgr = portInfo[portId]->deviceManager();
if (portInfo[portId]->isTransmitOn())
goto _port_busy;
portLock[portId]->lockForWrite();
for (int i = 0; i < request->device_group_size(); i++) {
quint32 id = request->device_group(i).device_group_id().id();
if (!devMgr->modifyDeviceGroup(&request->device_group(i))) {
error = true;
notes += QString("Port %1 DeviceGroup %2 modify device group: "
"device group not found\n").arg(portId).arg(id);
}
}
portLock[portId]->unlock();
if (error) {
response->set_status(OstProto::Ack::kRpcError);
response->set_notes(notes.toStdString());
}
else
response->set_status(OstProto::Ack::kRpcSuccess);
done->Run();
return;
_port_busy:
controller->SetFailed(QString("Port %1 modify device group: "
"operation disallowed on transmitting port")
.arg(portId).toStdString());
goto _exit;
_invalid_port:
controller->SetFailed(QString("Port %1 modify device group: "
"invalid port").arg(portId).toStdString());
_exit:
done->Run();
}
void MyService::getDeviceList(
::google::protobuf::RpcController* controller,
const ::OstProto::PortId* request,
::OstProto::PortDeviceList* response,
::google::protobuf::Closure* done)
{
DeviceManager *devMgr;
int portId;
qDebug("In %s", __PRETTY_FUNCTION__);
portId = request->id();
if ((portId < 0) || (portId >= portInfo.size()))
goto _invalid_port;
devMgr = portInfo[portId]->deviceManager();
response->mutable_port_id()->set_id(portId);
portLock[portId]->lockForRead();
devMgr->getDeviceList(response);
portLock[portId]->unlock();
done->Run();
return;
_invalid_port:
controller->SetFailed(QString("Port %1 get device list: "
"invalid port").arg(portId).toStdString());
done->Run();
}
void MyService::resolveDeviceNeighbors(
::google::protobuf::RpcController* /*controller*/,
const ::OstProto::PortIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done)
{
bool error = false;
QString notes;
qDebug("In %s", __PRETTY_FUNCTION__);
for (int i = 0; i < request->port_id_size(); i++)
{
int portId;
portId = request->port_id(i).id();
if ((portId < 0) || (portId >= portInfo.size())) {
error = true;
notes += QString("Port %1 resolve device neighbors: "
"invalid port\n").arg(portId);
continue;
}
portLock[portId]->lockForWrite();
portInfo[portId]->resolveDeviceNeighbors();
portLock[portId]->unlock();
}
if (error) {
response->set_status(OstProto::Ack::kRpcError);
response->set_notes(notes.toStdString());
}
else {
// XXX: allow time for ARP/ND to finish; if more time is required,
// the client should wait/check before invoking build()
QThread::msleep(500);
response->set_status(OstProto::Ack::kRpcSuccess);
}
done->Run();
}
void MyService::clearDeviceNeighbors(
::google::protobuf::RpcController* /*controller*/,
const ::OstProto::PortIdList* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done)
{
bool error = false;
QString notes;
qDebug("In %s", __PRETTY_FUNCTION__);
for (int i = 0; i < request->port_id_size(); i++)
{
int portId;
portId = request->port_id(i).id();
if ((portId < 0) || (portId >= portInfo.size())) {
error = true;
notes += QString("Port %1 clear device neighbors: "
"invalid port\n").arg(portId);
continue;
}
portLock[portId]->lockForWrite();
portInfo[portId]->clearDeviceNeighbors();
portLock[portId]->unlock();
}
if (error) {
response->set_status(OstProto::Ack::kRpcError);
response->set_notes(notes.toStdString());
}
else
response->set_status(OstProto::Ack::kRpcSuccess);
done->Run();
}
void MyService::getDeviceNeighbors(
::google::protobuf::RpcController* controller,
const ::OstProto::PortId* request,
::OstProto::PortNeighborList* response,
::google::protobuf::Closure* done)
{
DeviceManager *devMgr;
int portId;
qDebug("In %s", __PRETTY_FUNCTION__);
portId = request->id();
if ((portId < 0) || (portId >= portInfo.size()))
goto _invalid_port;
devMgr = portInfo[portId]->deviceManager();
response->mutable_port_id()->set_id(portId);
portLock[portId]->lockForRead();
devMgr->getDeviceNeighbors(response);
portLock[portId]->unlock();
done->Run();
return;
_invalid_port:
controller->SetFailed(QString("Port %1 get device neighbors: "
"invalid port").arg(portId).toStdString());
done->Run();
}
QString MyService::frameValueErrorNotes(int portId, int error)
{
if (!error)
return QString();
QString pfx = QString("Port %1: ").arg(portId);
auto errorFlags = static_cast<FrameValueAttrib::ErrorFlags>(error);
if (errorFlags & FrameValueAttrib::OutOfMemoryError)
return pfx + "Error building packet buffers - out of buffer memory\n";
// If smac resolve fails, dmac will always fail - so check that first
// and report only that so as not to confuse users (they may not realize
// that without a source device, we have no ARP table to lookup for dmac)
if (errorFlags & FrameValueAttrib::UnresolvedSrcMacError)
return pfx + "Source mac resolve failed for one or more "
"streams - Device matching stream's source IP not found\n";
if (errorFlags & FrameValueAttrib::UnresolvedDstMacError)
return pfx + "Destination mac resolve failed for one or more "
"streams - possible ARP/ND failure\n";
return QString();
}
/*
* ===================================================================
* Friends
* TODO: Encap these global functions into a DeviceBroker singleton?
* ===================================================================
*/
quint64 getDeviceMacAddress(int portId, int streamId, int frameIndex)
{
MyService *service = drone->rpcService();
DeviceManager *devMgr = NULL;
quint64 mac;
if (!service)
return 0;
if ((portId >= 0) && (portId < service->portInfo.size()))
devMgr = service->portInfo[portId]->deviceManager();
if (!devMgr || !devMgr->deviceCount())
return 0;
/*
* FIXME: We don't need lockForWrite, only lockForRead here.
* However, this function is called in the following sequence
* modifyPort() --> updatePacketList() --> frameValue()
* where modifyPort has already taken a write lock. Qt allows
* recursive locks, but not of a different type, so we are
* forced to use lockForWrite here - till we find a different
* solution.
*/
service->portLock[portId]->lockForWrite();
mac = service->portInfo[portId]->deviceMacAddress(streamId, frameIndex);
service->portLock[portId]->unlock();
return mac;
}
quint64 getNeighborMacAddress(int portId, int streamId, int frameIndex)
{
MyService *service = drone->rpcService();
DeviceManager *devMgr = NULL;
quint64 mac;
if (!service)
return 0;
if ((portId >= 0) && (portId < service->portInfo.size()))
devMgr = service->portInfo[portId]->deviceManager();
if (!devMgr || !devMgr->deviceCount())
return 0;
/*
* FIXME: We don't need lockForWrite, only lockForRead here.
* See comment in getDeviceMacAddress() for more
*/
service->portLock[portId]->lockForWrite();
mac = service->portInfo[portId]->neighborMacAddress(streamId, frameIndex);
service->portLock[portId]->unlock();
return mac;
}