Add new build() RPC

Make build an explicit RPC so that clients can call resolveNeighbor
before build
This commit is contained in:
Srivats P 2019-05-28 18:46:58 +05:30
parent a4bd6212ef
commit 7cf323202f
6 changed files with 138 additions and 19 deletions

View File

@ -502,6 +502,7 @@ void PortGroup::when_configApply(int portIndex)
{ {
OstProto::StreamIdList *streamIdList; OstProto::StreamIdList *streamIdList;
OstProto::StreamConfigList *streamConfigList; OstProto::StreamConfigList *streamConfigList;
OstProto::BuildConfig *buildConfig;
OstProto::Ack *ack; OstProto::Ack *ack;
PbRpcController *controller; PbRpcController *controller;
@ -625,6 +626,17 @@ void PortGroup::when_configApply(int portIndex)
NewCallback(this, &PortGroup::processModifyStreamAck, NewCallback(this, &PortGroup::processModifyStreamAck,
portIndex, controller)); portIndex, controller));
qDebug("finish apply by building ...");
logInfo(id(), mPorts[portIndex]->id(),
QString("Re-building packets"));
buildConfig = new OstProto::BuildConfig;
ack = new OstProto::Ack;
controller = new PbRpcController(buildConfig, ack);
buildConfig->mutable_port_id()->set_id(mPorts[portIndex]->id());
serviceStub->build(controller, buildConfig, ack,
NewCallback(this, &PortGroup::processApplyBuildAck,
portIndex, controller));
} }
void PortGroup::processAddDeviceGroupAck(PbRpcController *controller) void PortGroup::processAddDeviceGroupAck(PbRpcController *controller)
@ -752,6 +764,30 @@ void PortGroup::processModifyStreamAck(int portIndex,
OstProto::Ack *ack = static_cast<OstProto::Ack*>(controller->response()); OstProto::Ack *ack = static_cast<OstProto::Ack*>(controller->response());
if (controller->Failed())
{
qDebug("%s: rpc failed(%s)", __FUNCTION__,
qPrintable(controller->ErrorString()));
logError(id(), mPorts[portIndex]->id(), controller->ErrorString());
goto _error_exit;
}
if (ack->status())
logError(id(), mPorts[portIndex]->id(),
QString::fromStdString(ack->notes()));
_error_exit:
mPorts[portIndex]->when_syncComplete();
delete controller;
}
void PortGroup::processApplyBuildAck(int portIndex, PbRpcController *controller)
{
qDebug("In %s", __FUNCTION__);
OstProto::Ack *ack = static_cast<OstProto::Ack*>(controller->response());
qDebug("apply completed"); qDebug("apply completed");
logInfo(id(), mPorts[portIndex]->id(), QString("All port changes applied")); logInfo(id(), mPorts[portIndex]->id(), QString("All port changes applied"));
@ -768,8 +804,6 @@ void PortGroup::processModifyStreamAck(int portIndex,
QString::fromStdString(ack->notes())); QString::fromStdString(ack->notes()));
_error_exit: _error_exit:
mPorts[portIndex]->when_syncComplete();
emit applyFinished(); emit applyFinished();
mainWindow->setEnabled(true); mainWindow->setEnabled(true);
@ -891,18 +925,47 @@ void PortGroup::modifyPort(int portIndex, OstProto::Port portConfig)
QApplication::setOverrideCursor(QCursor(Qt::WaitCursor)); QApplication::setOverrideCursor(QCursor(Qt::WaitCursor));
mainWindow->setDisabled(true); mainWindow->setDisabled(true);
logInfo(id(), mPorts[portIndex]->id(),
QString("Modifying port configuration"));
OstProto::Port *port = portConfigList->add_port(); OstProto::Port *port = portConfigList->add_port();
port->CopyFrom(portConfig); port->CopyFrom(portConfig);
port->mutable_port_id()->set_id(mPorts[portIndex]->id()); port->mutable_port_id()->set_id(mPorts[portIndex]->id());
PbRpcController *controller = new PbRpcController(portConfigList, ack); PbRpcController *controller = new PbRpcController(portConfigList, ack);
serviceStub->modifyPort(controller, portConfigList, ack, serviceStub->modifyPort(controller, portConfigList, ack,
NewCallback(this, &PortGroup::processModifyPortAck, true, controller)); NewCallback(this, &PortGroup::processModifyPortAck, controller));
logInfo(id(), mPorts[portIndex]->id(),
QString("Re-building packets"));
OstProto::BuildConfig *buildConfig = new OstProto::BuildConfig;
ack = new OstProto::Ack;
controller = new PbRpcController(buildConfig, ack);
buildConfig->mutable_port_id()->set_id(mPorts[portIndex]->id());
serviceStub->build(controller, buildConfig, ack,
NewCallback(this, &PortGroup::processModifyPortBuildAck,
true, controller));
} }
void PortGroup::processModifyPortAck(bool restoreUi,PbRpcController *controller) void PortGroup::processModifyPortAck(PbRpcController *controller)
{ {
qDebug("In %s", __FUNCTION__);
if (controller->Failed())
{
qDebug("%s: rpc failed(%s)", __FUNCTION__,
qPrintable(controller->ErrorString()));
logError(id(), controller->ErrorString());
}
OstProto::Ack *ack = static_cast<OstProto::Ack*>(controller->response());
if (ack->status())
logError(id(), QString::fromStdString(ack->notes()));
delete controller;
}
void PortGroup::processModifyPortBuildAck(bool restoreUi, PbRpcController *controller)
{
qDebug("In %s", __FUNCTION__); qDebug("In %s", __FUNCTION__);
if (controller->Failed()) if (controller->Failed())
@ -1048,7 +1111,7 @@ void PortGroup::processStreamIdList(int portIndex, PbRpcController *controller)
serviceStub->modifyPort(controller, portConfigList, ack, serviceStub->modifyPort(controller, portConfigList, ack,
NewCallback(this, &PortGroup::processModifyPortAck, NewCallback(this, &PortGroup::processModifyPortAck,
false, controller)); controller));
} }
// add/modify deviceGroups // add/modify deviceGroups
@ -1115,6 +1178,15 @@ void PortGroup::processStreamIdList(int portIndex, PbRpcController *controller)
portIndex, controller)); portIndex, controller));
} }
// build packets using the new config
OstProto::BuildConfig *buildConfig = new OstProto::BuildConfig;
OstProto::Ack *ack = new OstProto::Ack;
controller = new PbRpcController(buildConfig, ack);
buildConfig->mutable_port_id()->set_id(mPorts[portIndex]->id());
serviceStub->build(controller, buildConfig, ack,
NewCallback(this, &PortGroup::processModifyPortBuildAck,
false, controller));
// delete newPortConfig // delete newPortConfig
atConnectPortConfig_[portIndex] = NULL; atConnectPortConfig_[portIndex] = NULL;

