Added ReadWriteLock to protect RPC service methods in a multi-threaded environment

This commit is contained in:
Srivats P. 2014-05-22 20:37:53 +05:30
parent 8e14e0c15b
commit 8336d77dfa
2 changed files with 54 additions and 7 deletions

View File

@ -38,12 +38,16 @@ MyService::MyService()
PortManager *portManager = PortManager::instance(); PortManager *portManager = PortManager::instance();
int n = portManager->portCount(); int n = portManager->portCount();
for (int i = 0; i < n; i++) for (int i = 0; i < n; i++) {
portInfo.append(portManager->port(i)); portInfo.append(portManager->port(i));
portLock.append(new QReadWriteLock());
}
} }
MyService::~MyService() MyService::~MyService()
{ {
while (!portLock.isEmpty())
delete portLock.takeFirst();
//! \todo Use a singleton destroyer instead //! \todo Use a singleton destroyer instead
// http://www.research.ibm.com/designpatterns/pubs/ph-jun96.txt // http://www.research.ibm.com/designpatterns/pubs/ph-jun96.txt
delete PortManager::instance(); delete PortManager::instance();
@ -56,6 +60,9 @@ void MyService::getPortIdList(::google::protobuf::RpcController* /*controller*/,
{ {
qDebug("In %s", __PRETTY_FUNCTION__); qDebug("In %s", __PRETTY_FUNCTION__);
// No locks are needed here because the list does not change
// and neither does the port_id
for (int i = 0; i < portInfo.size(); i++) for (int i = 0; i < portInfo.size(); i++)
{ {
::OstProto::PortId *p; ::OstProto::PortId *p;
@ -84,7 +91,9 @@ void MyService::getPortConfig(::google::protobuf::RpcController* /*controller*/,
OstProto::Port *p; OstProto::Port *p;
p = response->add_port(); p = response->add_port();
portLock[id]->lockForRead();
portInfo[id]->protoDataCopyInto(p); portInfo[id]->protoDataCopyInto(p);
portLock[id]->unlock();
} }
} }
@ -107,7 +116,9 @@ void MyService::modifyPort(::google::protobuf::RpcController* /*controller*/,
id = port.port_id().id(); id = port.port_id().id();
if (id < portInfo.size()) if (id < portInfo.size())
{ {
portLock[id]->lockForWrite();
portInfo[id]->modify(port); portInfo[id]->modify(port);
portLock[id]->unlock();
} }
} }
@ -129,6 +140,7 @@ void MyService::getStreamIdList(::google::protobuf::RpcController* controller,
goto _invalid_port; goto _invalid_port;
response->mutable_port_id()->set_id(portId); response->mutable_port_id()->set_id(portId);
portLock[portId]->lockForRead();
for (int i = 0; i < portInfo[portId]->streamCount(); i++) for (int i = 0; i < portInfo[portId]->streamCount(); i++)
{ {
OstProto::StreamId *s; OstProto::StreamId *s;
@ -136,11 +148,13 @@ void MyService::getStreamIdList(::google::protobuf::RpcController* controller,
s = response->add_stream_id(); s = response->add_stream_id();
s->set_id(portInfo[portId]->streamAtIndex(i)->id()); s->set_id(portInfo[portId]->streamAtIndex(i)->id());
} }
portLock[portId]->unlock();
done->Run(); done->Run();
return; return;
_invalid_port: _invalid_port:
controller->SetFailed("Invalid Port Id"); controller->SetFailed("Invalid Port Id");
done->Run(); done->Run();
} }
@ -158,9 +172,10 @@ void MyService::getStreamConfig(::google::protobuf::RpcController* controller,
goto _invalid_port; goto _invalid_port;
response->mutable_port_id()->set_id(portId); response->mutable_port_id()->set_id(portId);
portLock[portId]->lockForRead();
for (int i = 0; i < request->stream_id_size(); i++) for (int i = 0; i < request->stream_id_size(); i++)
{ {
StreamBase *stream; StreamBase *stream;
OstProto::Stream *s; OstProto::Stream *s;
stream = portInfo[portId]->stream(request->stream_id(i).id()); stream = portInfo[portId]->stream(request->stream_id(i).id());
@ -170,6 +185,8 @@ void MyService::getStreamConfig(::google::protobuf::RpcController* controller,
s = response->add_stream(); s = response->add_stream();
stream->protoDataCopyInto(*s); stream->protoDataCopyInto(*s);
} }
portLock[portId]->unlock();
done->Run(); done->Run();
return; return;
@ -191,6 +208,7 @@ void MyService::addStream(::google::protobuf::RpcController* controller,
if ((portId < 0) || (portId >= portInfo.size())) if ((portId < 0) || (portId >= portInfo.size()))
goto _invalid_port; goto _invalid_port;
portLock[portId]->lockForWrite();
for (int i = 0; i < request->stream_id_size(); i++) for (int i = 0; i < request->stream_id_size(); i++)
{ {
StreamBase *stream; StreamBase *stream;
@ -206,8 +224,8 @@ void MyService::addStream(::google::protobuf::RpcController* controller,
stream = new StreamBase; stream = new StreamBase;
stream->setId(request->stream_id(i).id()); stream->setId(request->stream_id(i).id());
portInfo[portId]->addStream(stream); portInfo[portId]->addStream(stream);
} }
portLock[portId]->unlock();
//! \todo (LOW): fill-in response "Ack"???? //! \todo (LOW): fill-in response "Ack"????
@ -232,8 +250,10 @@ void MyService::deleteStream(::google::protobuf::RpcController* controller,
if ((portId < 0) || (portId >= portInfo.size())) if ((portId < 0) || (portId >= portInfo.size()))
goto _invalid_port; goto _invalid_port;
portLock[portId]->lockForWrite();
for (int i = 0; i < request->stream_id_size(); i++) for (int i = 0; i < request->stream_id_size(); i++)
portInfo[portId]->deleteStream(request->stream_id(i).id()); portInfo[portId]->deleteStream(request->stream_id(i).id());
portLock[portId]->unlock();
//! \todo (LOW): fill-in response "Ack"???? //! \todo (LOW): fill-in response "Ack"????
@ -258,6 +278,7 @@ void MyService::modifyStream(::google::protobuf::RpcController* controller,
if ((portId < 0) || (portId >= portInfo.size())) if ((portId < 0) || (portId >= portInfo.size()))
goto _invalid_port; goto _invalid_port;
portLock[portId]->lockForWrite();
for (int i = 0; i < request->stream_size(); i++) for (int i = 0; i < request->stream_size(); i++)
{ {
StreamBase *stream; StreamBase *stream;
@ -272,6 +293,7 @@ void MyService::modifyStream(::google::protobuf::RpcController* controller,
if (portInfo[portId]->isDirty()) if (portInfo[portId]->isDirty())
portInfo[portId]->updatePacketList(); portInfo[portId]->updatePacketList();
portLock[portId]->unlock();
//! \todo(LOW): fill-in response "Ack"???? //! \todo(LOW): fill-in response "Ack"????
@ -298,7 +320,9 @@ void MyService::startTx(::google::protobuf::RpcController* /*controller*/,
if ((portId < 0) || (portId >= portInfo.size())) if ((portId < 0) || (portId >= portInfo.size()))
continue; //! \todo (LOW): partial RPC? continue; //! \todo (LOW): partial RPC?
portLock[portId]->lockForWrite();
portInfo[portId]->startTransmit(); portInfo[portId]->startTransmit();
portLock[portId]->unlock();
} }
//! \todo (LOW): fill-in response "Ack"???? //! \todo (LOW): fill-in response "Ack"????
@ -321,7 +345,9 @@ void MyService::stopTx(::google::protobuf::RpcController* /*controller*/,
if ((portId < 0) || (portId >= portInfo.size())) if ((portId < 0) || (portId >= portInfo.size()))
continue; //! \todo (LOW): partial RPC? continue; //! \todo (LOW): partial RPC?
portLock[portId]->lockForWrite();
portInfo[portId]->stopTransmit(); portInfo[portId]->stopTransmit();
portLock[portId]->unlock();
} }
//! \todo (LOW): fill-in response "Ack"???? //! \todo (LOW): fill-in response "Ack"????
@ -344,7 +370,9 @@ void MyService::startCapture(::google::protobuf::RpcController* /*controller*/,
if ((portId < 0) || (portId >= portInfo.size())) if ((portId < 0) || (portId >= portInfo.size()))
continue; //! \todo (LOW): partial RPC? continue; //! \todo (LOW): partial RPC?
portLock[portId]->lockForWrite();
portInfo[portId]->startCapture(); portInfo[portId]->startCapture();
portLock[portId]->unlock();
} }
//! \todo (LOW): fill-in response "Ack"???? //! \todo (LOW): fill-in response "Ack"????
@ -366,7 +394,9 @@ void MyService::stopCapture(::google::protobuf::RpcController* /*controller*/,
if ((portId < 0) || (portId >= portInfo.size())) if ((portId < 0) || (portId >= portInfo.size()))
continue; //! \todo (LOW): partial RPC? continue; //! \todo (LOW): partial RPC?
portLock[portId]->lockForWrite();
portInfo[portId]->stopCapture(); portInfo[portId]->stopCapture();
portLock[portId]->unlock();
} }
//! \todo (LOW): fill-in response "Ack"???? //! \todo (LOW): fill-in response "Ack"????
@ -387,9 +417,11 @@ void MyService::getCaptureBuffer(::google::protobuf::RpcController* controller,
if ((portId < 0) || (portId >= portInfo.size())) if ((portId < 0) || (portId >= portInfo.size()))
goto _invalid_port; goto _invalid_port;
portLock[portId]->lockForWrite();
portInfo[portId]->stopCapture(); portInfo[portId]->stopCapture();
static_cast<PbRpcController*>(controller)->setBinaryBlob( static_cast<PbRpcController*>(controller)->setBinaryBlob(
portInfo[portId]->captureData()); portInfo[portId]->captureData());
portLock[portId]->unlock();
done->Run(); done->Run();
return; return;
@ -421,11 +453,13 @@ void MyService::getStats(::google::protobuf::RpcController* /*controller*/,
s->mutable_port_id()->set_id(request->port_id(i).id()); s->mutable_port_id()->set_id(request->port_id(i).id());
st = s->mutable_state(); st = s->mutable_state();
portLock[portId]->lockForRead();
st->set_link_state(portInfo[portId]->linkState()); st->set_link_state(portInfo[portId]->linkState());
st->set_is_transmit_on(portInfo[portId]->isTransmitOn()); st->set_is_transmit_on(portInfo[portId]->isTransmitOn());
st->set_is_capture_on(portInfo[portId]->isCaptureOn()); st->set_is_capture_on(portInfo[portId]->isCaptureOn());
portInfo[portId]->stats(&stats); portInfo[portId]->stats(&stats);
portLock[portId]->unlock();
#if 0 #if 0
if (portId == 2) if (portId == 2)
@ -466,7 +500,9 @@ void MyService::clearStats(::google::protobuf::RpcController* /*controller*/,
if ((portId < 0) || (portId >= portInfo.size())) if ((portId < 0) || (portId >= portInfo.size()))
continue; //! \todo (LOW): partial RPC? continue; //! \todo (LOW): partial RPC?
portLock[portId]->lockForWrite();
portInfo[portId]->resetStats(); portInfo[portId]->resetStats();
portLock[portId]->unlock();
} }
//! \todo (LOW): fill-in response "Ack"???? //! \todo (LOW): fill-in response "Ack"????

View File

@ -20,10 +20,11 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#ifndef _MY_SERVICE_H #ifndef _MY_SERVICE_H
#define _MY_SERVICE_H #define _MY_SERVICE_H
#include <QList>
#include "../common/protocol.pb.h" #include "../common/protocol.pb.h"
#include <QList>
#include <QReadWriteLock>
#define MAX_PKT_HDR_SIZE 1536 #define MAX_PKT_HDR_SIZE 1536
#define MAX_STREAM_NAME_SIZE 64 #define MAX_STREAM_NAME_SIZE 64
@ -98,8 +99,18 @@ public:
::google::protobuf::Closure* done); ::google::protobuf::Closure* done);
private: private:
/*! AbstractPort::id() and index into portInfo[] are same! */ /*
* NOTES:
* - AbstractPort::id() and index into portInfo[] are same!
* - portLock[] size and order should be same as portInfo[] as the
* same index is used for both.
* - we assume that once populated by the constructor, the list(s)
* never change (objects in the list can change, but not the list itself)
* - locking is at port granularity, not at stream granularity - for now
* this seems sufficient. Revisit later, if required
*/
QList<AbstractPort*> portInfo; QList<AbstractPort*> portInfo;
QList<QReadWriteLock*> portLock;
}; };