ostinato/server/myservice.cpp
Srivats P. 2581562ec5 Fixes
- Queued RPC calls would cause crashes due to invalid pointers to request/response and/or controller; this has been fixed
    - PbRpcController now takes ownership of request and response messages and
      will delete them when it itself is being deleted
    - This design mandates that request and response messages for each RPC call
      have to be allocated on the heap.
    - The convention for the Closure 'done' call now is to allocate and pass a
      pointer to the controller object to it which will delete it after use;
      this requires that controller itself be also allocated on the heap
      (NOTE: this is just a convention - not mandatory)
    - All existing RPC calls (in portgroup.cpp) have been changed to follow the
      above convention
- Reordering of queued RPC calls has been fixed
- PortManager is now destroyed at exit; because of this fix the per port temporary capture files are auto-removed at exit
- WinPcapPort destructor no longer deletes the monitor threads because the parent class PcapPort already does it
- Capture does not automatically (and incorrectly) stop after one packet if started immediately after a View Capture operation
- User is prompted to stop transmit on a port first if he tries to apply configuration changes on a port in 'transmit' state
2010-02-17 15:26:42 +00:00

452 lines
11 KiB
C++

#include "myservice.h"
#if 0
#include <qglobal.h>
#include <qendian.h>
#include "qdebug.h"
#include "../common/protocollistiterator.h"
#include "../common/abstractprotocol.h"
#endif
#include "../common/streambase.h"
#include "../rpc/pbrpccontroller.h"
#include "portmanager.h"
/*!
 * Populates portInfo with every port exposed by the PortManager singleton,
 * in PortManager index order.
 */
MyService::MyService()
{
    PortManager *manager = PortManager::instance();
    const int total = manager->portCount();

    // The count is sampled once up front; ports are appended in index order
    int idx = 0;
    while (idx < total)
    {
        portInfo.append(manager->port(idx));
        ++idx;
    }
}
/*!
 * Deletes the object returned by PortManager::instance() when the service
 * itself is destroyed.
 */
MyService::~MyService()
{
    //! \todo Use a singleton destroyer instead
    // http://www.research.ibm.com/designpatterns/pubs/ph-jun96.txt
    PortManager *manager = PortManager::instance();

    delete manager;
}
/*!
 * RPC: report the id of every port held in portInfo.
 *
 * \param response receives one port_id entry per element of portInfo
 * \param done     closure run once the response has been filled in
 *
 * The controller and request arguments are not used.
 */
void MyService::getPortIdList(::google::protobuf::RpcController* /*controller*/,
    const ::OstProto::Void* /*request*/,
    ::OstProto::PortIdList* response,
    ::google::protobuf::Closure* done)
{
    qDebug("In %s", __PRETTY_FUNCTION__);

    for (int idx = 0; idx < portInfo.size(); ++idx)
    {
        ::OstProto::PortId *entry = response->add_port_id();

        entry->set_id(portInfo[idx]->id());
    }

    done->Run();
}
/*!
 * RPC: return the configuration of each requested port.
 *
 * \param request  list of port ids whose configuration is wanted
 * \param response receives one Port entry for every requested id that is a
 *                 valid index into portInfo; other ids are silently skipped
 * \param done     closure run once the response has been filled in
 *
 * The controller argument is not used, so an invalid id is not reported as a
 * failure.
 */
void MyService::getPortConfig(::google::protobuf::RpcController* /*controller*/,
    const ::OstProto::PortIdList* request,
    ::OstProto::PortConfigList* response,
    ::google::protobuf::Closure* done)
{
    qDebug("In %s", __PRETTY_FUNCTION__);

    for (int i = 0; i < request->port_id_size(); i++)
    {
        const int id = request->port_id(i).id();

        // id is held as an int and may therefore be negative; reject both
        // ends of the range (as the other handlers in this file do) so that
        // portInfo is never indexed out of bounds.
        if ((id < 0) || (id >= portInfo.size()))
            continue; //! \todo (LOW): partial RPC?

        OstProto::Port *p = response->add_port();
        portInfo[id]->protoDataCopyInto(p);
    }

    done->Run();
}
/*!
 * RPC: apply configuration changes to one or more ports.
 *
 * \param request list of Port messages; each one is passed to modify() on
 *                the port selected by its port_id, provided that id is a
 *                valid index into portInfo. Other entries are silently
 *                skipped.
 * \param done    closure run after all entries have been processed
 *
 * The controller and response arguments are not used.
 */