View File

@ -118,6 +118,7 @@ public:
void processAddStreamAck(PbRpcController *controller); void processAddStreamAck(PbRpcController *controller);
void processDeleteStreamAck(PbRpcController *controller); void processDeleteStreamAck(PbRpcController *controller);
void processModifyStreamAck(int portIndex, PbRpcController *controller); void processModifyStreamAck(int portIndex, PbRpcController *controller);
void processApplyBuildAck(int portIndex, PbRpcController *controller);
void processAddDeviceGroupAck(PbRpcController *controller); void processAddDeviceGroupAck(PbRpcController *controller);
void processDeleteDeviceGroupAck(PbRpcController *controller); void processDeleteDeviceGroupAck(PbRpcController *controller);
@ -127,7 +128,8 @@ public:
void processDeviceNeighbors(int portIndex, PbRpcController *controller); void processDeviceNeighbors(int portIndex, PbRpcController *controller);
void modifyPort(int portId, OstProto::Port portConfig); void modifyPort(int portId, OstProto::Port portConfig);
void processModifyPortAck(bool restoreUi, PbRpcController *controller); void processModifyPortAck(PbRpcController *controller);
void processModifyPortBuildAck(bool restoreUi, PbRpcController *controller);
void processUpdatedPortConfig(PbRpcController *controller); void processUpdatedPortConfig(PbRpcController *controller);
void getStreamIdList(); void getStreamIdList();

View File

