From 8336d77dfaee9686191e158d33be5d612e172f4d Mon Sep 17 00:00:00 2001 From: "Srivats P." Date: Thu, 22 May 2014 20:37:53 +0530 Subject: [PATCH] Added ReadWriteLock to protect RPC service methods in a multi-threaded environment --- server/myservice.cpp | 44 ++++++++++++++++++++++++++++++++++++++++---- server/myservice.h | 17 ++++++++++++++--- 2 files changed, 54 insertions(+), 7 deletions(-) diff --git a/server/myservice.cpp b/server/myservice.cpp index be0984a..5a591aa 100644 --- a/server/myservice.cpp +++ b/server/myservice.cpp @@ -38,12 +38,16 @@ MyService::MyService() PortManager *portManager = PortManager::instance(); int n = portManager->portCount(); - for (int i = 0; i < n; i++) + for (int i = 0; i < n; i++) { portInfo.append(portManager->port(i)); + portLock.append(new QReadWriteLock()); + } } MyService::~MyService() { + while (!portLock.isEmpty()) + delete portLock.takeFirst(); //! \todo Use a singleton destroyer instead // http://www.research.ibm.com/designpatterns/pubs/ph-jun96.txt delete PortManager::instance(); @@ -56,6 +60,9 @@ void MyService::getPortIdList(::google::protobuf::RpcController* /*controller*/, { qDebug("In %s", __PRETTY_FUNCTION__); + // No locks are needed here because the list does not change + // and neither does the port_id + for (int i = 0; i < portInfo.size(); i++) { ::OstProto::PortId *p; @@ -84,7 +91,9 @@ void MyService::getPortConfig(::google::protobuf::RpcController* /*controller*/, OstProto::Port *p; p = response->add_port(); + portLock[id]->lockForRead(); portInfo[id]->protoDataCopyInto(p); + portLock[id]->unlock(); } } @@ -107,7 +116,9 @@ void MyService::modifyPort(::google::protobuf::RpcController* /*controller*/, id = port.port_id().id(); if (id < portInfo.size()) { + portLock[id]->lockForWrite(); portInfo[id]->modify(port); + portLock[id]->unlock(); } } @@ -129,6 +140,7 @@ void MyService::getStreamIdList(::google::protobuf::RpcController* controller, goto _invalid_port; response->mutable_port_id()->set_id(portId); + portLock[portId]->lockForRead(); for (int i = 0; i < portInfo[portId]->streamCount(); i++) { OstProto::StreamId *s; @@ -136,11 +148,13 @@ void MyService::getStreamIdList(::google::protobuf::RpcController* controller, s = response->add_stream_id(); s->set_id(portInfo[portId]->streamAtIndex(i)->id()); } + portLock[portId]->unlock(); + done->Run(); return; _invalid_port: - controller->SetFailed("Invalid Port Id"); + controller->SetFailed("Invalid Port Id"); done->Run(); } @@ -158,9 +172,10 @@ void MyService::getStreamConfig(::google::protobuf::RpcController* controller, goto _invalid_port; response->mutable_port_id()->set_id(portId); + portLock[portId]->lockForRead(); for (int i = 0; i < request->stream_id_size(); i++) { - StreamBase *stream; + StreamBase *stream; OstProto::Stream *s; stream = portInfo[portId]->stream(request->stream_id(i).id()); @@ -170,6 +185,8 @@ void MyService::getStreamConfig(::google::protobuf::RpcController* controller, s = response->add_stream(); stream->protoDataCopyInto(*s); } + portLock[portId]->unlock(); + done->Run(); return; @@ -191,6 +208,7 @@ void MyService::addStream(::google::protobuf::RpcController* controller, if ((portId < 0) || (portId >= portInfo.size())) goto _invalid_port; + portLock[portId]->lockForWrite(); for (int i = 0; i < request->stream_id_size(); i++) { StreamBase *stream; @@ -206,8 +224,8 @@ void MyService::addStream(::google::protobuf::RpcController* controller, stream = new StreamBase; stream->setId(request->stream_id(i).id()); portInfo[portId]->addStream(stream); - } + portLock[portId]->unlock(); //! \todo (LOW): fill-in response "Ack"???? @@ -232,8 +250,10 @@ void MyService::deleteStream(::google::protobuf::RpcController* controller, if ((portId < 0) || (portId >= portInfo.size())) goto _invalid_port; + portLock[portId]->lockForWrite(); for (int i = 0; i < request->stream_id_size(); i++) portInfo[portId]->deleteStream(request->stream_id(i).id()); + portLock[portId]->unlock(); //! \todo (LOW): fill-in response "Ack"???? @@ -258,6 +278,7 @@ void MyService::modifyStream(::google::protobuf::RpcController* controller, if ((portId < 0) || (portId >= portInfo.size())) goto _invalid_port; + portLock[portId]->lockForWrite(); for (int i = 0; i < request->stream_size(); i++) { StreamBase *stream; @@ -272,6 +293,7 @@ void MyService::modifyStream(::google::protobuf::RpcController* controller, if (portInfo[portId]->isDirty()) portInfo[portId]->updatePacketList(); + portLock[portId]->unlock(); //! \todo(LOW): fill-in response "Ack"???? @@ -298,7 +320,9 @@ void MyService::startTx(::google::protobuf::RpcController* /*controller*/, if ((portId < 0) || (portId >= portInfo.size())) continue; //! \todo (LOW): partial RPC? + portLock[portId]->lockForWrite(); portInfo[portId]->startTransmit(); + portLock[portId]->unlock(); } //! \todo (LOW): fill-in response "Ack"???? @@ -321,7 +345,9 @@ void MyService::stopTx(::google::protobuf::RpcController* /*controller*/, if ((portId < 0) || (portId >= portInfo.size())) continue; //! \todo (LOW): partial RPC? + portLock[portId]->lockForWrite(); portInfo[portId]->stopTransmit(); + portLock[portId]->unlock(); } //! \todo (LOW): fill-in response "Ack"???? @@ -344,7 +370,9 @@ void MyService::startCapture(::google::protobuf::RpcController* /*controller*/, if ((portId < 0) || (portId >= portInfo.size())) continue; //! \todo (LOW): partial RPC? + portLock[portId]->lockForWrite(); portInfo[portId]->startCapture(); + portLock[portId]->unlock(); } //! \todo (LOW): fill-in response "Ack"???? @@ -366,7 +394,9 @@ void MyService::stopCapture(::google::protobuf::RpcController* /*controller*/, if ((portId < 0) || (portId >= portInfo.size())) continue; //! \todo (LOW): partial RPC? + portLock[portId]->lockForWrite(); portInfo[portId]->stopCapture(); + portLock[portId]->unlock(); } //! \todo (LOW): fill-in response "Ack"???? @@ -387,9 +417,11 @@ void MyService::getCaptureBuffer(::google::protobuf::RpcController* controller, if ((portId < 0) || (portId >= portInfo.size())) goto _invalid_port; + portLock[portId]->lockForWrite(); portInfo[portId]->stopCapture(); static_cast(controller)->setBinaryBlob( portInfo[portId]->captureData()); + portLock[portId]->unlock(); done->Run(); return; @@ -421,11 +453,13 @@ void MyService::getStats(::google::protobuf::RpcController* /*controller*/, s->mutable_port_id()->set_id(request->port_id(i).id()); st = s->mutable_state(); + portLock[portId]->lockForRead(); st->set_link_state(portInfo[portId]->linkState()); st->set_is_transmit_on(portInfo[portId]->isTransmitOn()); st->set_is_capture_on(portInfo[portId]->isCaptureOn()); portInfo[portId]->stats(&stats); + portLock[portId]->unlock(); #if 0 if (portId == 2) @@ -466,7 +500,9 @@ void MyService::clearStats(::google::protobuf::RpcController* /*controller*/, if ((portId < 0) || (portId >= portInfo.size())) continue; //! \todo (LOW): partial RPC? + portLock[portId]->lockForWrite(); portInfo[portId]->resetStats(); + portLock[portId]->unlock(); } //! \todo (LOW): fill-in response "Ack"???? diff --git a/server/myservice.h b/server/myservice.h index 09cb479..3508dc4 100644 --- a/server/myservice.h +++ b/server/myservice.h @@ -20,10 +20,11 @@ along with this program. If not, see #ifndef _MY_SERVICE_H #define _MY_SERVICE_H -#include - #include "../common/protocol.pb.h" +#include +#include + #define MAX_PKT_HDR_SIZE 1536 #define MAX_STREAM_NAME_SIZE 64 @@ -98,8 +99,18 @@ public: ::google::protobuf::Closure* done); private: - /*! AbstractPort::id() and index into portInfo[] are same! */ + /* + * NOTES: + * - AbstractPort::id() and index into portInfo[] are same! + * - portLock[] size and order should be same as portInfo[] as the + * same index is used for both. + * - we assume that once populated by the constructor, the list(s) + * never change (objects in the list can change, but not the list itself) + * - locking is at port granularity, not at stream granularity - for now + * this seems sufficient. Revisit later, if required + */ QList portInfo; + QList portLock; };