void MyService::modifyPort(::google::protobuf::RpcController* /*controller*/,
    const ::OstProto::PortConfigList* request,
    ::OstProto::Ack* /*response*/,
    ::google::protobuf::Closure* done)
{
    qDebug("In %s", __PRETTY_FUNCTION__);

    for (int i = 0; i < request->port_size(); i++)
    {
        // Work on a private copy of the request entry (as the original code
        // did), but copy-construct it rather than default-construct + assign.
        OstProto::Port port(request->port(i));
        const int id = port.port_id().id();

        // id is held as an int and may therefore be negative; reject both
        // ends of the range (as the other handlers in this file do) so that
        // portInfo is never indexed out of bounds.
        if ((id < 0) || (id >= portInfo.size()))
            continue; //! \todo (LOW): partial RPC?

        portInfo[id]->modify(port);
    }

    //! \todo (LOW): fill-in response "Ack"????
    done->Run();
}
/*!
 * RPC: list the ids of all streams configured on a port.
 *
 * \param controller marked failed ("Invalid Port Id") if the requested port
 *                   id is not a valid index into portInfo
 * \param request    id of the port to query
 * \param response   receives the port id plus one stream_id entry per stream
 *                   on that port
 * \param done       closure run on both the success and the failure path
 */
void MyService::getStreamIdList(::google::protobuf::RpcController* controller,
    const ::OstProto::PortId* request,
    ::OstProto::StreamIdList* response,
    ::google::protobuf::Closure* done)
{
    qDebug("In %s", __PRETTY_FUNCTION__);

    const int portId = request->id();

    // Guard: bail out early on an out-of-range port id
    if ((portId < 0) || (portId >= portInfo.size()))
    {
        controller->SetFailed("Invalid Port Id");
        done->Run();
        return;
    }

    response->mutable_port_id()->set_id(portId);

    for (int idx = 0; idx < portInfo[portId]->streamCount(); ++idx)
    {
        OstProto::StreamId *entry = response->add_stream_id();

        entry->set_id(portInfo[portId]->streamAtIndex(idx)->id());
    }

    done->Run();
}
/*!
 * RPC: return the configuration of the requested streams of a port.
 *
 * \param controller marked failed ("invalid portid") if the requested port
 *                   id is not a valid index into portInfo
 * \param request    port id plus the list of stream ids wanted
 * \param response   receives the port id and one Stream entry per requested
 *                   stream id that the port knows about; unknown stream ids
 *                   are skipped
 * \param done       closure run on both the success and the failure path
 */
void MyService::getStreamConfig(::google::protobuf::RpcController* controller,
    const ::OstProto::StreamIdList* request,
    ::OstProto::StreamConfigList* response,
    ::google::protobuf::Closure* done)
{
    qDebug("In %s", __PRETTY_FUNCTION__);

    const int portId = request->port_id().id();

    // Guard: bail out early on an out-of-range port id
    if ((portId < 0) || (portId >= portInfo.size()))
    {
        controller->SetFailed("invalid portid");
        done->Run();
        return;
    }

    response->mutable_port_id()->set_id(portId);

    for (int idx = 0; idx < request->stream_id_size(); ++idx)
    {
        StreamBase *found =
            portInfo[portId]->stream(request->stream_id(idx).id());

        //! \todo(LOW): Partial status of RPC
        if (found)
        {
            OstProto::Stream *out = response->add_stream();

            found->protoDataCopyInto(*out);
        }
    }

    done->Run();
}
/*!
 * RPC: create new, default-constructed streams on a port.
 *
 * \param controller marked failed ("invalid portid") if the requested port
 *                   id is not a valid index into portInfo
 * \param request    port id plus the ids to give the new streams; ids for
 *                   which the port already returns a stream are skipped
 * \param done       closure run on both the success and the failure path
 *
 * The response argument is not used.
 */
