CLI: Added exception handling to python RPC

This commit is contained in:
Srivats P. 2014-06-11 20:39:55 +05:30
parent 02ac4e7dff
commit 30e9496270
2 changed files with 77 additions and 23 deletions

View File

@ -27,6 +27,7 @@ Makefile*
# ostinato generated files # ostinato generated files
version.cpp version.cpp
pkg_info.json
# vim swap files # vim swap files
*.swp *.swp

View File

@ -15,10 +15,12 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/> # along with this program. If not, see <http://www.gnu.org/licenses/>
from google.protobuf.message import EncodeError, DecodeError
from google.protobuf.service import RpcChannel from google.protobuf.service import RpcChannel
from google.protobuf.service import RpcController from google.protobuf.service import RpcController
import socket import socket
import struct import struct
import sys
class OstinatoRpcController(RpcController): class OstinatoRpcController(RpcController):
def __init__(self): def __init__(self):
@ -29,7 +31,13 @@ class OstinatoRpcChannel(RpcChannel):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def connect(self, host, port): def connect(self, host, port):
self.sock.connect((host, port)) self.peer = '%s:%d' % (host, port)
try:
self.sock.connect((host, port))
except socket.error, e:
print 'ERROR: Unable to connect to Drone %s (%s)' % (
self.peer, str(e))
sys.exit(1)
def disconnect(self): def disconnect(self):
self.sock.close() self.sock.close()
@ -40,36 +48,81 @@ class OstinatoRpcChannel(RpcChannel):
OST_PB_MSG_TYPE_RESPONSE = 2 OST_PB_MSG_TYPE_RESPONSE = 2
OST_PB_MSG_TYPE_BLOB = 3 OST_PB_MSG_TYPE_BLOB = 3
req = request.SerializeToString() try:
self.sock.sendall(struct.pack('>HHI', req = request.SerializeToString()
OST_PB_MSG_TYPE_REQUEST, method.index, len(req)) + req) self.sock.sendall(struct.pack('>HHI',
OST_PB_MSG_TYPE_REQUEST, method.index, len(req)) + req)
except EncodeError, e:
print 'ERROR: Failed to serialize %s arg for RPC %s() ' \
'to Drone %s (%s)' % (
type(request).__name__, method.name, self.peer, e)
sys.exit(1)
except socket.error, e:
print 'ERROR: Failed to invoke RPC %s() to Drone %s (%s)' % (
method.name, self.peer, e)
sys.exit(1)
# receive and parse header # receive and parse header
hdr = '' try:
while len(hdr) < OST_PB_MSG_HDR_SIZE: hdr = ''
chunk = self.sock.recv(OST_PB_MSG_HDR_SIZE - len(hdr)) while len(hdr) < OST_PB_MSG_HDR_SIZE:
if chunk == '': chunk = self.sock.recv(OST_PB_MSG_HDR_SIZE - len(hdr))
raise RuntimeError("socket connection broken") if chunk == '':
hdr = hdr + chunk raise RuntimeError("socket connection closed by peer")
hdr = hdr + chunk
except socket.error, e:
print 'ERROR: Failed to receive msg reply for RPC %s() ' \
'from Drone %s (%s)' % (
method.name, self.peer, e)
sys.exit(1)
except RuntimeError, e:
print 'ERROR: Drone %s closed connection receiving msg reply ' \
'for RPC %s() (%s)' % (
self.peer, method.name, e)
sys.exit(1)
(type, method, resp_len) = struct.unpack('>HHI', hdr) (msg_type, method_index, resp_len) = struct.unpack('>HHI', hdr)
# verify response method is same as the one requested
if method_index != method.index:
print 'ERROR: Received Reply for Method %d; expecting reply for ' \
'method %d (%s)' % (
method_index, method.index, method.name)
sys.exit(1)
# receive and parse the actual response message # receive and parse the actual response message
resp = '' try:
while len(resp) < resp_len: resp = ''
chunk = self.sock.recv(resp_len - len(resp)) while len(resp) < resp_len:
if chunk == '': chunk = self.sock.recv(resp_len - len(resp))
raise RuntimeError("socket connection broken") if chunk == '':
resp = resp + chunk raise RuntimeError("socket connection closed by peer")
resp = resp + chunk
except socket.error, e:
print 'ERROR: Failed to receive reply for RPC %s() ' \
'from Drone %s (%s)' % (
method.name, self.peer, e)
sys.exit(1)
except RuntimeError, e:
print 'ERROR: Drone %s closed connection receiving reply ' \
'for RPC %s() (%s)' % (
self.peer, method.name, e)
sys.exit(1)
if type == OST_PB_MSG_TYPE_RESPONSE: if msg_type == OST_PB_MSG_TYPE_RESPONSE:
response = response_class() try:
response.ParseFromString(resp) response = response_class()
elif type == OST_PB_MSG_TYPE_BLOB: response.ParseFromString(resp)
except DecodeError, e:
print 'ERROR: Failed to parse %s response for RPC %s() ' \
'from Drone %s (%s)' % (
type(response).__name__, method.name, self.peer, e)
sys.exit(1)
elif msg_type == OST_PB_MSG_TYPE_BLOB:
response = resp response = resp
else: else:
print 'unsupported msg type %d received in respone to %s' % \ print 'ERROR: unsupported msg type %d received in respone to %s' % \
type, method.name msg_type, method.name
controller.response = response controller.response = response