From c71e3904f4234f0e93c8e684b07617d738fa18f9 Mon Sep 17 00:00:00 2001 From: Johann Sebastian Schicho Date: Wed, 24 Apr 2024 17:56:59 +0200 Subject: [PATCH] mqtt: read variable header independently from payload introduce a function to handle reading the variable header. Decodes the topic length and reads the topic. Additionally, in case a packet with QoS is sent, the QoS packet ID is read. The actual packet payload is then read separately. --- lib/mqtt.c | 159 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 155 insertions(+), 4 deletions(-) diff --git a/lib/mqtt.c b/lib/mqtt.c index 90f857dca5..d5f967f1a3 100644 --- a/lib/mqtt.c +++ b/lib/mqtt.c @@ -697,6 +697,149 @@ static void mqstate(struct Curl_easy *data, mqtt->nextstate = nextstate; } +static CURLcode mqtt_read_pub_varheader(struct Curl_easy *data) +{ + CURLcode result = CURLE_OK; + struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); + size_t remlen = mq->remaining_length; + unsigned char qos_level = (mq->firstbyte >> 1) & 0x03; + /* nread and nprocessed give indices into the buffer + and may be reset when more data is received. + They do not count bytes across the whole packet. */ + ssize_t nread; + ssize_t nprocessed = 0; + char buffer[4*1024]; + ssize_t ntopic; + size_t rest = mq->npacket; + if(rest > sizeof(buffer)) + rest = sizeof(buffer); + result = Curl_xfer_recv(data, buffer, rest, &nread); + if(result) { + if(CURLE_AGAIN == result) { + infof(data, "E AGAIN"); + } + goto end; + } + if(!nread) { + infof(data, "server disconnected"); + result = CURLE_PARTIAL_FILE; + goto end; + } + + /* read topic */ + if(nread < 2) { + failf(data, "packet too short. topic length missing"); + result = CURLE_WEIRD_SERVER_REPLY; + goto end; + } + ntopic = (unsigned char)buffer[0] << 8 | (unsigned char)buffer[1]; + nprocessed += 2; + if(remlen < (size_t) nprocessed + ntopic) { + failf(data, "stated topic length longer than packet"); + result = CURLE_WEIRD_SERVER_REPLY; + goto end; + } + + /* write the topic and fetch more if topic exceeds the buffer */ + do { + ssize_t nwrite = ntopic; + if(nwrite > nread - nprocessed) + nwrite = nread - nprocessed; + result = Curl_client_write(data, CLIENTWRITE_HEADER, + &buffer[nprocessed], nwrite); + if(result) + goto end; + infof(data, "Topic: %.*s", (int)nwrite, &buffer[nprocessed]); + nprocessed += nwrite; + ntopic -= nwrite; + + /* more data needs to be fetched */ + if(ntopic) { + mq->npacket -= nread; + nprocessed = 0; + result = Curl_xfer_recv(data, buffer, sizeof(buffer), &nread); + if(result) { + if(CURLE_AGAIN == result) { + infof(data, "E AGAIN"); + } + goto end; + } + if(!nread) { + infof(data, "server disconnected"); + result = CURLE_PARTIAL_FILE; + goto end; + } + } + } while(ntopic); + + /* if QoS is set, message contains packet id */ + if(qos_level) { + unsigned char packetid_msb, packetid_lsb; + if(qos_level > 2) { + failf(data, "illegal QoS value of 3"); + result = CURLE_WEIRD_SERVER_REPLY; + goto end; + } + + if(nread - nprocessed >= 2) { + packetid_msb = buffer[nprocessed]; + packetid_lsb = buffer[nprocessed + 1]; + nprocessed += 2; + } + else { + bool qos_split = nread - nprocessed == 1; + if(qos_split) + packetid_msb = buffer[nprocessed]; + + mq->npacket -= nread; + nprocessed = 0; + result = Curl_xfer_recv(data, buffer, sizeof(buffer), &nread); + if(result) { + if(CURLE_AGAIN == result) { + infof(data, "E AGAIN"); + } + goto end; + } + if(!nread) { + infof(data, "server disconnected"); + result = CURLE_PARTIAL_FILE; + goto end; + } + + if(qos_split) { + if(nread < 1) { + failf(data, "packet too short. packet id missing"); + result = CURLE_WEIRD_SERVER_REPLY; + goto end; + } + packetid_lsb = buffer[0]; + nprocessed = 1; + } + else { + if(nread < 2) { + failf(data, "packet too short. packet id missing"); + result = CURLE_WEIRD_SERVER_REPLY; + goto end; + } + packetid_msb = buffer[0]; + packetid_lsb = buffer[1]; + nprocessed = 2; + } + } + infof(data, "Packet ID: %02x%02x", packetid_msb, packetid_lsb); + } + + /* flush remaining payload from the buffer */ + result = Curl_client_write(data, CLIENTWRITE_BODY, + &buffer[nprocessed], + nread - nprocessed); + if(result) + goto end; + mq->npacket -= nread; +end: + return result; +} + static CURLcode mqtt_read_publish(struct Curl_easy *data, bool *done) { @@ -755,9 +898,18 @@ MQTT_SUBACK_COMING: data->req.bytecount = 0; data->req.size = remlen; mq->npacket = remlen; /* get this many bytes */ + /* we are at the very start of the packet an need to read the header */ + result = mqtt_read_pub_varheader(data); + if(result) + goto end; + if(!mq->npacket) { + /* no more PUBLISH payload, back to subscribe wait state */ + mqstate(data, MQTT_FIRST, MQTT_PUBWAIT); + break; + } FALLTHROUGH(); case MQTT_PUB_REMAIN: { - /* read rest of packet, but no more. Cap to buffer size */ + /* we are in the middle of a packet */ char buffer[4*1024]; size_t rest = mq->npacket; if(rest > sizeof(buffer)) @@ -765,7 +917,7 @@ MQTT_SUBACK_COMING: result = Curl_xfer_recv(data, buffer, rest, &nread); if(result) { if(CURLE_AGAIN == result) { - infof(data, "EEEE AAAAGAIN"); + infof(data, "E AGAIN"); } goto end; } @@ -778,11 +930,10 @@ MQTT_SUBACK_COMING: /* we received something */ mq->lastTime = curlx_now(); - /* if QoS is set, message contains packet id */ + /* read payload */ result = Curl_client_write(data, CLIENTWRITE_BODY, buffer, nread); if(result) goto end; - mq->npacket -= nread; if(!mq->npacket) /* no more PUBLISH payload, back to subscribe wait state */