void MyService::addStream(::google::protobuf::RpcController* controller,
    const ::OstProto::StreamIdList* request,
    ::OstProto::Ack* /*response*/,
    ::google::protobuf::Closure* done)
{
    qDebug("In %s", __PRETTY_FUNCTION__);

    const int portId = request->port_id().id();

    // Guard: bail out early on an out-of-range port id
    if ((portId < 0) || (portId >= portInfo.size()))
    {
        controller->SetFailed("invalid portid");
        done->Run();
        return;
    }

    for (int idx = 0; idx < request->stream_id_size(); ++idx)
    {
        const ::OstProto::StreamId &wanted = request->stream_id(idx);

        // A stream with the requested id already exists ==> error!!
        if (portInfo[portId]->stream(wanted.id()))
            continue; //! \todo (LOW): Partial status of RPC

        // Only the id is set here; the actual contents of the new stream
        // are expected in a subsequent "modifyStream" request.
        StreamBase *fresh = new StreamBase;
        fresh->setId(wanted.id());
        portInfo[portId]->addStream(fresh);
    }

    //! \todo (LOW): fill-in response "Ack"????
    done->Run();
}
/*!
 * RPC: delete streams from a port.
 *
 * \param controller marked failed ("invalid portid") if the requested port
 *                   id is not a valid index into portInfo
 * \param request    port id plus the ids of the streams to delete; each id
 *                   is handed to the port's deleteStream()
 * \param done       closure run on both the success and the failure path
 *
 * The response argument is not used.
 */
void MyService::deleteStream(::google::protobuf::RpcController* controller,
    const ::OstProto::StreamIdList* request,
    ::OstProto::Ack* /*response*/,
    ::google::protobuf::Closure* done)
{
    qDebug("In %s", __PRETTY_FUNCTION__);

    const int portId = request->port_id().id();

    // Guard: bail out early on an out-of-range port id
    if ((portId < 0) || (portId >= portInfo.size()))
    {
        controller->SetFailed("invalid portid");
        done->Run();
        return;
    }

    for (int idx = 0; idx < request->stream_id_size(); ++idx)
    {
        portInfo[portId]->deleteStream(request->stream_id(idx).id());
    }

    //! \todo (LOW): fill-in response "Ack"????
    done->Run();
}
/*!
 * RPC: overwrite the configuration of existing streams on a port.
 *
 * \param controller marked failed ("invalid portid") if the requested port
 *                   id is not a valid index into portInfo
 * \param request    port id plus the new Stream configurations; entries
 *                   whose stream id the port does not know are skipped
 * \param done       closure run on both the success and the failure path
 *
 * Each updated stream marks the port dirty; after all entries are processed
 * the port's packet list is updated if the port reports itself dirty.
 * The response argument is not used.
 */
void MyService::modifyStream(::google::protobuf::RpcController* controller,
    const ::OstProto::StreamConfigList* request,
    ::OstProto::Ack* /*response*/,
    ::google::protobuf::Closure* done)
{
    qDebug("In %s", __PRETTY_FUNCTION__);

    const int portId = request->port_id().id();

    // Guard: bail out early on an out-of-range port id
    if ((portId < 0) || (portId >= portInfo.size()))
    {
        controller->SetFailed("invalid portid");
        done->Run();
        return;
    }

    for (int idx = 0; idx < request->stream_size(); ++idx)
    {
        const ::OstProto::Stream &config = request->stream(idx);
        StreamBase *target = portInfo[portId]->stream(config.stream_id().id());

        if (!target)
            continue;

        target->protoDataCopyFrom(config);
        portInfo[portId]->setDirty();
    }

    if (portInfo[portId]->isDirty())
    {
        portInfo[portId]->updatePacketList();
    }

    //! \todo(LOW): fill-in response "Ack"????
    done->Run();
}
/*!
 * RPC: start transmit on each requested port.
 *
 * \param request list of port ids; ids that are not a valid index into
 *                portInfo are silently ignored
 * \param done    closure run after all ids have been processed
 *
 * The controller and response arguments are not used.
 */
