/* Copyright (C) 2010-2016 Srivats P. This file is part of "Ostinato" This is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see */ #include "myservice.h" #include "drone.h" #if 0 #include #include #include "qdebug.h" #include "../common/protocollistiterator.h" #include "../common/abstractprotocol.h" #endif #include "../common/framevalueattrib.h" #include "../common/streambase.h" #include "../rpc/pbrpccontroller.h" #include "device.h" #include "devicemanager.h" #include "portmanager.h" #include #include extern Drone *drone; extern char *version; MyService::MyService() { PortManager *portManager = PortManager::instance(); int n = portManager->portCount(); for (int i = 0; i < n; i++) { portInfo.append(portManager->port(i)); #if QT_VERSION >= 0x040400 portLock.append(new QReadWriteLock(QReadWriteLock::Recursive)); #else portLock.append(new QReadWriteLock()); #endif } } MyService::~MyService() { while (!portLock.isEmpty()) delete portLock.takeFirst(); //! \todo Use a singleton destroyer instead // http://www.research.ibm.com/designpatterns/pubs/ph-jun96.txt delete PortManager::instance(); } void MyService::getPortIdList(::google::protobuf::RpcController* /*controller*/, const ::OstProto::Void* /*request*/, ::OstProto::PortIdList* response, ::google::protobuf::Closure* done) { qDebug("In %s", __PRETTY_FUNCTION__); // No locks are needed here because the list does not change // and neither does the port_id for (int i = 0; i < portInfo.size(); i++) { ::OstProto::PortId *p; p = response->add_port_id(); p->set_id(portInfo[i]->id()); } done->Run(); } void MyService::getPortConfig(::google::protobuf::RpcController* /*controller*/, const ::OstProto::PortIdList* request, ::OstProto::PortConfigList* response, ::google::protobuf::Closure* done) { qDebug("In %s", __PRETTY_FUNCTION__); for (int i = 0; i < request->port_id_size(); i++) { int id; id = request->port_id(i).id(); if (id < portInfo.size()) { OstProto::Port *p; p = response->add_port(); portLock[id]->lockForRead(); portInfo[id]->protoDataCopyInto(p); portLock[id]->unlock(); } // XXX: no way to inform RPC caller of an invalid port! } done->Run(); } void MyService::modifyPort(::google::protobuf::RpcController* /*controller*/, const ::OstProto::PortConfigList* request, ::OstProto::Ack *response, ::google::protobuf::Closure* done) { bool error = false; QString notes; // notification needs to be on heap because signal/slot is across threads! OstProto::Notification *notif = new OstProto::Notification; qDebug("In %s", __PRETTY_FUNCTION__); for (int i = 0; i < request->port_size(); i++) { OstProto::Port port; int id; port = request->port(i); id = port.port_id().id(); if (id < portInfo.size()) { bool dirty; if (!portInfo[id]->canModify(port, &dirty)) { qDebug("Port %d cannot be modified - stop tx and retry", id); error = true; notes += QString("Port %1 modify: operation disallowed on " "transmitting port\n").arg(id); continue; } portLock[id]->lockForWrite(); portInfo[id]->modify(port); portLock[id]->unlock(); notif->mutable_port_id_list()->add_port_id()->set_id(id); } else { error = true; notes += QString("Port %1 modify: invalid port\n").arg(id); } } if (error) { response->set_status(OstProto::Ack::kRpcError); response->set_notes(notes.toStdString()); } else response->set_status(OstProto::Ack::kRpcSuccess); done->Run(); if (notif->port_id_list().port_id_size()) { notif->set_notif_type(OstProto::portConfigChanged); emit notification(notif->notif_type(), SharedProtobufMessage(notif)); } else delete notif; } void MyService::getStreamIdList(::google::protobuf::RpcController* controller, const ::OstProto::PortId* request, ::OstProto::StreamIdList* response, ::google::protobuf::Closure* done) { int portId; qDebug("In %s", __PRETTY_FUNCTION__); portId = request->id(); if ((portId < 0) || (portId >= portInfo.size())) goto _invalid_port; response->mutable_port_id()->set_id(portId); portLock[portId]->lockForRead(); for (int i = 0; i < portInfo[portId]->streamCount(); i++) { OstProto::StreamId *s; s = response->add_stream_id(); s->set_id(portInfo[portId]->streamAtIndex(i)->id()); } portLock[portId]->unlock(); done->Run(); return; _invalid_port: controller->SetFailed(QString("Port %1 get stream id list: invalid port") .arg(portId).toStdString()); done->Run(); } void MyService::getStreamConfig(::google::protobuf::RpcController* controller, const ::OstProto::StreamIdList* request, ::OstProto::StreamConfigList* response, ::google::protobuf::Closure* done) { int portId; qDebug("In %s", __PRETTY_FUNCTION__); portId = request->port_id().id(); if ((portId < 0) || (portId >= portInfo.size())) goto _invalid_port; response->mutable_port_id()->set_id(portId); portLock[portId]->lockForRead(); for (int i = 0; i < request->stream_id_size(); i++) { StreamBase *stream; OstProto::Stream *s; stream = portInfo[portId]->stream(request->stream_id(i).id()); if (!stream) continue; //! XXX: no way to inform RPC caller of invalid stream id s = response->add_stream(); stream->protoDataCopyInto(*s); } portLock[portId]->unlock(); done->Run(); return; _invalid_port: controller->SetFailed(QString("Port %1 get stream config: invalid port") .arg(portId).toStdString()); done->Run(); } void MyService::addStream(::google::protobuf::RpcController* controller, const ::OstProto::StreamIdList* request, ::OstProto::Ack* response, ::google::protobuf::Closure* done) { bool error = false; QString notes; int portId; qDebug("In %s", __PRETTY_FUNCTION__); portId = request->port_id().id(); if ((portId < 0) || (portId >= portInfo.size())) goto _invalid_port; if (portInfo[portId]->isTransmitOn()) goto _port_busy; portLock[portId]->lockForWrite(); for (int i = 0; i < request->stream_id_size(); i++) { StreamBase *stream; // If stream with same id as in request exists already ==> error!! stream = portInfo[portId]->stream(request->stream_id(i).id()); if (stream) { error = true; notes += QString("Port %1 Stream %2 add stream: " "stream already exists\n") .arg(portId).arg(request->stream_id(i).id()); continue; } // Append a new "default" stream - actual contents of the new stream is // expected in a subsequent "modifyStream" request - set the stream id // now itself however!!! stream = new StreamBase(portId); stream->setId(request->stream_id(i).id()); portInfo[portId]->addStream(stream); } portLock[portId]->unlock(); if (error) { response->set_status(OstProto::Ack::kRpcError); response->set_notes(notes.toStdString()); } else response->set_status(OstProto::Ack::kRpcSuccess); done->Run(); return; _port_busy: controller->SetFailed(QString("Port %1 add stream: operation disallowed on " "transmitting port") .arg(portId).toStdString()); goto _exit; _invalid_port: controller->SetFailed(QString("Port %1 add stream: invalid port") .arg(portId).toStdString()); _exit: done->Run(); } void MyService::deleteStream(::google::protobuf::RpcController* controller, const ::OstProto::StreamIdList* request, ::OstProto::Ack* response, ::google::protobuf::Closure* done) { bool error = false; QString notes; int portId; qDebug("In %s", __PRETTY_FUNCTION__); portId = request->port_id().id(); if ((portId < 0) || (portId >= portInfo.size())) goto _invalid_port; if (portInfo[portId]->isTransmitOn()) goto _port_busy; portLock[portId]->lockForWrite(); for (int i = 0; i < request->stream_id_size(); i++) { if (!portInfo[portId]->deleteStream(request->stream_id(i).id())) { error = true; notes += QString("Port %1 Stream %2 stream delete: " "stream not found\n") .arg(portId).arg(request->stream_id(i).id()); } } portLock[portId]->unlock(); if (error) { response->set_status(OstProto::Ack::kRpcError); response->set_notes(notes.toStdString()); } else response->set_status(OstProto::Ack::kRpcSuccess); done->Run(); return; _port_busy: controller->SetFailed(QString("Port %1 delete stream: operation disallowed " "on transmitting port") .arg(portId).toStdString()); goto _exit; _invalid_port: controller->SetFailed(QString("Port %1 delete stream: invalid port") .arg(portId).toStdString()); _exit: done->Run(); } void MyService::modifyStream(::google::protobuf::RpcController* controller, const ::OstProto::StreamConfigList* request, ::OstProto::Ack* response, ::google::protobuf::Closure* done) { bool error = false; QString notes; int portId; qDebug("In %s", __PRETTY_FUNCTION__); portId = request->port_id().id(); if ((portId < 0) || (portId >= portInfo.size())) goto _invalid_port; if (portInfo[portId]->isTransmitOn()) goto _port_busy; portLock[portId]->lockForWrite(); for (int i = 0; i < request->stream_size(); i++) { quint32 sid = request->stream(i).stream_id().id(); StreamBase *stream = portInfo[portId]->stream(sid); if (stream) { stream->protoDataCopyFrom(request->stream(i)); portInfo[portId]->setDirty(); } else { error = true; notes += QString("Port %1 Stream %2 modify stream: " "stream not found\n").arg(portId).arg(sid); } } portLock[portId]->unlock(); if (error) { response->set_status(OstProto::Ack::kRpcError); response->set_notes(notes.toStdString()); } else response->set_status(OstProto::Ack::kRpcSuccess); done->Run(); return; _port_busy: controller->SetFailed(QString("Port %1 modify stream: operation disallowed " "on transmitting port") .arg(portId).toStdString()); goto _exit; _invalid_port: controller->SetFailed(QString("Port %1 modify stream: invalid port") .arg(portId).toStdString()); _exit: done->Run(); } void MyService::startTransmit(::google::protobuf::RpcController* /*controller*/, const ::OstProto::PortIdList* request, ::OstProto::Ack* response, ::google::protobuf::Closure* done) { bool error = false; QString notes; qDebug("In %s", __PRETTY_FUNCTION__); // XXX: stream stats uses port tx duration to calculate per stream // rates; tx duration is for the last tx run only - so stream stats // should also correspond to the last run only. // Hence clear stream stats before Tx. // XXX: clear stream stats on ALL ports in user provided list before // starting Tx on any of them for (int i = 0; i < request->port_id_size(); i++) { int portId = request->port_id(i).id(); if ((portId < 0) || (portId >= portInfo.size())) { error = true; notes += QString("Port %1 start transmit: invalid port\n") .arg(portId); continue; } portLock[portId]->lockForWrite(); portInfo[portId]->resetStreamStatsAll(); portLock[portId]->unlock(); } for (int i = 0; i < request->port_id_size(); i++) { int frameError = 0; int portId = request->port_id(i).id(); if ((portId < 0) || (portId >= portInfo.size())) { continue; } portLock[portId]->lockForWrite(); if (portInfo[portId]->isDirty()) frameError = portInfo[portId]->updatePacketList(); portInfo[portId]->startTransmit(); portLock[portId]->unlock(); if (frameError) { error = true; notes += frameValueErrorNotes(portId, frameError); } } if (error) { response->set_status(OstProto::Ack::kRpcError); response->set_notes(notes.toStdString()); } else response->set_status(OstProto::Ack::kRpcSuccess); done->Run(); } void MyService::stopTransmit(::google::protobuf::RpcController* /*controller*/, const ::OstProto::PortIdList* request, ::OstProto::Ack* response, ::google::protobuf::Closure* done) { bool error = false; QString notes; qDebug("In %s", __PRETTY_FUNCTION__); for (int i = 0; i < request->port_id_size(); i++) { int portId; portId = request->port_id(i).id(); if ((portId < 0) || (portId >= portInfo.size())) { error = true; notes += QString("Port %1 stop transmit: invalid port\n") .arg(portId); continue; } portLock[portId]->lockForWrite(); portInfo[portId]->stopTransmit(); portLock[portId]->unlock(); } if (error) { response->set_status(OstProto::Ack::kRpcError); response->set_notes(notes.toStdString()); } else response->set_status(OstProto::Ack::kRpcSuccess); done->Run(); } void MyService::startCapture(::google::protobuf::RpcController* /*controller*/, const ::OstProto::PortIdList* request, ::OstProto::Ack* response, ::google::protobuf::Closure* done) { bool error = false; QString notes; qDebug("In %s", __PRETTY_FUNCTION__); for (int i = 0; i < request->port_id_size(); i++) { int portId; portId = request->port_id(i).id(); if ((portId < 0) || (portId >= portInfo.size())) { error = true; notes += QString("Port %1 start capture: invalid port\n") .arg(portId); continue; } portLock[portId]->lockForWrite(); portInfo[portId]->startCapture(); portLock[portId]->unlock(); } if (error) { response->set_status(OstProto::Ack::kRpcError); response->set_notes(notes.toStdString()); } else response->set_status(OstProto::Ack::kRpcSuccess); done->Run(); } void MyService::stopCapture(::google::protobuf::RpcController* /*controller*/, const ::OstProto::PortIdList* request, ::OstProto::Ack* response, ::google::protobuf::Closure* done) { bool error = false; QString notes; qDebug("In %s", __PRETTY_FUNCTION__); for (int i=0; i < request->port_id_size(); i++) { int portId; portId = request->port_id(i).id(); if ((portId < 0) || (portId >= portInfo.size())) { error = true; notes += QString("Port %1 stop capture: invalid port\n") .arg(portId); continue; } portLock[portId]->lockForWrite(); portInfo[portId]->stopCapture(); portLock[portId]->unlock(); } if (error) { response->set_status(OstProto::Ack::kRpcError); response->set_notes(notes.toStdString()); } else response->set_status(OstProto::Ack::kRpcSuccess); done->Run(); } void MyService::getCaptureBuffer(::google::protobuf::RpcController* controller, const ::OstProto::PortId* request, ::OstProto::CaptureBuffer* /*response*/, ::google::protobuf::Closure* done) { int portId; qDebug("In %s", __PRETTY_FUNCTION__); portId = request->id(); if ((portId < 0) || (portId >= portInfo.size())) goto _invalid_port; portLock[portId]->lockForWrite(); portInfo[portId]->stopCapture(); static_cast(controller)->setBinaryBlob( portInfo[portId]->captureData()); portLock[portId]->unlock(); done->Run(); return; _invalid_port: controller->SetFailed("invalid portid"); controller->SetFailed(QString("Port %1 get capture buffer: invalid port") .arg(portId).toStdString()); done->Run(); } void MyService::getStats(::google::protobuf::RpcController* /*controller*/, const ::OstProto::PortIdList* request, ::OstProto::PortStatsList* response, ::google::protobuf::Closure* done) { //qDebug("In %s", __PRETTY_FUNCTION__); for (int i = 0; i < request->port_id_size(); i++) { int portId; AbstractPort::PortStats stats; OstProto::PortStats *s; OstProto::PortState *st; portId = request->port_id(i).id(); if ((portId < 0) || (portId >= portInfo.size())) continue; // XXX: no way to inform RPC caller of invalid port s = response->add_port_stats(); s->mutable_port_id()->set_id(request->port_id(i).id()); st = s->mutable_state(); portLock[portId]->lockForRead(); st->set_link_state(portInfo[portId]->linkState()); st->set_is_transmit_on(portInfo[portId]->isTransmitOn()); st->set_is_capture_on(portInfo[portId]->isCaptureOn()); portInfo[portId]->stats(&stats); portLock[portId]->unlock(); #if 0 if (portId == 2) qDebug(">%llu", stats.rxPkts); #endif s->set_rx_pkts(stats.rxPkts); s->set_rx_bytes(stats.rxBytes); s->set_rx_pps(stats.rxPps); s->set_rx_bps(stats.rxBps); s->set_tx_pkts(stats.txPkts); s->set_tx_bytes(stats.txBytes); s->set_tx_pps(stats.txPps); s->set_tx_bps(stats.txBps); s->set_rx_drops(stats.rxDrops); s->set_rx_errors(stats.rxErrors); s->set_rx_fifo_errors(stats.rxFifoErrors); s->set_rx_frame_errors(stats.rxFrameErrors); } done->Run(); } void MyService::clearStats(::google::protobuf::RpcController* /*controller*/, const ::OstProto::PortIdList* request, ::OstProto::Ack* response, ::google::protobuf::Closure* done) { bool error = false; QString notes; qDebug("In %s", __PRETTY_FUNCTION__); for (int i = 0; i < request->port_id_size(); i++) { int portId; portId = request->port_id(i).id(); if ((portId < 0) || (portId >= portInfo.size())) { error = true; notes += QString("Port %1 clear statistics: invalid port\n") .arg(portId); continue; } portLock[portId]->lockForWrite(); portInfo[portId]->resetStats(); portLock[portId]->unlock(); } if (error) { response->set_status(OstProto::Ack::kRpcError); response->set_notes(notes.toStdString()); } else response->set_status(OstProto::Ack::kRpcSuccess); done->Run(); } void MyService::getStreamStats( ::google::protobuf::RpcController* /*controller*/, const ::OstProto::StreamGuidList* request, ::OstProto::StreamStatsList* response, ::google::protobuf::Closure* done) { qDebug("In %s", __PRETTY_FUNCTION__); for (int i = 0; i < request->port_id_list().port_id_size(); i++) { int portId; portId = request->port_id_list().port_id(i).id(); if ((portId < 0) || (portId >= portInfo.size())) continue; // XXX: no way to inform RPC caller of invalid port portLock[portId]->lockForRead(); if (request->stream_guid_size()) for (int j = 0; j < request->stream_guid_size(); j++) portInfo[portId]->streamStats(request->stream_guid(j).id(), response); else portInfo[portId]->streamStatsAll(response); portLock[portId]->unlock(); } done->Run(); } void MyService::clearStreamStats( ::google::protobuf::RpcController* /*controller*/, const ::OstProto::StreamGuidList* request, ::OstProto::Ack* response, ::google::protobuf::Closure* done) { bool error = false; QString notes; qDebug("In %s", __PRETTY_FUNCTION__); for (int i = 0; i < request->port_id_list().port_id_size(); i++) { int portId; portId = request->port_id_list().port_id(i).id(); if ((portId < 0) || (portId >= portInfo.size())) { error = true; notes += QString("Port %1 clear stream statistics: invalid port\n") .arg(portId); continue; } portLock[portId]->lockForWrite(); if (request->stream_guid_size()) for (int j = 0; j < request->stream_guid_size(); j++) portInfo[portId]->resetStreamStats( request->stream_guid(j).id()); else portInfo[portId]->resetStreamStatsAll(); portLock[portId]->unlock(); } if (error) { response->set_status(OstProto::Ack::kRpcError); response->set_notes(notes.toStdString()); } else response->set_status(OstProto::Ack::kRpcSuccess); done->Run(); } void MyService::checkVersion(::google::protobuf::RpcController* controller, const ::OstProto::VersionInfo* request, ::OstProto::VersionCompatibility* response, ::google::protobuf::Closure* done) { QString myVersion(version); QString clientVersion; QStringList my, client; qDebug("In %s", __PRETTY_FUNCTION__); my = myVersion.split('.'); Q_ASSERT(my.size() >= 2); clientVersion = QString::fromStdString(request->version()); client = clientVersion.split('.'); qDebug("client = %s, my = %s", qPrintable(clientVersion), qPrintable(myVersion)); if (client.size() < 2) goto _invalid_version; // Compare only major and minor numbers if (client[0] == my[0] && client[1] == my[1]) { response->set_result(OstProto::VersionCompatibility::kCompatible); static_cast(controller)->EnableNotif( request->client_name() == "python-ostinato" ? false : true); } else { response->set_result(OstProto::VersionCompatibility::kIncompatible); response->set_notes(QString("Drone needs controller version %1.%2.x") .arg(my[0], my[1]).toStdString()); static_cast(controller)->TriggerDisconnect(); } done->Run(); return; _invalid_version: controller->SetFailed("invalid version information"); done->Run(); } void MyService::build(::google::protobuf::RpcController* controller, const ::OstProto::BuildConfig* request, ::OstProto::Ack* response, ::google::protobuf::Closure* done) { QString notes; int portId; int frameError = 0; qDebug("In %s", __PRETTY_FUNCTION__); portId = request->port_id().id(); if ((portId < 0) || (portId >= portInfo.size())) goto _invalid_port; if (portInfo[portId]->isTransmitOn()) goto _port_busy; portLock[portId]->lockForWrite(); if (portInfo[portId]->isDirty()) frameError = portInfo[portId]->updatePacketList(); portLock[portId]->unlock(); if (frameError) { notes += frameValueErrorNotes(portId, frameError); response->set_status(OstProto::Ack::kRpcError); response->set_notes(notes.toStdString()); } else response->set_status(OstProto::Ack::kRpcSuccess); done->Run(); return; _port_busy: controller->SetFailed(QString("Port %1 build: operation disallowed " "on transmitting port") .arg(portId).toStdString()); goto _exit; _invalid_port: controller->SetFailed(QString("Port %1 build: invalid port") .arg(portId).toStdString()); _exit: done->Run(); } /* * =================================================================== * Device Emulation * =================================================================== * XXX: Streams and Devices are largely non-overlapping from a RPC * point of view but they *do* intersect e.g. when a stream is trying to * find its associated device and info from that device such as src/dst * mac addresses. For this reason, both set of RPCs share the common * port level locking * =================================================================== */ void MyService::getDeviceGroupIdList( ::google::protobuf::RpcController* controller, const ::OstProto::PortId* request, ::OstProto::DeviceGroupIdList* response, ::google::protobuf::Closure* done) { DeviceManager *devMgr; int portId; qDebug("In %s", __PRETTY_FUNCTION__); portId = request->id(); if ((portId < 0) || (portId >= portInfo.size())) goto _invalid_port; devMgr = portInfo[portId]->deviceManager(); response->mutable_port_id()->set_id(portId); portLock[portId]->lockForRead(); for (int i = 0; i < devMgr->deviceGroupCount(); i++) { OstProto::DeviceGroupId *dgid; dgid = response->add_device_group_id(); dgid->CopyFrom(devMgr->deviceGroupAtIndex(i)->device_group_id()); } portLock[portId]->unlock(); done->Run(); return; _invalid_port: controller->SetFailed(QString("Port %1 get device group id list: " "invalid port").arg(portId).toStdString()); done->Run(); } void MyService::getDeviceGroupConfig( ::google::protobuf::RpcController* controller, const ::OstProto::DeviceGroupIdList* request, ::OstProto::DeviceGroupConfigList* response, ::google::protobuf::Closure* done) { DeviceManager *devMgr; int portId; qDebug("In %s", __PRETTY_FUNCTION__); portId = request->port_id().id(); if ((portId < 0) || (portId >= portInfo.size())) goto _invalid_port; devMgr = portInfo[portId]->deviceManager(); response->mutable_port_id()->set_id(portId); portLock[portId]->lockForRead(); for (int i = 0; i < request->device_group_id_size(); i++) { const OstProto::DeviceGroup *dg; dg = devMgr->deviceGroup(request->device_group_id(i).id()); if (!dg) continue; // XXX: no way to inform RPC caller of invalid dgid response->add_device_group()->CopyFrom(*dg); } portLock[portId]->unlock(); done->Run(); return; _invalid_port: controller->SetFailed(QString("Port %1 get device group config: " "invalid port").arg(portId).toStdString()); done->Run(); } void MyService::addDeviceGroup( ::google::protobuf::RpcController* controller, const ::OstProto::DeviceGroupIdList* request, ::OstProto::Ack* response, ::google::protobuf::Closure* done) { bool error = false; QString notes; DeviceManager *devMgr; int portId; qDebug("In %s", __PRETTY_FUNCTION__); portId = request->port_id().id(); if ((portId < 0) || (portId >= portInfo.size())) goto _invalid_port; devMgr = portInfo[portId]->deviceManager(); if (portInfo[portId]->isTransmitOn()) goto _port_busy; portLock[portId]->lockForWrite(); for (int i = 0; i < request->device_group_id_size(); i++) { quint32 id = request->device_group_id(i).id(); const OstProto::DeviceGroup *dg = devMgr->deviceGroup(id); // If device group with same id as in request exists already ==> error! if (dg) { error = true; notes += QString("Port %1 DeviceGroup %2 add device group: " " device group already exists\n") .arg(portId).arg(id); continue; } devMgr->addDeviceGroup(id); } portLock[portId]->unlock(); if (error) { response->set_status(OstProto::Ack::kRpcError); response->set_notes(notes.toStdString()); } else response->set_status(OstProto::Ack::kRpcSuccess); done->Run(); return; _port_busy: controller->SetFailed(QString("Port %1 add device group: " "operation disallowed on transmitting port") .arg(portId).toStdString()); goto _exit; _invalid_port: controller->SetFailed(QString("Port %1 add device group: " "invalid port") .arg(portId).toStdString()); _exit: done->Run(); } void MyService::deleteDeviceGroup( ::google::protobuf::RpcController* controller, const ::OstProto::DeviceGroupIdList* request, ::OstProto::Ack* response, ::google::protobuf::Closure* done) { bool error = false; QString notes; DeviceManager *devMgr; int portId; qDebug("In %s", __PRETTY_FUNCTION__); portId = request->port_id().id(); if ((portId < 0) || (portId >= portInfo.size())) goto _invalid_port; devMgr = portInfo[portId]->deviceManager(); if (portInfo[portId]->isTransmitOn()) goto _port_busy; portLock[portId]->lockForWrite(); for (int i = 0; i < request->device_group_id_size(); i++) { quint32 id = request->device_group_id(i).id(); if (!devMgr->deleteDeviceGroup(id)) { error = true; notes += QString("Port %1 DeviceGroup %2 delete device group: " "device group not found\n").arg(portId).arg(id); } } portLock[portId]->unlock(); if (error) { response->set_status(OstProto::Ack::kRpcError); response->set_notes(notes.toStdString()); } else response->set_status(OstProto::Ack::kRpcSuccess); done->Run(); return; _port_busy: controller->SetFailed(QString("Port %1 delete device group: " "operation disallowed on transmitting port") .arg(portId).toStdString()); goto _exit; _invalid_port: controller->SetFailed(QString("Port %1 delete device group: " "invalid port").arg(portId).toStdString()); _exit: done->Run(); } void MyService::modifyDeviceGroup( ::google::protobuf::RpcController* controller, const ::OstProto::DeviceGroupConfigList* request, ::OstProto::Ack* response, ::google::protobuf::Closure* done) { bool error = false; QString notes; DeviceManager *devMgr; int portId; qDebug("In %s", __PRETTY_FUNCTION__); portId = request->port_id().id(); if ((portId < 0) || (portId >= portInfo.size())) goto _invalid_port; devMgr = portInfo[portId]->deviceManager(); if (portInfo[portId]->isTransmitOn()) goto _port_busy; portLock[portId]->lockForWrite(); for (int i = 0; i < request->device_group_size(); i++) { quint32 id = request->device_group(i).device_group_id().id(); if (!devMgr->modifyDeviceGroup(&request->device_group(i))) { error = true; notes += QString("Port %1 DeviceGroup %2 modify device group: " "device group not found\n").arg(portId).arg(id); } } portLock[portId]->unlock(); if (error) { response->set_status(OstProto::Ack::kRpcError); response->set_notes(notes.toStdString()); } else response->set_status(OstProto::Ack::kRpcSuccess); done->Run(); return; _port_busy: controller->SetFailed(QString("Port %1 modify device group: " "operation disallowed on transmitting port") .arg(portId).toStdString()); goto _exit; _invalid_port: controller->SetFailed(QString("Port %1 modify device group: " "invalid port").arg(portId).toStdString()); _exit: done->Run(); } void MyService::getDeviceList( ::google::protobuf::RpcController* controller, const ::OstProto::PortId* request, ::OstProto::PortDeviceList* response, ::google::protobuf::Closure* done) { DeviceManager *devMgr; int portId; qDebug("In %s", __PRETTY_FUNCTION__); portId = request->id(); if ((portId < 0) || (portId >= portInfo.size())) goto _invalid_port; devMgr = portInfo[portId]->deviceManager(); response->mutable_port_id()->set_id(portId); portLock[portId]->lockForRead(); devMgr->getDeviceList(response); portLock[portId]->unlock(); done->Run(); return; _invalid_port: controller->SetFailed(QString("Port %1 get device list: " "invalid port").arg(portId).toStdString()); done->Run(); } void MyService::resolveDeviceNeighbors( ::google::protobuf::RpcController* /*controller*/, const ::OstProto::PortIdList* request, ::OstProto::Ack* response, ::google::protobuf::Closure* done) { bool error = false; QString notes; qDebug("In %s", __PRETTY_FUNCTION__); for (int i = 0; i < request->port_id_size(); i++) { int portId; portId = request->port_id(i).id(); if ((portId < 0) || (portId >= portInfo.size())) { error = true; notes += QString("Port %1 resolve device neighbors: " "invalid port\n").arg(portId); continue; } portLock[portId]->lockForWrite(); portInfo[portId]->resolveDeviceNeighbors(); portLock[portId]->unlock(); } if (error) { response->set_status(OstProto::Ack::kRpcError); response->set_notes(notes.toStdString()); } else { // XXX: allow time for ARP/ND to finish; if more time is required, // the client should wait/check before invoking build() QThread::msleep(500); response->set_status(OstProto::Ack::kRpcSuccess); } done->Run(); } void MyService::clearDeviceNeighbors( ::google::protobuf::RpcController* /*controller*/, const ::OstProto::PortIdList* request, ::OstProto::Ack* response, ::google::protobuf::Closure* done) { bool error = false; QString notes; qDebug("In %s", __PRETTY_FUNCTION__); for (int i = 0; i < request->port_id_size(); i++) { int portId; portId = request->port_id(i).id(); if ((portId < 0) || (portId >= portInfo.size())) { error = true; notes += QString("Port %1 clear device neighbors: " "invalid port\n").arg(portId); continue; } portLock[portId]->lockForWrite(); portInfo[portId]->clearDeviceNeighbors(); portLock[portId]->unlock(); } if (error) { response->set_status(OstProto::Ack::kRpcError); response->set_notes(notes.toStdString()); } else response->set_status(OstProto::Ack::kRpcSuccess); done->Run(); } void MyService::getDeviceNeighbors( ::google::protobuf::RpcController* controller, const ::OstProto::PortId* request, ::OstProto::PortNeighborList* response, ::google::protobuf::Closure* done) { DeviceManager *devMgr; int portId; qDebug("In %s", __PRETTY_FUNCTION__); portId = request->id(); if ((portId < 0) || (portId >= portInfo.size())) goto _invalid_port; devMgr = portInfo[portId]->deviceManager(); response->mutable_port_id()->set_id(portId); portLock[portId]->lockForRead(); devMgr->getDeviceNeighbors(response); portLock[portId]->unlock(); done->Run(); return; _invalid_port: controller->SetFailed(QString("Port %1 get device neighbors: " "invalid port").arg(portId).toStdString()); done->Run(); } QString MyService::frameValueErrorNotes(int portId, int error) { if (!error) return QString(); QString pfx = QString("Port %1: ").arg(portId); auto errorFlags = static_cast(error); if (errorFlags & FrameValueAttrib::OutOfMemoryError) return pfx + "Error building packet buffers - out of buffer memory\n"; // If smac resolve fails, dmac will always fail - so check that first // and report only that so as not to confuse users (they may not realize // that without a source device, we have no ARP table to lookup for dmac) if (errorFlags & FrameValueAttrib::UnresolvedSrcMacError) return pfx + "Source mac resolve failed for one or more " "streams - Device matching stream's source IP not found\n"; if (errorFlags & FrameValueAttrib::UnresolvedDstMacError) return pfx + "Destination mac resolve failed for one or more " "streams - possible ARP/ND failure\n"; return QString(); } /* * =================================================================== * Friends * TODO: Encap these global functions into a DeviceBroker singleton? * =================================================================== */ quint64 getDeviceMacAddress(int portId, int streamId, int frameIndex) { MyService *service = drone->rpcService(); DeviceManager *devMgr = NULL; quint64 mac; if (!service) return 0; if ((portId >= 0) && (portId < service->portInfo.size())) devMgr = service->portInfo[portId]->deviceManager(); if (!devMgr || !devMgr->deviceCount()) return 0; /* * FIXME: We don't need lockForWrite, only lockForRead here. * However, this function is called in the following sequence * modifyPort() --> updatePacketList() --> frameValue() * where modifyPort has already taken a write lock. Qt allows * recursive locks, but not of a different type, so we are * forced to use lockForWrite here - till we find a different * solution. */ service->portLock[portId]->lockForWrite(); mac = service->portInfo[portId]->deviceMacAddress(streamId, frameIndex); service->portLock[portId]->unlock(); return mac; } quint64 getNeighborMacAddress(int portId, int streamId, int frameIndex) { MyService *service = drone->rpcService(); DeviceManager *devMgr = NULL; quint64 mac; if (!service) return 0; if ((portId >= 0) && (portId < service->portInfo.size())) devMgr = service->portInfo[portId]->deviceManager(); if (!devMgr || !devMgr->deviceCount()) return 0; /* * FIXME: We don't need lockForWrite, only lockForRead here. * See comment in getDeviceMacAddress() for more */ service->portLock[portId]->lockForWrite(); mac = service->portInfo[portId]->neighborMacAddress(streamId, frameIndex); service->portLock[portId]->unlock(); return mac; }