Bugfix: Fixed bugs in RPC Channel read function that causes the client to miss some messages or not receive any at all (see the diff for details); these bugs are day one bugs but adding the Notify RPC increased the chances of seeing them

Updates issue 144
This commit is contained in:
Srivats P. 2015-05-01 13:20:12 +05:30
parent fbaf6edcdf
commit f1378965ca
2 changed files with 60 additions and 49 deletions

View File

@ -9,14 +9,10 @@ public:
PbQtInputStream(QIODevice *dev) PbQtInputStream(QIODevice *dev)
: dev_(dev) {}; : dev_(dev) {};
int Read(void *buffer, int size) { int Read(void *buffer, int size) {
_top:
if (dev_->bytesAvailable()) if (dev_->bytesAvailable())
return dev_->read(static_cast<char*>(buffer), size); return dev_->read(static_cast<char*>(buffer), size);
else else
if (dev_->waitForReadyRead(-1)) return 0;
goto _top;
else
return -1; //return dev_->atEnd() ? 0 : -1;
} }
private: private:

View File

@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "pbrpcchannel.h" #include "pbrpcchannel.h"
#include "pbqtio.h" #include "pbqtio.h"
#include <QtGlobal>
#include <qendian.h> #include <qendian.h>
static uchar msgBuf[4096]; static uchar msgBuf[4096];
@ -165,43 +166,36 @@ void PbRpcChannel::CallMethod(
void PbRpcChannel::on_mpSocket_readyRead() void PbRpcChannel::on_mpSocket_readyRead()
{ {
uchar *msg = (uchar*) &msgBuf; const uchar *msg;
int msgLen; int msgLen;
static bool parsing = false; static bool parsing = false;
static quint16 type, method; static quint16 type, method;
static quint32 len; static quint32 len;
/* _top:
* FIXME(HI): This function has some serious bugs! //qDebug("%s(entry): bytesAvail = %d", __FUNCTION__, mpSocket->bytesAvailable());
* # It should not read both directly from the socket and via instream;
* reading via instream will consume more than the msg being parsed,
* so next time we read from socket - we are in trouble!
* # It should ensure that it has read all the available data before
* it returns, otherwise the readyRead singal will not be raised
* again - so we'll never read another message even if they are all
* sitting in the buffer waiting for us!
*/
//qDebug("%s: bytesAvail = %d", __FUNCTION__, mpSocket->bytesAvailable());
if (!parsing) if (!parsing)
{ {
// Do we have an entire header? If not, we'll wait ... // Do we have an entire header? If not, we'll wait ...
if (mpSocket->bytesAvailable() < PB_HDR_SIZE) if (inStream->Next((const void**)&msg, &msgLen) == false) {
{ qDebug("No more data or stream error");
qDebug("client: not enough data available for a complete header"); goto _exit;
return;
} }
msgLen = mpSocket->read((char*)msg, PB_HDR_SIZE); if (msgLen < PB_HDR_SIZE) {
qDebug("read less than PB_HDR_SIZE bytes; putting back");
Q_ASSERT(msgLen == PB_HDR_SIZE); inStream->BackUp(msgLen);
Q_UNUSED(msgLen); goto _exit;
}
type = qFromBigEndian<quint16>(msg+0); type = qFromBigEndian<quint16>(msg+0);
method = qFromBigEndian<quint16>(msg+2); method = qFromBigEndian<quint16>(msg+2);
len = qFromBigEndian<quint32>(msg+4); len = qFromBigEndian<quint32>(msg+4);
if (msgLen > PB_HDR_SIZE)
inStream->BackUp(msgLen - PB_HDR_SIZE);
//BUFDUMP(msg, PB_HDR_SIZE); //BUFDUMP(msg, PB_HDR_SIZE);
//qDebug("type = %hu, method = %hu, len = %u", type, method, len); //qDebug("type = %hu, method = %hu, len = %u", type, method, len);
@ -214,23 +208,33 @@ void PbRpcChannel::on_mpSocket_readyRead()
{ {
static quint32 cumLen = 0; static quint32 cumLen = 0;
QIODevice *blob; QIODevice *blob;
int l;
blob = static_cast<PbRpcController*>(controller)->binaryBlob(); blob = static_cast<PbRpcController*>(controller)->binaryBlob();
Q_ASSERT(blob != NULL); Q_ASSERT(blob != NULL);
while ((cumLen < len) && mpSocket->bytesAvailable()) while (cumLen < len)
{ {
int l; if (inStream->Next((const void**)&msg, &msgLen) == false) {
//qDebug("No more data or stream error");
goto _exit;
}
l = mpSocket->read((char*)msgBuf, sizeof(msgBuf)); l = qMin(msgLen, int(len - cumLen));
blob->write((char*)msgBuf, l); blob->write((char*)msg, l);
cumLen += l; cumLen += l;
//qDebug("%s: bin blob rcvd %d/%d/%d", __PRETTY_FUNCTION__, l, cumLen, len);
}
if (l < msgLen) {
qDebug("read extra bytes after blob; putting back");
inStream->BackUp(msgLen - l);
} }
qDebug("%s: bin blob rcvd %d/%d", __PRETTY_FUNCTION__, cumLen, len); qDebug("%s: bin blob rcvd %d/%d", __PRETTY_FUNCTION__, cumLen, len);
if (cumLen < len) if (cumLen < len)
return; goto _exit;
cumLen = 0; cumLen = 0;
@ -251,9 +255,6 @@ void PbRpcChannel::on_mpSocket_readyRead()
} }
case PB_MSG_TYPE_RESPONSE: case PB_MSG_TYPE_RESPONSE:
//qDebug("client(%s) rcvd %d bytes", __FUNCTION__, msgLen);
//BUFDUMP(msg, msgLen);
if (!isPending) if (!isPending)
{ {
qWarning("not waiting for response"); qWarning("not waiting for response");
@ -293,20 +294,30 @@ void PbRpcChannel::on_mpSocket_readyRead()
{ {
static quint32 cumLen = 0; static quint32 cumLen = 0;
static QByteArray error; static QByteArray error;
int l;
while ((cumLen < len) && mpSocket->bytesAvailable()) while (cumLen < len)
{ {
int l; if (inStream->Next((const void**)&msg, &msgLen) == false) {
//qDebug("No more data or stream error");
goto _exit;
}
l = mpSocket->read((char*)msgBuf, sizeof(msgBuf)); l = qMin(msgLen, int(len - cumLen));
error.append(QByteArray((char*)msgBuf,l)); error.append(QByteArray((char*)msg, l));
cumLen += l; cumLen += l;
//qDebug("%s: error rcvd %d/%d/%d", __PRETTY_FUNCTION__, l, cumLen, len);
}
if (l < msgLen) {
qDebug("read extra bytes after error; putting back");
inStream->BackUp(msgLen - l);
} }
qDebug("%s: error rcvd %d/%d", __PRETTY_FUNCTION__, cumLen, len); qDebug("%s: error rcvd %d/%d", __PRETTY_FUNCTION__, cumLen, len);
if (cumLen < len) if (cumLen < len)
return; goto _exit;
static_cast<PbRpcController*>(controller)->SetFailed( static_cast<PbRpcController*>(controller)->SetFailed(
QString::fromUtf8(error, len)); QString::fromUtf8(error, len));
@ -332,16 +343,10 @@ void PbRpcChannel::on_mpSocket_readyRead()
case PB_MSG_TYPE_NOTIFY: case PB_MSG_TYPE_NOTIFY:
{ {
//qDebug("client(%s) rcvd %d bytes", __FUNCTION__, msgLen);
//BUFDUMP(msg, msgLen);
#if 1
notif = notifPrototype.New(); notif = notifPrototype.New();
#else
notif = new OstProto::Notification;
#endif
if (!notif) if (!notif)
{ {
qWarning("invalid notif type %d", method); qWarning("failed to alloc notify");
goto _error_exit; goto _error_exit;
} }
@ -396,8 +401,7 @@ void PbRpcChannel::on_mpSocket_readyRead()
call.done); call.done);
} }
_exit: goto _exit;
return;
_error_exit: _error_exit:
inStream->Skip(len); inStream->Skip(len);
@ -405,6 +409,17 @@ _error_exit2:
parsing = false; parsing = false;
qDebug("client(%s) discarding received msg <----", __FUNCTION__); qDebug("client(%s) discarding received msg <----", __FUNCTION__);
qDebug("method = %d\n---->", method); qDebug("method = %d\n---->", method);
_exit:
// If we have some data still available continue reading/parsing
if (inStream->Next((const void**)&msg, &msgLen)) {
if (msgLen >= PB_HDR_SIZE) {
inStream->BackUp(msgLen);
qDebug("===>> MORE DATA PENDING (%d bytes)... CONTINUE", msgLen);
goto _top;
}
}
if (mpSocket->bytesAvailable())
qDebug("%s (exit): bytesAvail = %lld", __FUNCTION__, mpSocket->bytesAvailable());
return; return;
} }