void MyService::startTx(::google::protobuf::RpcController* /*controller*/,
    const ::OstProto::PortIdList* request,
    ::OstProto::Ack* /*response*/,
    ::google::protobuf::Closure* done)
{
    qDebug("In %s", __PRETTY_FUNCTION__);

    for (int idx = 0; idx < request->port_id_size(); ++idx)
    {
        const int portId = request->port_id(idx).id();
        const bool isKnownPort = (portId >= 0) && (portId < portInfo.size());

        //! \todo (LOW): partial RPC?
        if (isKnownPort)
            portInfo[portId]->startTransmit();
    }

    //! \todo (LOW): fill-in response "Ack"????
    done->Run();
}
/*!
 * RPC: stop transmit on each requested port.
 *
 * \param request list of port ids; ids that are not a valid index into
 *                portInfo are silently ignored
 * \param done    closure run after all ids have been processed
 *
 * The controller and response arguments are not used.
 */
void MyService::stopTx(::google::protobuf::RpcController* /*controller*/,
    const ::OstProto::PortIdList* request,
    ::OstProto::Ack* /*response*/,
    ::google::protobuf::Closure* done)
{
    qDebug("In %s", __PRETTY_FUNCTION__);

    for (int idx = 0; idx < request->port_id_size(); ++idx)
    {
        const int portId = request->port_id(idx).id();
        const bool isKnownPort = (portId >= 0) && (portId < portInfo.size());

        //! \todo (LOW): partial RPC?
        if (isKnownPort)
            portInfo[portId]->stopTransmit();
    }

    //! \todo (LOW): fill-in response "Ack"????
    done->Run();
}
/*!
 * RPC: start capture on each requested port.
 *
 * \param request list of port ids; ids that are not a valid index into
 *                portInfo are silently ignored
 * \param done    closure run after all ids have been processed
 *
 * The controller and response arguments are not used.
 */
void MyService::startCapture(::google::protobuf::RpcController* /*controller*/,
    const ::OstProto::PortIdList* request,
    ::OstProto::Ack* /*response*/,
    ::google::protobuf::Closure* done)
{
    qDebug("In %s", __PRETTY_FUNCTION__);

    for (int idx = 0; idx < request->port_id_size(); ++idx)
    {
        const int portId = request->port_id(idx).id();
        const bool isKnownPort = (portId >= 0) && (portId < portInfo.size());

        //! \todo (LOW): partial RPC?
        if (isKnownPort)
            portInfo[portId]->startCapture();
    }

    //! \todo (LOW): fill-in response "Ack"????
    done->Run();
}
/*!
 * RPC: stop capture on each requested port.
 *
 * \param request list of port ids; ids that are not a valid index into
 *                portInfo are silently ignored
 * \param done    closure run after all ids have been processed
 *
 * The controller and response arguments are not used.
 */
void MyService::stopCapture(::google::protobuf::RpcController* /*controller*/,
    const ::OstProto::PortIdList* request,
    ::OstProto::Ack* /*response*/,
    ::google::protobuf::Closure* done)
{
    qDebug("In %s", __PRETTY_FUNCTION__);

    for (int idx = 0; idx < request->port_id_size(); ++idx)
    {
        const int portId = request->port_id(idx).id();
        const bool isKnownPort = (portId >= 0) && (portId < portInfo.size());

        //! \todo (LOW): partial RPC?
        if (isKnownPort)
            portInfo[portId]->stopCapture();
    }

    //! \todo (LOW): fill-in response "Ack"????
    done->Run();
}
/*!
 * RPC: hand back the capture data of a port.
 *
 * \param controller marked failed ("invalid portid") if the requested port
 *                   id is not a valid index into portInfo; otherwise it is
 *                   downcast to PbRpcController and given the port's capture
 *                   data via setBinaryBlob()
 * \param request    id of the port whose capture data is wanted
 * \param done       closure run on both the success and the failure path
 *
 * Capture is stopped on the port before its capture data is fetched.
 * The response argument is not used.
 */
