ostinato/client/portgroup.cpp
Srivats P db77563466 UX: Change port name color if local config is changed
Although the applyHint is also changed in this case, the applyHint is
visible only when the port is selected. Having the port name in a
different color is a visual hint to the user that Apply is pending
2017-09-16 12:12:43 +05:30

1599 lines
49 KiB
C++

/*
Copyright (C) 2010 Srivats P.
This file is part of "Ostinato"
This is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
#include "portgroup.h"
#include "settings.h"
#include "emulproto.pb.h"
#include "fileformat.pb.h"
#include <QApplication>
#include <QCursor>
#include <QDesktopServices>
#include <QMainWindow>
#include <QMessageBox>
#include <QProcess>
#include <QRegExp>
#include <QTemporaryFile>
#include <QTimer>
#include <QtGlobal>
#include <QUrl>
using ::google::protobuf::NewCallback;
extern QMainWindow *mainWindow;
extern char *version;
quint32 PortGroup::mPortGroupAllocId = 0;
// Builds a port group for the server at serverName:port: takes the next
// port group id, creates the stats messages/controller, the reconnect
// timer, the RPC channel and service stub, and hooks up the channel's
// signals to this object's slots
PortGroup::PortGroup(QString serverName, quint16 port)
{
    // Take the next id from the class-wide allocator
    mPortGroupId = mPortGroupAllocId++;

    // Stats request/response messages and the controller built on them
    portIdList_ = new OstProto::PortIdList;
    portStatsList_ = new OstProto::PortStatsList;
    statsController = new PbRpcController(portIdList_, portStatsList_);
    isGetStatsPending_ = false;

    // Nothing to apply at connect time and server compatibility not
    // yet known
    atConnectConfig_ = NULL;
    compat = kUnknown;

    // Reconnect handling: single-shot timer, starting at the minimum wait
    reconnect = false;
    reconnectAfter = kMinReconnectWaitTime;
    reconnectTimer = new QTimer(this);
    reconnectTimer->setSingleShot(true);
    connect(reconnectTimer, SIGNAL(timeout()),
            this, SLOT(on_reconnectTimer_timeout()));

    // RPC channel to the server and the service stub riding on it
    rpcChannel = new PbRpcChannel(serverName, port,
            OstProto::Notification::default_instance());
    serviceStub = new OstProto::OstService::Stub(rpcChannel);

    // FIXME(LOW):Can't for my life figure out why this ain't working!
    //QMetaObject::connectSlotsByName(this);

    // Channel signals are therefore connected by hand
    connect(rpcChannel, SIGNAL(stateChanged(QAbstractSocket::SocketState)),
            this, SLOT(on_rpcChannel_stateChanged(QAbstractSocket::SocketState)));
    connect(rpcChannel, SIGNAL(connected()),
            this, SLOT(on_rpcChannel_connected()));
    connect(rpcChannel, SIGNAL(disconnected()),
            this, SLOT(on_rpcChannel_disconnected()));
    connect(rpcChannel, SIGNAL(error(QAbstractSocket::SocketError)),
            this, SLOT(on_rpcChannel_error(QAbstractSocket::SocketError)));
    connect(rpcChannel,
            SIGNAL(notification(int, ::google::protobuf::Message*)),
            this,
            SLOT(on_rpcChannel_notification(int, ::google::protobuf::Message*)));

    // Our own portListChanged is handled via a queued connection
    connect(this, SIGNAL(portListChanged(quint32)),
            this, SLOT(when_portListChanged(quint32)), Qt::QueuedConnection);
}
// Disconnects from the server and frees the service stub, the RPC
// channel, the stats controller and any saved at-connect configuration
PortGroup::~PortGroup()
{
    qDebug("PortGroup Destructor");

    // Disconnect before freeing the stub, channel and controller
    PortGroup::disconnectFromHost();

    delete serviceStub;
    delete rpcChannel;
    delete statsController;
    delete atConnectConfig_;
}
// Stores a private copy of 'config' to be applied once connected;
// passing NULL discards any previously stored configuration
void PortGroup::setConfigAtConnect(const OstProto::PortGroupContent *config)
{
    if (config)
    {
        // Allocate our copy lazily, then overwrite it with the new content
        if (atConnectConfig_ == NULL)
            atConnectConfig_ = new OstProto::PortGroupContent;
        atConnectConfig_->CopyFrom(*config);
        return;
    }

    // No config supplied: drop whatever we were holding
    delete atConnectConfig_;
    atConnectConfig_ = NULL;
}
// Returns how many ports in this group have a non-empty user name
int PortGroup::numReservedPorts() const
{
    int reserved = 0;

    for (int idx = 0, total = mPorts.size(); idx < total; ++idx)
    {
        if (mPorts[idx]->userName().isEmpty())
            continue;
        ++reserved;
    }

    return reserved;
}
// Returns the server name, suffixed with ":<port>" only when the port
// differs from DEFAULT_SERVER_PORT
const QString PortGroup::serverFullName() const
{
    if (serverPort() == DEFAULT_SERVER_PORT)
        return serverName();

    return QString("%1:%2").arg(serverName()).arg(serverPort());
}
// ------------------------------------------------
// Slots
// ------------------------------------------------
// Reconnect timer expiry: double the wait used for the next attempt
// (capped at kMaxReconnectWaitTime) and try connecting again
void PortGroup::on_reconnectTimer_timeout()
{
    reconnectAfter *= 2;
    if (reconnectAfter > kMaxReconnectWaitTime)
    {
        reconnectAfter = kMaxReconnectWaitTime;
    }

    connectToHost();
}
// Socket state change: announces a port group data change for every
// state except Unconnected and Closing
void PortGroup::on_rpcChannel_stateChanged(QAbstractSocket::SocketState state)
{
    qDebug("state changed %d", state);

    const bool silentState =
        (state == QAbstractSocket::UnconnectedState)
        || (state == QAbstractSocket::ClosingState);

    if (!silentState)
        emit portGroupDataChanged(mPortGroupId);
}
void PortGroup::on_rpcChannel_connected()
{
OstProto::VersionInfo *verInfo = new OstProto::VersionInfo;
OstProto::VersionCompatibility *verCompat =
new OstProto::VersionCompatibility;
qDebug("connected\n");
emit portGroupDataChanged(mPortGroupId);
reconnectAfter = kMinReconnectWaitTime;
qDebug("requesting version check ...");
verInfo->set_client_name("ostinato");
verInfo->set_version(version);
PbRpcController *controller = new PbRpcController(verInfo, verCompat);
serviceStub->checkVersion(controller, verInfo, verCompat,
NewCallback(this, &PortGroup::processVersionCompatibility,
controller));
}
// Reply handler for checkVersion(): on RPC failure nothing further is
// done; an incompatible server is recorded and announced; otherwise the
// server is marked compatible and its port id list is requested.
// The controller is deleted in every case.
void PortGroup::processVersionCompatibility(PbRpcController *controller)
{
    OstProto::VersionCompatibility *verCompat
        = static_cast<OstProto::VersionCompatibility*>(controller->response());

    Q_ASSERT(verCompat != NULL);

    qDebug("got version result ...");

    if (controller->Failed())
    {
        qDebug("%s: rpc failed(%s)", __FUNCTION__,
                qPrintable(controller->ErrorString()));
    }
    else if (verCompat->result()
            == OstProto::VersionCompatibility::kIncompatible)
    {
        qWarning("incompatible version %s (%s)", version,
                qPrintable(QString::fromStdString(verCompat->notes())));
        compat = kIncompatible;
        emit portGroupDataChanged(mPortGroupId);
    }
    else
    {
        compat = kCompatible;

        // Next step in the connect sequence: fetch the list of port ids
        OstProto::Void *request = new OstProto::Void;
        OstProto::PortIdList *reply = new OstProto::PortIdList;

        qDebug("requesting portlist ...");

        PbRpcController *listRpc = new PbRpcController(request, reply);
        serviceStub->getPortIdList(listRpc, request, reply,
                NewCallback(this, &PortGroup::processPortIdList, listRpc));
    }

    delete controller;
}
// Connection lost: delete all ports, clear the per-port at-connect
// config list, announce the changes, clear the stats-pending flag and
// start the reconnect timer if reconnection is enabled
void PortGroup::on_rpcChannel_disconnected()
{
    qDebug("disconnected\n");

    emit portListAboutToBeChanged(mPortGroupId);

    // Remove each port from the list before destroying it
    while (!mPorts.isEmpty())
    {
        delete mPorts.takeFirst();
    }
    atConnectPortConfig_.clear();

    emit portListChanged(mPortGroupId);
    emit portGroupDataChanged(mPortGroupId);

    isGetStatsPending_ = false;

    if (!reconnect)
        return;

    qDebug("starting reconnect timer for %d ms ...", reconnectAfter);
    reconnectTimer->start(reconnectAfter);
}
// Socket error: announce the change; a RemoteHostClosedError turns
// reconnection off. If the channel is unconnected and reconnection is
// still enabled, the reconnect timer is started.
void PortGroup::on_rpcChannel_error(QAbstractSocket::SocketError socketError)
{
    qDebug("%s: error %d", __FUNCTION__, socketError);
    emit portGroupDataChanged(mPortGroupId);

    if (socketError == QAbstractSocket::RemoteHostClosedError)
    {
        reconnect = false;
    }

    qDebug("%s: state %d", __FUNCTION__, rpcChannel->state());

    if ((rpcChannel->state() != QAbstractSocket::UnconnectedState)
            || !reconnect)
        return;

    qDebug("starting reconnect timer for %d ms...", reconnectAfter);
    reconnectTimer->start(reconnectAfter);
}
void PortGroup::on_rpcChannel_notification(int notifType,
::google::protobuf::Message *notification)
{
OstProto::Notification *notif =
dynamic_cast<OstProto::Notification*>(notification);
if (!notif) {
qWarning("unable to dynamic cast notif");
return;
}
if (notifType != notif->notif_type()) {
qWarning("notif type mismatch %d/%d msg = %s",
notifType, notif->notif_type(),
notification->DebugString().c_str());
return;
}
switch (notifType)
{
case OstProto::portConfigChanged: {
if (!notif->port_id_list().port_id_size()) {
qWarning("notif(portConfigChanged) has an empty port_id_list");
return;
}
OstProto::PortIdList *portIdList = new OstProto::PortIdList;
OstProto::PortConfigList *portConfigList =
new OstProto::PortConfigList;
PbRpcController *controller = new PbRpcController(portIdList,
portConfigList);
portIdList->CopyFrom(notif->port_id_list());
serviceStub->getPortConfig(controller, portIdList, portConfigList,
NewCallback(this, &PortGroup::processUpdatedPortConfig,
controller));
break;
}
default:
break;
}
}
// Invoked (queued) after the port list changes. If we are connected but
// the server reported no ports, warns the user and offers to open the
// relevant FAQ entry.
//
// The message shows the FAQ URL without its '#...' anchor, while the Help
// button opens the full URL including the anchor.
void PortGroup::when_portListChanged(quint32 /*portGroupId*/)
{
    if (state() != QAbstractSocket::ConnectedState || numPorts() > 0)
        return;

    const QString faq(
            "http://ostinato.org/docs/faq#q-port-group-has-no-interfaces");

    // QString::remove() modifies the string it is called on, so strip the
    // anchor from a copy - 'faq' must keep the anchor for openUrl() below
    const QString faqPage = QString(faq).remove(QRegExp("#.*$"));

    const QString text =
        QString("The portgroup %1:%2 does not contain any ports!\n\n"
            "Packet Transmit/Capture requires elevated privileges. "
            "Please ensure that you are running 'drone' - the server "
            "component of Ostinato with admin/root OR setuid privilege.\n\n"
            "For help see the Ostinato FAQ (%3)")
        .arg(serverName())
        .arg(int(serverPort()))
        .arg(faqPage);

    if (QMessageBox::warning(NULL, tr("No ports in portgroup"), text,
                QMessageBox::Ok | QMessageBox::Help,
                QMessageBox::Ok) == QMessageBox::Help)
    {
        QDesktopServices::openUrl(QUrl(faq));
    }
}
// Reply handler for getPortIdList(): creates a Port object (and a NULL
// at-connect config slot) for every port id returned, saves the id list
// in portIdList_ and then requests the configuration of all those ports.
// The reply to that request is handled by processPortConfigList().
// The incoming controller is deleted before returning.
void PortGroup::processPortIdList(PbRpcController *controller)
{
    OstProto::PortIdList *portIdList
        = static_cast<OstProto::PortIdList*>(controller->response());

    Q_ASSERT(portIdList != NULL);

    qDebug("got a portlist ...");

    if (controller->Failed())
    {
        qDebug("%s: rpc failed(%s)", __FUNCTION__,
                qPrintable(controller->ErrorString()));
        delete controller;
        return;
    }

    emit portListAboutToBeChanged(mPortGroupId);

    for (int i = 0; i < portIdList->port_id_size(); i++)
    {
        Port *p = new Port(portIdList->port_id(i).id(), mPortGroupId);

        // Both per-port signals are forwarded as portGroupDataChanged
        connect(p, SIGNAL(portDataChanged(int, int)),
                this, SIGNAL(portGroupDataChanged(int, int)));
        connect(p, SIGNAL(localConfigChanged(int, int, bool)),
                this, SIGNAL(portGroupDataChanged(int, int)));

        qDebug("before port append\n");
        mPorts.append(p);
        atConnectPortConfig_.append(NULL); // will be filled later
    }

    emit portListChanged(mPortGroupId);

    portIdList_->CopyFrom(*portIdList);

    // Request PortConfigList
    {
        qDebug("requesting port config list ...");

        OstProto::PortIdList *portIdList2 = new OstProto::PortIdList();
        OstProto::PortConfigList *portConfigList =
            new OstProto::PortConfigList();
        PbRpcController *controller2 = new PbRpcController(portIdList2,
                portConfigList);

        portIdList2->CopyFrom(*portIdList);

        // The new RPC must use its own controller (controller2): the
        // incoming 'controller' is deleted below and must not be handed
        // to the channel
        serviceStub->getPortConfig(controller2, portIdList2, portConfigList,
                NewCallback(this, &PortGroup::processPortConfigList,
                    controller2));
    }

    delete controller;
}
// Reply handler for the getPortConfig() issued at connect time: updates
// each port with its received config, kicks off the device group and
// stream id list requests and, if a configuration was saved to be
// applied at connect (atConnectConfig_), marks the ports (matched by
// name) that will be reconfigured with it. Ports reserved by another
// user are skipped with a warning. The controller is deleted before
// returning.
void PortGroup::processPortConfigList(PbRpcController *controller)
{
    OstProto::PortConfigList *portConfigList
        = static_cast<OstProto::PortConfigList*>(controller->response());

    qDebug("In %s", __FUNCTION__);

    if (controller->Failed())
    {
        qDebug("%s: rpc failed(%s)", __FUNCTION__,
                qPrintable(controller->ErrorString()));
        delete controller;
        return;
    }

    //emit portListAboutToBeChanged(mPortGroupId);

    for (int i = 0; i < portConfigList->port_size(); i++)
    {
        uint id = portConfigList->port(i).port_id().id();

        // FIXME: don't mix port id & index into mPorts[]
        // Until that is fixed, the id is used as an index - so make sure
        // a server-supplied id can never index past the end of mPorts
        if (id >= uint(mPorts.size()))
        {
            qWarning("%s: ignoring config for unknown port id %u",
                    __FUNCTION__, id);
            continue;
        }

        mPorts[id]->updatePortConfig(portConfigList->mutable_port(i));
    }

    // FIXME: Ideally we should emit portGroupDataChanged since only
    // port data is changing; but writing the corresponding slot in
    // PortStatsModel for that signal turned out to be very bug prone
    // causing assert failures when portgroups are added/deleted or
    // connected/disconnected in different orders
    // TODO: Revisit this when we refactor the domain-objects/model/view
    // design
    emit portListChanged(mPortGroupId);

    if (numPorts() > 0)
    {
        // XXX: The open session code (atConnectConfig_ related) assumes
        // the following two RPCs are invoked in the below order
        // Any change here without corresponding change in that code
        // will break stuff
        getDeviceGroupIdList();
        getStreamIdList();
    }

    // Now that we have the port details, let's identify which ports
    // need to be re-configured based on atConnectConfig_
    if (atConnectConfig_ && numPorts() > 0)
    {
        QString myself = appSettings->value(kUserKey, kUserDefaultValue)
            .toString();

        for (int i = 0; i < atConnectConfig_->ports_size(); i++)
        {
            const OstProto::PortContent *pc = &atConnectConfig_->ports(i);

            for (int j = 0; j < mPorts.size(); j++)
            {
                Port *port = mPorts[j];

                if (port->name() != pc->port_config().name().c_str())
                    continue;

                if (!port->userName().isEmpty()     // rsvd?
                        && port->userName() != myself) // by someone else?
                {
                    QString warning =
                        QString("%1 - %2: %3 is reserved by %4.\n\n"
                                "Port will not be reconfigured.")
                        .arg(serverFullName())
                        .arg(j)
                        .arg(port->userAlias())
                        .arg(port->userName());
                    QMessageBox::warning(NULL, tr("Open Session"), warning);
                    qWarning("%s", qPrintable(warning));
                    continue;
                }

                // Remember the content to apply; it is consumed later by
                // the device group/stream id list reply handlers
                atConnectPortConfig_[j] = pc;
                qDebug("port %d will be reconfigured", j);
                break;
            }
        }
    }

    delete controller;
}
// Pushes the local (unsynced) device group and stream changes of the port
// at 'portIndex' to the server. Does nothing if not connected.
//
// The main window is disabled and a wait cursor shown for the duration of
// the apply; both are restored in processModifyStreamAck(), the reply
// handler of the last RPC issued here.
//
// RPCs are issued in this order: delete/add/modify device groups (each
// only if there is something to send), a device info refresh if any
// device group RPC was sent, then delete/add/modify streams (always sent).
void PortGroup::when_configApply(int portIndex)
{
OstProto::StreamIdList *streamIdList;
OstProto::StreamConfigList *streamConfigList;
OstProto::Ack *ack;
PbRpcController *controller;
Q_ASSERT(portIndex < mPorts.size());
if (state() != QAbstractSocket::ConnectedState)
return;
// Block user interaction until the final ack arrives
QApplication::setOverrideCursor(QCursor(Qt::WaitCursor));
mainWindow->setDisabled(true);
// FIXME: as currently written this code will make unnecessary RPCs
// even if the request contains no data; the fix will need to take
// care to identify when sync is complete
// NOTE: DeviceGroup RPCs are no longer called unnecessarily;
// Stream RPCs need to be fixed similarly
// Also, drone currently updates its packet list at the end of
// modifyStream() implicitly assuming that will be the last API
// called - this will also need to be fixed
//
// Update/Sync DeviceGroups
//
OstProto::DeviceGroupIdList *deviceGroupIdList;
OstProto::DeviceGroupConfigList *deviceGroupConfigList;
// Set when at least one device group RPC is sent
bool refreshReqd = false;
qDebug("applying 'deleted deviceGroups' ...");
deviceGroupIdList = new OstProto::DeviceGroupIdList;
deviceGroupIdList->mutable_port_id()->set_id(mPorts[portIndex]->id());
mPorts[portIndex]->getDeletedDeviceGroupsSinceLastSync(*deviceGroupIdList);
if (deviceGroupIdList->device_group_id_size()) {
ack = new OstProto::Ack;
controller = new PbRpcController(deviceGroupIdList, ack);
serviceStub->deleteDeviceGroup(controller, deviceGroupIdList, ack,
NewCallback(this, &PortGroup::processDeleteDeviceGroupAck,
controller));
refreshReqd = true;
}
else
// Nothing to send: the request was never handed to a controller
delete deviceGroupIdList;
qDebug("applying 'new deviceGroups' ...");
deviceGroupIdList = new OstProto::DeviceGroupIdList;
deviceGroupIdList->mutable_port_id()->set_id(mPorts[portIndex]->id());
mPorts[portIndex]->getNewDeviceGroupsSinceLastSync(*deviceGroupIdList);
if (deviceGroupIdList->device_group_id_size()) {
ack = new OstProto::Ack;
controller = new PbRpcController(deviceGroupIdList, ack);
serviceStub->addDeviceGroup(controller, deviceGroupIdList, ack,
NewCallback(this, &PortGroup::processAddDeviceGroupAck,
controller));
refreshReqd = true;
}
else
delete deviceGroupIdList;
qDebug("applying 'modified deviceGroups' ...");
deviceGroupConfigList = new OstProto::DeviceGroupConfigList;
deviceGroupConfigList->mutable_port_id()->set_id(mPorts[portIndex]->id());
mPorts[portIndex]->getModifiedDeviceGroupsSinceLastSync(
*deviceGroupConfigList);
if (deviceGroupConfigList->device_group_size()) {
ack = new OstProto::Ack;
controller = new PbRpcController(deviceGroupConfigList, ack);
serviceStub->modifyDeviceGroup(controller, deviceGroupConfigList, ack,
NewCallback(this, &PortGroup::processModifyDeviceGroupAck,
portIndex, controller));
refreshReqd = true;
}
else
delete deviceGroupConfigList;
// Device groups changed on the server: re-fetch devices and neighbors
if (refreshReqd)
getDeviceInfo(portIndex);
//
// Update/Sync Streams
//
qDebug("applying 'deleted streams' ...");
streamIdList = new OstProto::StreamIdList;
ack = new OstProto::Ack;
controller = new PbRpcController(streamIdList, ack);
streamIdList->mutable_port_id()->set_id(mPorts[portIndex]->id());
mPorts[portIndex]->getDeletedStreamsSinceLastSync(*streamIdList);
serviceStub->deleteStream(controller, streamIdList, ack,
NewCallback(this, &PortGroup::processDeleteStreamAck, controller));
qDebug("applying 'new streams' ...");
streamIdList = new OstProto::StreamIdList;
ack = new OstProto::Ack;
controller = new PbRpcController(streamIdList, ack);
streamIdList->mutable_port_id()->set_id(mPorts[portIndex]->id());
mPorts[portIndex]->getNewStreamsSinceLastSync(*streamIdList);
serviceStub->addStream(controller, streamIdList, ack,
NewCallback(this, &PortGroup::processAddStreamAck, controller));
qDebug("applying 'modified streams' ...");
streamConfigList = new OstProto::StreamConfigList;
ack = new OstProto::Ack;
controller = new PbRpcController(streamConfigList, ack);
streamConfigList->mutable_port_id()->set_id(mPorts[portIndex]->id());
mPorts[portIndex]->getModifiedStreamsSinceLastSync(*streamConfigList);
// Last RPC of the apply sequence: its ack handler marks the sync as
// complete and restores the UI
serviceStub->modifyStream(controller, streamConfigList, ack,
NewCallback(this, &PortGroup::processModifyStreamAck,
portIndex, controller));
}
// Ack handler for addDeviceGroup(): logs and releases the controller
void PortGroup::processAddDeviceGroupAck(PbRpcController *controller)
{
    qDebug("In %s", __FUNCTION__);

    delete controller;
}
// Ack handler for deleteDeviceGroup(): logs and releases the controller
void PortGroup::processDeleteDeviceGroupAck(PbRpcController *controller)
{
    qDebug("In %s", __FUNCTION__);

    delete controller;
}
// Ack handler for modifyDeviceGroup(): logs and releases the controller;
// the port index is not used
void PortGroup::processModifyDeviceGroupAck(int /*portIndex*/,
        PbRpcController *controller)
{
    qDebug("In %s", __FUNCTION__);

    delete controller;
}
// Ack handler for addStream(): logs and releases the controller
void PortGroup::processAddStreamAck(PbRpcController *controller)
{
    qDebug("In %s", __FUNCTION__);

    delete controller;
}
// Ack handler for deleteStream(): logs and releases the controller
void PortGroup::processDeleteStreamAck(PbRpcController *controller)
{
    qDebug("In %s", __FUNCTION__);

    delete controller;
}
// Ack handler for modifyStream(): tells the port its sync is complete,
// re-enables the main window, restores the cursor and releases the
// controller
void PortGroup::processModifyStreamAck(int portIndex,
        PbRpcController *controller)
{
    qDebug("In %s", __FUNCTION__);
    qDebug("apply completed");

    mPorts[portIndex]->when_syncComplete();

    // Undo the UI lock
    mainWindow->setEnabled(true);
    QApplication::restoreOverrideCursor();

    delete controller;
}
void PortGroup::getDeviceInfo(int portIndex)
{
OstProto::PortId *portId;
OstProto::PortDeviceList *deviceList;
OstProto::PortNeighborList *neighList;
PbRpcController *controller;
Q_ASSERT(portIndex < mPorts.size());
if (state() != QAbstractSocket::ConnectedState)
return;
portId = new OstProto::PortId;
portId->set_id(mPorts[portIndex]->id());
deviceList = new OstProto::PortDeviceList;
controller = new PbRpcController(portId, deviceList);
serviceStub->getDeviceList(controller, portId, deviceList,
NewCallback(this, &PortGroup::processDeviceList,
portIndex, controller));
portId = new OstProto::PortId;
portId->set_id(mPorts[portIndex]->id());
neighList = new OstProto::PortNeighborList;
controller = new PbRpcController(portId, neighList);
serviceStub->getDeviceNeighbors(controller, portId, neighList,
NewCallback(this, &PortGroup::processDeviceNeighbors,
portIndex, controller));
}
void PortGroup::processDeviceList(int portIndex, PbRpcController *controller)
{
OstProto::PortDeviceList *deviceList
= static_cast<OstProto::PortDeviceList*>(controller->response());
qDebug("In %s (portIndex = %d)", __FUNCTION__, portIndex);
if (controller->Failed())
{
qDebug("%s: rpc failed(%s)", __FUNCTION__,
qPrintable(controller->ErrorString()));
goto _exit;
}
Q_ASSERT(portIndex < numPorts());
if (deviceList->port_id().id() != mPorts[portIndex]->id())
{
qDebug("Invalid portId %d (expected %d) received for portIndex %d",
deviceList->port_id().id(), mPorts[portIndex]->id(), portIndex);
goto _exit;
}
mPorts[portIndex]->clearDeviceList();
for(int i = 0; i < deviceList->ExtensionSize(OstEmul::device); i++) {
mPorts[portIndex]->insertDevice(
deviceList->GetExtension(OstEmul::device, i));
}
_exit:
delete controller;
}
void PortGroup::processDeviceNeighbors(
int portIndex, PbRpcController *controller)
{
OstProto::PortNeighborList *neighList
= static_cast<OstProto::PortNeighborList*>(controller->response());
qDebug("In %s (portIndex = %d)", __FUNCTION__, portIndex);
if (controller->Failed())
{
qDebug("%s: rpc failed(%s)", __FUNCTION__,
qPrintable(controller->ErrorString()));
goto _exit;
}
Q_ASSERT(portIndex < numPorts());
if (neighList->port_id().id() != mPorts[portIndex]->id())
{
qDebug("Invalid portId %d (expected %d) received for portIndex %d",
neighList->port_id().id(), mPorts[portIndex]->id(), portIndex);
goto _exit;
}
mPorts[portIndex]->clearDeviceNeighbors();
for(int i=0; i < neighList->ExtensionSize(OstEmul::device_neighbor); i++) {
mPorts[portIndex]->insertDeviceNeighbors(
neighList->GetExtension(OstEmul::device_neighbor, i));
}
mPorts[portIndex]->deviceInfoRefreshed();
_exit:
delete controller;
}
void PortGroup::modifyPort(int portIndex, OstProto::Port portConfig)
{
OstProto::PortConfigList *portConfigList = new OstProto::PortConfigList;
OstProto::Ack *ack = new OstProto::Ack;
qDebug("%s: portIndex = %d", __FUNCTION__, portIndex);
Q_ASSERT(portIndex < mPorts.size());
QApplication::setOverrideCursor(QCursor(Qt::WaitCursor));
mainWindow->setDisabled(true);
OstProto::Port *port = portConfigList->add_port();
port->CopyFrom(portConfig);
port->mutable_port_id()->set_id(mPorts[portIndex]->id());
PbRpcController *controller = new PbRpcController(portConfigList, ack);
serviceStub->modifyPort(controller, portConfigList, ack,
NewCallback(this, &PortGroup::processModifyPortAck, true, controller));
}
// Ack handler for modifyPort(): logs an RPC failure, re-enables the main
// window and restores the cursor when 'restoreUi' is set, and releases
// the controller
void PortGroup::processModifyPortAck(bool restoreUi,PbRpcController *controller)
{
    qDebug("In %s", __FUNCTION__);

    if (controller->Failed())
    {
        qDebug("%s: rpc failed(%s)", __FUNCTION__,
                qPrintable(controller->ErrorString()));
    }

    if (restoreUi)
    {
        mainWindow->setEnabled(true);
        QApplication::restoreOverrideCursor();
    }

    delete controller;
}
// Reply handler for the getPortConfig() issued in response to a
// portConfigChanged notification: updates every port named in the reply
// and announces the change for each of them. Entries whose port id does
// not correspond to a known port are logged and skipped. The controller
// is deleted before returning.
void PortGroup::processUpdatedPortConfig(PbRpcController *controller)
{
    OstProto::PortConfigList *portConfigList
        = static_cast<OstProto::PortConfigList*>(controller->response());

    qDebug("In %s", __FUNCTION__);

    if (controller->Failed())
    {
        qDebug("%s: rpc failed(%s)", __FUNCTION__,
                qPrintable(controller->ErrorString()));
        delete controller;
        return;
    }

    for (int i = 0; i < portConfigList->port_size(); i++)
    {
        uint id = portConfigList->port(i).port_id().id();

        // FIXME: don't mix port id & index into mPorts[]
        // Until that is fixed, the id is used as an index - this reply
        // may arrive after the port list has been cleared or changed, so
        // never index past the end of mPorts
        if (id >= uint(mPorts.size()))
        {
            qWarning("%s: ignoring config for unknown port id %u",
                    __FUNCTION__, id);
            continue;
        }

        mPorts[id]->updatePortConfig(portConfigList->mutable_port(i));
        emit portGroupDataChanged(mPortGroupId, id);
    }

    delete controller;
}
void PortGroup::getStreamIdList()
{
for (int portIndex = 0; portIndex < numPorts(); portIndex++)
{
OstProto::PortId *portId = new OstProto::PortId;
OstProto::StreamIdList *streamIdList = new OstProto::StreamIdList;
PbRpcController *controller = new PbRpcController(portId, streamIdList);
portId->set_id(mPorts[portIndex]->id());
serviceStub->getStreamIdList(controller, portId, streamIdList,
NewCallback(this, &PortGroup::processStreamIdList,
portIndex, controller));
}
}
// Reply handler for getStreamIdList() for the port at 'portIndex'.
//
// Normal case (no at-connect content recorded for the port): the received
// stream ids are inserted into the port, the port is told its sync is
// complete and the stream configs are requested.
//
// Open-session case (atConnectPortConfig_[portIndex] is set): the port is
// reconfigured from that content via the RPC sequence described in the
// body, the slot is then reset to NULL and the device group id list and
// stream id list are requested afresh - which brings us back here through
// the normal case.
//
// A failed RPC or a reply whose port id does not match is logged and
// ignored. The incoming controller is deleted in every case.
void PortGroup::processStreamIdList(int portIndex, PbRpcController *controller)
{
OstProto::StreamIdList *streamIdList
= static_cast<OstProto::StreamIdList*>(controller->response());
// Non-NULL only if this port was selected for reconfiguration in
// processPortConfigList()
const OstProto::PortContent *newPortContent
= atConnectPortConfig_.at(portIndex);
qDebug("In %s (portIndex = %d)", __FUNCTION__, portIndex);
if (controller->Failed())
{
qDebug("%s: rpc failed(%s)", __FUNCTION__,
qPrintable(controller->ErrorString()));
goto _exit;
}
Q_ASSERT(portIndex < numPorts());
if (streamIdList->port_id().id() != mPorts[portIndex]->id())
{
qDebug("Invalid portId %d (expected %d) received for portIndex %d",
streamIdList->port_id().id(), mPorts[portIndex]->id(), portIndex);
goto _exit;
}
if (newPortContent)
{
// This port needs to be configured with new content - to do this
// we'll insert the following RPC sequence at this point and once
// this sequence is over, return to the regular RPC sequence by
// re-requesting getStreamId()
// * delete (existing) deviceGroups
// (already done by processDeviceGroupIdList)
// * delete (existing) streams
// * modify port
// * add (new) deviceGroup ids
// * modify (new) deviceGroups
// * add (new) stream ids
// * modify (new) streams
// XXX: This assumes getDeviceGroupIdList() was invoked before
// getStreamIdList() - if the order changes this code will break!
// XXX: same name as input param, but shouldn't cause any problem
PbRpcController *controller;
quint32 portId = mPorts[portIndex]->id();
QString myself = appSettings->value(kUserKey, kUserDefaultValue)
.toString();
// delete all existing streams
if (streamIdList->stream_id_size())
{
OstProto::StreamIdList *streamIdList2 = new OstProto::StreamIdList;
streamIdList2->CopyFrom(*streamIdList);
OstProto::Ack *ack = new OstProto::Ack;
controller = new PbRpcController(streamIdList2, ack);
serviceStub->deleteStream(controller, streamIdList2, ack,
NewCallback(this, &PortGroup::processDeleteStreamAck,
controller));
}
// modify port - only if modifiablePortConfig() returns true for the
// saved port config
OstProto::Port portCfg = newPortContent->port_config();
if (mPorts[portIndex]->modifiablePortConfig(portCfg))
{
OstProto::PortConfigList *portConfigList =
new OstProto::PortConfigList;
OstProto::Port *port = portConfigList->add_port();
port->CopyFrom(portCfg);
if (port->has_user_name())
port->set_user_name(qPrintable(myself)); // overwrite
OstProto::Ack *ack = new OstProto::Ack;
controller = new PbRpcController(portConfigList, ack);
// restoreUi = false: the UI was not disabled for this modifyPort
serviceStub->modifyPort(controller, portConfigList, ack,
NewCallback(this, &PortGroup::processModifyPortAck,
false, controller));
}
// add/modify deviceGroups
if (newPortContent->device_groups_size())
{
OstProto::DeviceGroupIdList *deviceGroupIdList
= new OstProto::DeviceGroupIdList;
OstProto::DeviceGroupConfigList *deviceGroupConfigList
= new OstProto::DeviceGroupConfigList;
deviceGroupIdList->mutable_port_id()->set_id(portId);
deviceGroupConfigList->mutable_port_id()->set_id(portId);
for (int i = 0; i < newPortContent->device_groups_size(); i++)
{
const OstProto::DeviceGroup &dg
= newPortContent->device_groups(i);
deviceGroupIdList->add_device_group_id()->set_id(
dg.device_group_id().id());
deviceGroupConfigList->add_device_group()->CopyFrom(dg);
}
OstProto::Ack *ack = new OstProto::Ack;
controller = new PbRpcController(deviceGroupIdList, ack);
serviceStub->addDeviceGroup(controller, deviceGroupIdList, ack,
NewCallback(this, &PortGroup::processAddDeviceGroupAck,
controller));
ack = new OstProto::Ack;
controller = new PbRpcController(deviceGroupConfigList, ack);
serviceStub->modifyDeviceGroup(controller,
deviceGroupConfigList, ack,
NewCallback(this, &PortGroup::processModifyDeviceGroupAck,
portIndex, controller));
}
// add/modify streams
if (newPortContent->streams_size())
{
OstProto::StreamIdList *streamIdList = new OstProto::StreamIdList;
OstProto::StreamConfigList *streamConfigList =
new OstProto::StreamConfigList;
streamIdList->mutable_port_id()->set_id(portId);
streamConfigList->mutable_port_id()->set_id(portId);
for (int i = 0; i < newPortContent->streams_size(); i++)
{
const OstProto::Stream &s = newPortContent->streams(i);
streamIdList->add_stream_id()->set_id(s.stream_id().id());
streamConfigList->add_stream()->CopyFrom(s);
}
OstProto::Ack *ack = new OstProto::Ack;
controller = new PbRpcController(streamIdList, ack);
serviceStub->addStream(controller, streamIdList, ack,
NewCallback(this, &PortGroup::processAddStreamAck,
controller));
ack = new OstProto::Ack;
controller = new PbRpcController(streamConfigList, ack);
serviceStub->modifyStream(controller, streamConfigList, ack,
NewCallback(this, &PortGroup::processModifyStreamAck,
portIndex, controller));
}
// delete newPortConfig
// (only the slot is reset; the pointed-to content is not deleted here)
atConnectPortConfig_[portIndex] = NULL;
// return to normal sequence re-starting from
// getDeviceGroupIdList() and getStreamIdList()
OstProto::PortId *portId2 = new OstProto::PortId;
portId2->set_id(portId);
OstProto::DeviceGroupIdList *devGrpIdList
= new OstProto::DeviceGroupIdList;
controller = new PbRpcController(portId2, devGrpIdList);
serviceStub->getDeviceGroupIdList(controller, portId2, devGrpIdList,
NewCallback(this, &PortGroup::processDeviceGroupIdList,
portIndex, controller));
portId2 = new OstProto::PortId;
portId2->set_id(portId);
OstProto::StreamIdList *streamIdList = new OstProto::StreamIdList;
controller = new PbRpcController(portId2, streamIdList);
serviceStub->getStreamIdList(controller, portId2, streamIdList,
NewCallback(this, &PortGroup::processStreamIdList,
portIndex, controller));
}
else
{
// Normal case: record the stream ids, then fetch their configs
for(int i = 0; i < streamIdList->stream_id_size(); i++)
{
uint streamId;
streamId = streamIdList->stream_id(i).id();
mPorts[portIndex]->insertStream(streamId);
}
mPorts[portIndex]->when_syncComplete();
getStreamConfigList(portIndex);
}
_exit:
// This is the function parameter - the inner 'controller' declared in
// the open-session branch is out of scope here
delete controller;
}
void PortGroup::getStreamConfigList(int portIndex)
{
if (mPorts[portIndex]->numStreams() == 0)
return;
qDebug("requesting stream config list (port %d)...", portIndex);
OstProto::StreamIdList *streamIdList = new OstProto::StreamIdList;
OstProto::StreamConfigList *streamConfigList
= new OstProto::StreamConfigList;
PbRpcController *controller = new PbRpcController(
streamIdList, streamConfigList);
streamIdList->mutable_port_id()->set_id(mPorts[portIndex]->id());
for (int j = 0; j < mPorts[portIndex]->numStreams(); j++)
{
OstProto::StreamId *s = streamIdList->add_stream_id();
s->set_id(mPorts[portIndex]->streamByIndex(j)->id());
}
serviceStub->getStreamConfig(controller, streamIdList, streamConfigList,
NewCallback(this, &PortGroup::processStreamConfigList,
portIndex, controller));
}
void PortGroup::processStreamConfigList(int portIndex,
PbRpcController *controller)
{
OstProto::StreamConfigList *streamConfigList
= static_cast<OstProto::StreamConfigList*>(controller->response());
qDebug("In %s", __PRETTY_FUNCTION__);
Q_ASSERT(portIndex < numPorts());
if (controller->Failed())
{
qDebug("%s: rpc failed(%s)", __FUNCTION__,
qPrintable(controller->ErrorString()));
goto _exit;
}
Q_ASSERT(portIndex < numPorts());
if (streamConfigList->port_id().id() != mPorts[portIndex]->id())
{
qDebug("Invalid portId %d (expected %d) received for portIndex %d",
streamConfigList->port_id().id(), mPorts[portIndex]->id(), portIndex);
goto _exit;
}
for(int i = 0; i < streamConfigList->stream_size(); i++)
{
uint streamId;
streamId = streamConfigList->stream(i).stream_id().id();
mPorts[portIndex]->updateStream(streamId,
streamConfigList->mutable_stream(i));
}
#if 0
// FIXME: incorrect check - will never be true if last port does not have any streams configured
// Are we done for all ports?
if (portIndex >= (numPorts()-1))
{
// FIXME(HI): some way to reset streammodel
}
#endif
_exit:
delete controller;
}
void PortGroup::getDeviceGroupIdList()
{
using OstProto::PortId;
using OstProto::DeviceGroupIdList;
for (int portIndex = 0; portIndex < numPorts(); portIndex++)
{
PortId *portId = new PortId;
DeviceGroupIdList *devGrpIdList = new DeviceGroupIdList;
PbRpcController *controller = new PbRpcController(portId, devGrpIdList);
portId->set_id(mPorts[portIndex]->id());
serviceStub->getDeviceGroupIdList(controller, portId, devGrpIdList,
NewCallback(this, &PortGroup::processDeviceGroupIdList,
portIndex, controller));
}
}
void PortGroup::processDeviceGroupIdList(
int portIndex,
PbRpcController *controller)
{
using OstProto::DeviceGroupIdList;
DeviceGroupIdList *devGrpIdList = static_cast<DeviceGroupIdList*>(
controller->response());
const OstProto::PortContent *newPortContent = atConnectPortConfig_.at(
portIndex);
qDebug("In %s (portIndex = %d)", __FUNCTION__, portIndex);
if (controller->Failed())
{
qDebug("%s: rpc failed(%s)", __FUNCTION__,
qPrintable(controller->ErrorString()));
goto _exit;
}
Q_ASSERT(portIndex < numPorts());
if (devGrpIdList->port_id().id() != mPorts[portIndex]->id())
{
qDebug("Invalid portId %d (expected %d) received for portIndex %d",
devGrpIdList->port_id().id(), mPorts[portIndex]->id(), portIndex);
goto _exit;
}
if (newPortContent)
{
// We delete all existing deviceGroups
// Remaining stuff is done in processStreamIdList() - see notes there
if (devGrpIdList->device_group_id_size())
{
OstProto::DeviceGroupIdList *devGrpIdList2
= new OstProto::DeviceGroupIdList;
devGrpIdList2->CopyFrom(*devGrpIdList);
OstProto::Ack *ack = new OstProto::Ack;
PbRpcController *controller
= new PbRpcController(devGrpIdList2, ack);
serviceStub->deleteDeviceGroup(controller, devGrpIdList2, ack,
NewCallback(this, &PortGroup::processDeleteDeviceGroupAck,
controller));
}
}
else
{
for(int i = 0; i < devGrpIdList->device_group_id_size(); i++)
{
uint devGrpId;
devGrpId = devGrpIdList->device_group_id(i).id();
mPorts[portIndex]->insertDeviceGroup(devGrpId);
}
getDeviceGroupConfigList(portIndex);
}
_exit:
delete controller;
}
void PortGroup::getDeviceGroupConfigList(int portIndex)
{
using OstProto::DeviceGroupId;
using OstProto::DeviceGroupIdList;
using OstProto::DeviceGroupConfigList;
if (mPorts[portIndex]->numDeviceGroups() == 0)
return;
qDebug("requesting device group config list (port %d) ...", portIndex);
DeviceGroupIdList *devGrpIdList = new DeviceGroupIdList;
DeviceGroupConfigList *devGrpCfgList = new DeviceGroupConfigList;
PbRpcController *controller = new PbRpcController(
devGrpIdList, devGrpCfgList);
devGrpIdList->mutable_port_id()->set_id(mPorts[portIndex]->id());
for (int j = 0; j < mPorts[portIndex]->numDeviceGroups(); j++)
{
DeviceGroupId *dgid = devGrpIdList->add_device_group_id();
dgid->set_id(mPorts[portIndex]->deviceGroupByIndex(j)
->device_group_id().id());
}
serviceStub->getDeviceGroupConfig(controller,
devGrpIdList, devGrpCfgList,
NewCallback(this, &PortGroup::processDeviceGroupConfigList,
portIndex, controller));
}
void PortGroup::processDeviceGroupConfigList(int portIndex,
PbRpcController *controller)
{
using OstProto::DeviceGroupConfigList;
DeviceGroupConfigList *devGrpCfgList =
static_cast<OstProto::DeviceGroupConfigList*>(controller->response());
qDebug("In %s", __PRETTY_FUNCTION__);
Q_ASSERT(portIndex < numPorts());
if (controller->Failed())
{
qDebug("%s: rpc failed(%s)", __FUNCTION__,
qPrintable(controller->ErrorString()));
goto _exit;
}
Q_ASSERT(portIndex < numPorts());
if (devGrpCfgList->port_id().id() != mPorts[portIndex]->id())
{
qDebug("Invalid portId %d (expected %d) received for portIndex %d",
devGrpCfgList->port_id().id(), mPorts[portIndex]->id(), portIndex);
goto _exit;
}
for(int i = 0; i < devGrpCfgList->device_group_size(); i++)
{
uint dgid = devGrpCfgList->device_group(i).device_group_id().id();
mPorts[portIndex]->updateDeviceGroup(dgid,
devGrpCfgList->mutable_device_group(i));
}
if (devGrpCfgList->device_group_size())
getDeviceInfo(portIndex);
#if 0
// FIXME: incorrect check - will never be true if last port does not have any deviceGroups configured
// Are we done for all ports?
if (portIndex >= (numPorts()-1))
{
// FIXME: reset deviceGroupModel?
}
#endif
_exit:
delete controller;
}
// Issues a startTransmit() RPC for the port ids listed in portList.
//
// Nothing is sent if the port group is not connected or if portList is NULL
// or empty. The empty-list check matches stopTx(), startCapture() and the
// other port-list operations in this file, and avoids a pointless RPC round
// trip. The reply is handled by processStartTxAck(), which deletes the
// controller.
void PortGroup::startTx(QList<uint> *portList)
{
    qDebug("In %s", __FUNCTION__);

    if (state() != QAbstractSocket::ConnectedState)
        return;

    if ((portList == NULL) || (portList->size() == 0))
        return;

    OstProto::PortIdList *portIdList = new OstProto::PortIdList;
    OstProto::Ack *ack = new OstProto::Ack;
    PbRpcController *controller = new PbRpcController(portIdList, ack);

    for (int i = 0; i < portList->size(); i++)
    {
        OstProto::PortId *portId = portIdList->add_port_id();
        portId->set_id(portList->at(i));
    }

    serviceStub->startTransmit(controller, portIdList, ack,
            NewCallback(this, &PortGroup::processStartTxAck, controller));
}
// Completion callback registered by startTx() for the startTransmit() RPC.
// The ack content is not inspected; the controller is simply released.
void PortGroup::processStartTxAck(PbRpcController *controller)
{
    qDebug("In %s", __FUNCTION__);

    delete controller;
}
// Issues a stopTransmit() RPC for the port ids listed in portList.
// Nothing is sent if the port group is not connected or if portList is NULL
// or empty. The reply is handled by processStopTxAck().
void PortGroup::stopTx(QList<uint> *portList)
{
    qDebug("In %s", __FUNCTION__);

    const bool connected = (state() == QAbstractSocket::ConnectedState);
    if (!connected)
        return;

    if (!portList || portList->isEmpty())
        return;

    OstProto::PortIdList *request = new OstProto::PortIdList;
    OstProto::Ack *response = new OstProto::Ack;
    PbRpcController *rpc = new PbRpcController(request, response);

    const int idCount = portList->size();
    for (int n = 0; n < idCount; ++n)
        request->add_port_id()->set_id(portList->at(n));

    serviceStub->stopTransmit(rpc, request, response,
            NewCallback(this, &PortGroup::processStopTxAck, rpc));
}
// Completion callback registered by stopTx() for the stopTransmit() RPC.
// The ack content is not inspected; the controller is simply released.
void PortGroup::processStopTxAck(PbRpcController *controller)
{
    qDebug("In %s", __FUNCTION__);

    delete controller;
}
// Issues a startCapture() RPC for the port ids listed in portList.
// Nothing is sent if the port group is not connected or if portList is NULL
// or empty. The reply is handled by processStartCaptureAck().
void PortGroup::startCapture(QList<uint> *portList)
{
    qDebug("In %s", __FUNCTION__);

    if (state() != QAbstractSocket::ConnectedState)
        return;

    const bool nothingToDo = (portList == NULL) || (portList->size() == 0);
    if (nothingToDo)
        return;

    OstProto::PortIdList *request = new OstProto::PortIdList;
    OstProto::Ack *response = new OstProto::Ack;
    PbRpcController *rpc = new PbRpcController(request, response);

    for (int n = 0; n < portList->size(); ++n)
    {
        OstProto::PortId *entry = request->add_port_id();
        entry->set_id(portList->at(n));
    }

    serviceStub->startCapture(rpc, request, response,
            NewCallback(this, &PortGroup::processStartCaptureAck, rpc));
}
// Completion callback registered by startCapture() for the startCapture()
// RPC. The ack content is not inspected; the controller is simply released.
void PortGroup::processStartCaptureAck(PbRpcController *controller)
{
    qDebug("In %s", __FUNCTION__);

    delete controller;
}
// Issues a stopCapture() RPC for the port ids listed in portList.
// Nothing is sent if the port group is not connected or if portList is NULL
// or empty. The reply is handled by processStopCaptureAck().
void PortGroup::stopCapture(QList<uint> *portList)
{
    qDebug("In %s", __FUNCTION__);

    if (state() != QAbstractSocket::ConnectedState)
        return;

    if (!portList || portList->isEmpty())
        return;

    OstProto::PortIdList *idsToStop = new OstProto::PortIdList;
    OstProto::Ack *reply = new OstProto::Ack;
    PbRpcController *rpc = new PbRpcController(idsToStop, reply);

    const int idCount = portList->size();
    for (int n = 0; n < idCount; ++n)
        idsToStop->add_port_id()->set_id(portList->at(n));

    serviceStub->stopCapture(rpc, idsToStop, reply,
            NewCallback(this, &PortGroup::processStopCaptureAck, rpc));
}
// Completion callback registered by stopCapture() for the stopCapture() RPC.
// The ack content is not inspected; the controller is simply released.
void PortGroup::processStopCaptureAck(PbRpcController *controller)
{
    qDebug("In %s", __FUNCTION__);

    delete controller;
}
// Fetches the capture buffer of the (single) port listed in portList into
// that port's capture file; processViewCaptureAck() then opens the file in
// the configured viewer.
//
// Nothing is done if the port group is not connected, or if portList is NULL
// or does not contain exactly one port. The request is also skipped if the
// capture file cannot be opened and truncated, since the RPC would otherwise
// write into a file that is not open, or still holds a stale capture.
void PortGroup::viewCapture(QList<uint> *portList)
{
    qDebug("In %s", __FUNCTION__);

    if (state() != QAbstractSocket::ConnectedState)
        return;

    // Only one port's capture can be viewed per request
    if ((portList == NULL) || (portList->size() != 1))
        return;

    for (int i = 0; i < portList->size(); i++)
    {
        // NOTE: the port id is used directly as the index into mPorts[]
        QFile *capFile = mPorts[portList->at(i)]->getCaptureFile();

        // Open the file before allocating the request objects so that a
        // failure leaves nothing to clean up
        if (!capFile->open(QIODevice::ReadWrite|QIODevice::Truncate))
        {
            qDebug("%s: unable to open capture file %s", __FUNCTION__,
                    capFile->fileName().toLatin1().constData());
            continue;
        }

        qDebug("Temp CapFile = %s",
                capFile->fileName().toLatin1().constData());

        OstProto::PortId *portId = new OstProto::PortId;
        OstProto::CaptureBuffer *buf = new OstProto::CaptureBuffer;
        PbRpcController *controller = new PbRpcController(portId, buf);

        portId->set_id(portList->at(i));
        controller->setBinaryBlob(capFile);

        serviceStub->getCaptureBuffer(controller, portId, buf,
                NewCallback(this, &PortGroup::processViewCaptureAck,
                    controller));
    }
}
// Completion callback registered by viewCapture() for the getCaptureBuffer()
// RPC.
//
// Flushes and closes the capture file attached to the controller, then
// launches the viewer configured under kWiresharkPathKey on it. If the viewer
// executable does not exist, a warning dialog is shown instead. The
// controller is deleted in either case.
void PortGroup::processViewCaptureAck(PbRpcController *controller)
{
    QFile *captureFile = static_cast<QFile*>(controller->binaryBlob());
    const QString wiresharkPath = appSettings->value(kWiresharkPathKey,
            kWiresharkPathDefaultValue).toString();

    qDebug("In %s", __FUNCTION__);

    // Make sure everything received is on disk before the viewer reads it
    captureFile->flush();
    captureFile->close();

    if (QFile::exists(wiresharkPath))
    {
        const bool launched = QProcess::startDetached(wiresharkPath,
                QStringList() << captureFile->fileName());
        if (!launched)
            qDebug("Failed starting Wireshark");
    }
    else
    {
        QMessageBox::warning(NULL, "Can't find Wireshark",
                wiresharkPath
                + QString(" does not exist!\n\nPlease correct the path"
                    " to Wireshark in the Preferences."));
    }

    delete controller;
}
// Issues a resolveDeviceNeighbors() RPC for the port ids listed in portList.
// Nothing is sent if the port group is not connected or if portList is NULL
// or empty. The reply is handled by processResolveDeviceNeighborsAck().
void PortGroup::resolveDeviceNeighbors(QList<uint> *portList)
{
    qDebug("In %s", __FUNCTION__);

    if (state() != QAbstractSocket::ConnectedState)
        return;

    const bool haveIds = (portList != NULL) && (portList->size() != 0);
    if (!haveIds)
        return;

    OstProto::PortIdList *request = new OstProto::PortIdList;
    OstProto::Ack *response = new OstProto::Ack;
    PbRpcController *rpc = new PbRpcController(request, response);

    for (int n = 0; n < portList->size(); ++n)
    {
        OstProto::PortId *entry = request->add_port_id();
        entry->set_id(portList->at(n));
    }

    serviceStub->resolveDeviceNeighbors(rpc, request, response,
            NewCallback(this, &PortGroup::processResolveDeviceNeighborsAck,
                rpc));
}
// Completion callback registered by resolveDeviceNeighbors() for the
// resolveDeviceNeighbors() RPC. The ack content is not inspected; the
// controller is simply released.
void PortGroup::processResolveDeviceNeighborsAck(PbRpcController *controller)
{
    qDebug("In %s", __FUNCTION__);

    delete controller;
}
// Issues a clearDeviceNeighbors() RPC for the port ids listed in portList.
// Nothing is sent if the port group is not connected or if portList is NULL
// or empty. The reply is handled by processClearDeviceNeighborsAck().
void PortGroup::clearDeviceNeighbors(QList<uint> *portList)
{
    qDebug("In %s", __FUNCTION__);

    if (state() != QAbstractSocket::ConnectedState)
        return;

    if (!portList || portList->isEmpty())
        return;

    OstProto::PortIdList *idsToClear = new OstProto::PortIdList;
    OstProto::Ack *reply = new OstProto::Ack;
    PbRpcController *rpc = new PbRpcController(idsToClear, reply);

    const int idCount = portList->size();
    for (int n = 0; n < idCount; ++n)
        idsToClear->add_port_id()->set_id(portList->at(n));

    serviceStub->clearDeviceNeighbors(rpc, idsToClear, reply,
            NewCallback(this, &PortGroup::processClearDeviceNeighborsAck,
                rpc));
}
// Completion callback registered by clearDeviceNeighbors() for the
// clearDeviceNeighbors() RPC. The ack content is not inspected; the
// controller is simply released.
void PortGroup::processClearDeviceNeighborsAck(PbRpcController *controller)
{
    qDebug("In %s", __FUNCTION__);

    delete controller;
}
void PortGroup::getPortStats()
{
//qDebug("In %s", __FUNCTION__);
if (state() != QAbstractSocket::ConnectedState)
goto _exit;
if (numPorts() <= 0)
goto _exit;
if (isGetStatsPending_)
goto _exit;
statsController->Reset();
isGetStatsPending_ = true;
serviceStub->getStats(statsController,
static_cast<OstProto::PortIdList*>(statsController->request()),
static_cast<OstProto::PortStatsList*>(statsController->response()),
NewCallback(this, &PortGroup::processPortStatsList));
_exit:
return;
}
void PortGroup::processPortStatsList()
{
//qDebug("In %s", __FUNCTION__);
if (statsController->Failed())
{
qDebug("%s: rpc failed(%s)", __FUNCTION__,
qPrintable(statsController->ErrorString()));
goto _error_exit;
}
for(int i = 0; i < portStatsList_->port_stats_size(); i++)
{
uint id = portStatsList_->port_stats(i).port_id().id();
// FIXME: don't mix port id & index into mPorts[]
mPorts[id]->updateStats(portStatsList_->mutable_port_stats(i));
}
emit statsChanged(mPortGroupId);
_error_exit:
isGetStatsPending_ = false;
}
// Issues a clearStats() RPC.
//
// If portList is NULL the request covers every port id in portIdList_;
// otherwise it covers exactly the ids listed in portList. Nothing is sent if
// the port group is not connected. The reply is handled by
// processClearStatsAck().
void PortGroup::clearPortStats(QList<uint> *portList)
{
    qDebug("In %s", __FUNCTION__);

    if (state() != QAbstractSocket::ConnectedState)
        return;

    OstProto::PortIdList *request = new OstProto::PortIdList;
    OstProto::Ack *response = new OstProto::Ack;
    PbRpcController *rpc = new PbRpcController(request, response);

    if (portList != NULL)
    {
        const int idCount = portList->size();
        for (int n = 0; n < idCount; ++n)
            request->add_port_id()->set_id(portList->at(n));
    }
    else
    {
        // No explicit list: clear stats on all ports of this group
        request->CopyFrom(*portIdList_);
    }

    serviceStub->clearStats(rpc, request, response,
            NewCallback(this, &PortGroup::processClearStatsAck, rpc));
}
// Completion callback registered by clearPortStats() for the clearStats()
// RPC. Requests fresh stats and then releases the controller.
void PortGroup::processClearStatsAck(PbRpcController *controller)
{
    qDebug("In %s", __FUNCTION__);

    // Stats were just cleared/reset, so refresh them right away
    getPortStats();

    delete controller;
}