@ -304,6 +304,9 @@ message Notification {
optional PortIdList port_id_list = 6; optional PortIdList port_id_list = 6;
} }
message BuildConfig {
required PortId port_id = 1;
}
/* /*
* Protocol Emulation * Protocol Emulation
@ -393,6 +396,8 @@ service OstService {
rpc getStreamStats(StreamGuidList) returns (StreamStatsList); rpc getStreamStats(StreamGuidList) returns (StreamStatsList);
rpc clearStreamStats(StreamGuidList) returns (Ack); rpc clearStreamStats(StreamGuidList) returns (Ack);
rpc build(BuildConfig) returns (Ack);
// XXX: Add new RPCs at the end only to preserve backward compatibility // XXX: Add new RPCs at the end only to preserve backward compatibility
} }

View File

@ -87,6 +87,8 @@ bool AbstractPort::canModify(const OstProto::Port &port, bool *dirty)
*dirty = true; *dirty = true;
allow = !isTransmitOn(); allow = !isTransmitOn();
} }
if (*dirty)
isSendQueueDirty_ = true;
return allow; return allow;
} }

View File

@ -132,7 +132,7 @@ void MyService::modifyPort(::google::protobuf::RpcController* /*controller*/,
for (int i = 0; i < request->port_size(); i++) for (int i = 0; i < request->port_size(); i++)
{ {
OstProto::Port port; OstProto::Port port;
int id, frameError = 0; int id;
port = request->port(i); port = request->port(i);
id = port.port_id().id(); id = port.port_id().id();
@ -150,13 +150,7 @@ void MyService::modifyPort(::google::protobuf::RpcController* /*controller*/,
portLock[id]->lockForWrite(); portLock[id]->lockForWrite();
portInfo[id]->modify(port); portInfo[id]->modify(port);
if (dirty)
frameError = portInfo[id]->updatePacketList();
portLock[id]->unlock(); portLock[id]->unlock();
if (frameError) {
error = true;
notes += frameValueErrorNotes(id, frameError);
}
notif->mutable_port_id_list()->add_port_id()->set_id(id); notif->mutable_port_id_list()->add_port_id()->set_id(id);
} }
else { else {
@ -373,7 +367,6 @@ void MyService::modifyStream(::google::protobuf::RpcController* controller,
bool error = false; bool error = false;
QString notes; QString notes;
int portId; int portId;
int frameError = 0;
qDebug("In %s", __PRETTY_FUNCTION__); qDebug("In %s", __PRETTY_FUNCTION__);
@ -400,13 +393,9 @@ void MyService::modifyStream(::google::protobuf::RpcController* controller,
"stream not found\n").arg(portId).arg(sid); "stream not found\n").arg(portId).arg(sid);
} }
} }
if (portInfo[portId]->isDirty())
frameError = portInfo[portId]->updatePacketList();
portLock[portId]->unlock(); portLock[portId]->unlock();
if (error || frameError) { if (error) {
notes += frameValueErrorNotes(portId, frameError);
response->set_status(OstProto::Ack::kRpcError); response->set_status(OstProto::Ack::kRpcError);
response->set_notes(notes.toStdString()); response->set_notes(notes.toStdString());
} }
@ -812,6 +801,50 @@ _invalid_version:
done->Run(); done->Run();
} }
void MyService::build(::google::protobuf::RpcController* controller,
const ::OstProto::BuildConfig* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done)
{
QString notes;
int portId;
int frameError = 0;
qDebug("In %s", __PRETTY_FUNCTION__);
portId = request->port_id().id();
if ((portId < 0) || (portId >= portInfo.size()))
goto _invalid_port;
if (portInfo[portId]->isTransmitOn())
goto _port_busy;
portLock[portId]->lockForWrite();
if (portInfo[portId]->isDirty())
frameError = portInfo[portId]->updatePacketList();
portLock[portId]->unlock();
if (frameError) {
notes += frameValueErrorNotes(portId, frameError);
response->set_status(OstProto::Ack::kRpcError);
response->set_notes(notes.toStdString());
}
else
response->set_status(OstProto::Ack::kRpcSuccess);
done->Run();
return;
_port_busy:
controller->SetFailed(QString("Port %1 build: operation disallowed " "on transmitting port")
.arg(portId).toStdString());
goto _exit;
_invalid_port:
controller->SetFailed(QString("Port %1 build: invalid port")
.arg(portId).toStdString());
_exit:
done->Run();
}
/* /*
* =================================================================== * ===================================================================
* Device Emulation * Device Emulation

View File

@ -115,6 +115,11 @@ public:
::OstProto::VersionCompatibility* response, ::OstProto::VersionCompatibility* response,
::google::protobuf::Closure* done); ::google::protobuf::Closure* done);
virtual void build(::google::protobuf::RpcController* controller,
const ::OstProto::BuildConfig* request,
::OstProto::Ack* response,
::google::protobuf::Closure* done);
// DeviceGroup and Protocol Emulation // DeviceGroup and Protocol Emulation
virtual void getDeviceGroupIdList( virtual void getDeviceGroupIdList(
::google::protobuf::RpcController* controller, ::google::protobuf::RpcController* controller,