void MyService::getCaptureBuffer(::google::protobuf::RpcController* controller,
    const ::OstProto::PortId* request,
    ::OstProto::CaptureBuffer* /*response*/,
    ::google::protobuf::Closure* done)
{
    qDebug("In %s", __PRETTY_FUNCTION__);

    const int portId = request->id();

    // Guard: bail out early on an out-of-range port id
    if ((portId < 0) || (portId >= portInfo.size()))
    {
        controller->SetFailed("invalid portid");
        done->Run();
        return;
    }

    portInfo[portId]->stopCapture();

    // The static_cast assumes the caller always passes a PbRpcController
    PbRpcController *pbController = static_cast<PbRpcController*>(controller);
    pbController->setBinaryBlob(portInfo[portId]->captureData());

    done->Run();
}
/*!
 * RPC: report state and traffic counters for each requested port.
 *
 * \param request  list of port ids; ids that are not a valid index into
 *                 portInfo are silently ignored
 * \param response receives one port_stats entry per valid id, carrying the
 *                 port id, link/transmit/capture state and the rx/tx
 *                 packet, byte, pps and bps counters
 * \param done     closure run after all ids have been processed
 *
 * The controller argument is not used.
 */
void MyService::getStats(::google::protobuf::RpcController* /*controller*/,
    const ::OstProto::PortIdList* request,
    ::OstProto::PortStatsList* response,
    ::google::protobuf::Closure* done)
{
    //qDebug("In %s", __PRETTY_FUNCTION__);

    for (int idx = 0; idx < request->port_id_size(); ++idx)
    {
        const int portId = request->port_id(idx).id();

        if ((portId < 0) || (portId >= portInfo.size()))
            continue; //! \todo(LOW): partial rpc?

        OstProto::PortStats *entry = response->add_port_stats();
        // portId was range-checked above, so it equals the requested id
        entry->mutable_port_id()->set_id(portId);

        // Port state
        OstProto::PortState *state = entry->mutable_state();
        state->set_link_state(portInfo[portId]->linkState());
        state->set_is_transmit_on(portInfo[portId]->isTransmitOn());
        state->set_is_capture_on(portInfo[portId]->isCaptureOn());

        // Port counters
        AbstractPort::PortStats counters;
        portInfo[portId]->stats(&counters);

#if 0
        if (portId == 2)
            qDebug(">%llu", counters.rxPkts);
#endif

        entry->set_rx_pkts(counters.rxPkts);
        entry->set_rx_bytes(counters.rxBytes);
        entry->set_rx_pps(counters.rxPps);
        entry->set_rx_bps(counters.rxBps);

        entry->set_tx_pkts(counters.txPkts);
        entry->set_tx_bytes(counters.txBytes);
        entry->set_tx_pps(counters.txPps);
        entry->set_tx_bps(counters.txBps);
    }

    done->Run();
}
/*!
 * RPC: reset the statistics of each requested port.
 *
 * \param request list of port ids; ids that are not a valid index into
 *                portInfo are silently ignored
 * \param done    closure run after all ids have been processed
 *
 * The controller and response arguments are not used.
 */
void MyService::clearStats(::google::protobuf::RpcController* /*controller*/,
    const ::OstProto::PortIdList* request,
    ::OstProto::Ack* /*response*/,
    ::google::protobuf::Closure* done)
{
    qDebug("In %s", __PRETTY_FUNCTION__);

    for (int idx = 0; idx < request->port_id_size(); ++idx)
    {
        const int portId = request->port_id(idx).id();
        const bool isKnownPort = (portId >= 0) && (portId < portInfo.size());

        //! \todo (LOW): partial RPC?
        if (isKnownPort)
            portInfo[portId]->resetStats();
    }

    //! \todo (LOW): fill-in response "Ack"????
    done->Run();
}