mqtt: read variable header independently from payload

introduce a function to handle reading the variable header.
Decodes the topic length and reads the topic.
Additionally, in case a packet with QoS is sent, the QoS packet ID
is read.
The actual packet payload is then read separately.
This commit is contained in:
Johann Sebastian Schicho 2024-04-24 17:56:59 +02:00
parent 338f7e9e89
commit c71e3904f4
No known key found for this signature in database

View file

@ -697,6 +697,149 @@ static void mqstate(struct Curl_easy *data,
mqtt->nextstate = nextstate;
}
static CURLcode mqtt_read_pub_varheader(struct Curl_easy *data)
{
CURLcode result = CURLE_OK;
struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
size_t remlen = mq->remaining_length;
unsigned char qos_level = (mq->firstbyte >> 1) & 0x03;
/* nread and nprocessed give indices into the buffer
and may be reset when more data is received.
They do not count bytes across the whole packet. */
ssize_t nread;
ssize_t nprocessed = 0;
char buffer[4*1024];
ssize_t ntopic;
size_t rest = mq->npacket;
if(rest > sizeof(buffer))
rest = sizeof(buffer);
result = Curl_xfer_recv(data, buffer, rest, &nread);
if(result) {
if(CURLE_AGAIN == result) {
infof(data, "E AGAIN");
}
goto end;
}
if(!nread) {
infof(data, "server disconnected");
result = CURLE_PARTIAL_FILE;
goto end;
}
/* read topic */
if(nread < 2) {
failf(data, "packet too short. topic length missing");
result = CURLE_WEIRD_SERVER_REPLY;
goto end;
}
ntopic = (unsigned char)buffer[0] << 8 | (unsigned char)buffer[1];
nprocessed += 2;
if(remlen < (size_t) nprocessed + ntopic) {
failf(data, "stated topic length longer than packet");
result = CURLE_WEIRD_SERVER_REPLY;
goto end;
}
/* write the topic and fetch more if topic exceeds the buffer */
do {
ssize_t nwrite = ntopic;
if(nwrite > nread - nprocessed)
nwrite = nread - nprocessed;
result = Curl_client_write(data, CLIENTWRITE_HEADER,
&buffer[nprocessed], nwrite);
if(result)
goto end;
infof(data, "Topic: %.*s", (int)nwrite, &buffer[nprocessed]);
nprocessed += nwrite;
ntopic -= nwrite;
/* more data needs to be fetched */
if(ntopic) {
mq->npacket -= nread;
nprocessed = 0;
result = Curl_xfer_recv(data, buffer, sizeof(buffer), &nread);
if(result) {
if(CURLE_AGAIN == result) {
infof(data, "E AGAIN");
}
goto end;
}
if(!nread) {
infof(data, "server disconnected");
result = CURLE_PARTIAL_FILE;
goto end;
}
}
} while(ntopic);
/* if QoS is set, message contains packet id */
if(qos_level) {
unsigned char packetid_msb, packetid_lsb;
if(qos_level > 2) {
failf(data, "illegal QoS value of 3");
result = CURLE_WEIRD_SERVER_REPLY;
goto end;
}
if(nread - nprocessed >= 2) {
packetid_msb = buffer[nprocessed];
packetid_lsb = buffer[nprocessed + 1];
nprocessed += 2;
}
else {
bool qos_split = nread - nprocessed == 1;
if(qos_split)
packetid_msb = buffer[nprocessed];
mq->npacket -= nread;
nprocessed = 0;
result = Curl_xfer_recv(data, buffer, sizeof(buffer), &nread);
if(result) {
if(CURLE_AGAIN == result) {
infof(data, "E AGAIN");
}
goto end;
}
if(!nread) {
infof(data, "server disconnected");
result = CURLE_PARTIAL_FILE;
goto end;
}
if(qos_split) {
if(nread < 1) {
failf(data, "packet too short. packet id missing");
result = CURLE_WEIRD_SERVER_REPLY;
goto end;
}
packetid_lsb = buffer[0];
nprocessed = 1;
}
else {
if(nread < 2) {
failf(data, "packet too short. packet id missing");
result = CURLE_WEIRD_SERVER_REPLY;
goto end;
}
packetid_msb = buffer[0];
packetid_lsb = buffer[1];
nprocessed = 2;
}
}
infof(data, "Packet ID: %02x%02x", packetid_msb, packetid_lsb);
}
/* flush remaining payload from the buffer */
result = Curl_client_write(data, CLIENTWRITE_BODY,
&buffer[nprocessed],
nread - nprocessed);
if(result)
goto end;
mq->npacket -= nread;
end:
return result;
}
static CURLcode mqtt_read_publish(struct Curl_easy *data, bool *done)
{
@ -755,9 +898,18 @@ MQTT_SUBACK_COMING:
data->req.bytecount = 0;
data->req.size = remlen;
mq->npacket = remlen; /* get this many bytes */
/* we are at the very start of the packet an need to read the header */
result = mqtt_read_pub_varheader(data);
if(result)
goto end;
if(!mq->npacket) {
/* no more PUBLISH payload, back to subscribe wait state */
mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
break;
}
FALLTHROUGH();
case MQTT_PUB_REMAIN: {
/* read rest of packet, but no more. Cap to buffer size */
/* we are in the middle of a packet */
char buffer[4*1024];
size_t rest = mq->npacket;
if(rest > sizeof(buffer))
@ -765,7 +917,7 @@ MQTT_SUBACK_COMING:
result = Curl_xfer_recv(data, buffer, rest, &nread);
if(result) {
if(CURLE_AGAIN == result) {
infof(data, "EEEE AAAAGAIN");
infof(data, "E AGAIN");
}
goto end;
}
@ -778,11 +930,10 @@ MQTT_SUBACK_COMING:
/* we received something */
mq->lastTime = curlx_now();
/* if QoS is set, message contains packet id */
/* read payload */
result = Curl_client_write(data, CLIENTWRITE_BODY, buffer, nread);
if(result)
goto end;
mq->npacket -= nread;
if(!mq->npacket)
/* no more PUBLISH payload, back to subscribe wait state */