bufq: change read/write signatures

Change the signature of `bufq` functions from

* `ssize_t Curl_bufq_*(..., CURLcode *err)` to
* `CURLcode Curl_bufq_*(..., size_t *pn)`

This allows us to write slightly less code and avoids the ssize_t/size_t
conversions in many cases. Also, it gets the function in line with all
the other send/recv signatures.

Added helper functions in `cfilters.h` for sending from/receving into
a bufq.

Fuzzer now fails to build due to these changes and its testing of
the bufq API.

Closes #17396
This commit is contained in:
Stefan Eissing 2025-06-26 10:26:35 +02:00 committed by Viktor Szakats
parent 86eb054286
commit d4983ffc13
No known key found for this signature in database
GPG key ID: B5ABD165E2AEF201
15 changed files with 488 additions and 552 deletions

View file

@ -86,27 +86,26 @@ static size_t chunk_read(struct buf_chunk *chunk,
}
}
static ssize_t chunk_slurpn(struct buf_chunk *chunk, size_t max_len,
Curl_bufq_reader *reader,
void *reader_ctx, CURLcode *err)
static CURLcode chunk_slurpn(struct buf_chunk *chunk, size_t max_len,
Curl_bufq_reader *reader,
void *reader_ctx, size_t *pnread)
{
unsigned char *p = &chunk->x.data[chunk->w_offset];
size_t n = chunk->dlen - chunk->w_offset; /* free amount */
ssize_t nread;
CURLcode result;
*pnread = 0;
DEBUGASSERT(chunk->dlen >= chunk->w_offset);
if(!n) {
*err = CURLE_AGAIN;
return -1;
}
if(!n)
return CURLE_AGAIN;
if(max_len && n > max_len)
n = max_len;
nread = reader(reader_ctx, p, n, err);
if(nread > 0) {
DEBUGASSERT((size_t)nread <= n);
chunk->w_offset += nread;
result = reader(reader_ctx, p, n, pnread);
if(!result) {
DEBUGASSERT(*pnread <= n);
chunk->w_offset += *pnread;
}
return nread;
return result;
}
static void chunk_peek(const struct buf_chunk *chunk,
@ -361,81 +360,60 @@ static struct buf_chunk *get_non_full_tail(struct bufq *q)
return chunk;
}
ssize_t Curl_bufq_write(struct bufq *q,
const unsigned char *buf, size_t len,
CURLcode *err)
CURLcode Curl_bufq_write(struct bufq *q,
const unsigned char *buf, size_t len,
size_t *pnwritten)
{
struct buf_chunk *tail;
ssize_t nwritten = 0;
size_t n;
DEBUGASSERT(q->max_chunks > 0);
*pnwritten = 0;
while(len) {
tail = get_non_full_tail(q);
if(!tail) {
if((q->chunk_count < q->max_chunks) || (q->opts & BUFQ_OPT_SOFT_LIMIT)) {
*err = CURLE_OUT_OF_MEMORY;
return -1;
}
if((q->chunk_count < q->max_chunks) || (q->opts & BUFQ_OPT_SOFT_LIMIT))
/* should have gotten a tail, but did not */
return CURLE_OUT_OF_MEMORY;
break;
}
n = chunk_append(tail, buf, len);
if(!n)
break;
nwritten += n;
*pnwritten += n;
buf += n;
len -= n;
}
if(nwritten == 0 && len) {
*err = CURLE_AGAIN;
return -1;
}
*err = CURLE_OK;
return nwritten;
return (!*pnwritten && len) ? CURLE_AGAIN : CURLE_OK;
}
CURLcode Curl_bufq_cwrite(struct bufq *q,
const char *buf, size_t len,
size_t *pnwritten)
{
ssize_t n;
CURLcode result;
n = Curl_bufq_write(q, (const unsigned char *)buf, len, &result);
*pnwritten = (n < 0) ? 0 : (size_t)n;
return result;
return Curl_bufq_write(q, (const unsigned char *)buf, len, pnwritten);
}
ssize_t Curl_bufq_read(struct bufq *q, unsigned char *buf, size_t len,
CURLcode *err)
CURLcode Curl_bufq_read(struct bufq *q, unsigned char *buf, size_t len,
size_t *pnread)
{
ssize_t nread = 0;
size_t n;
*err = CURLE_OK;
*pnread = 0;
while(len && q->head) {
n = chunk_read(q->head, buf, len);
size_t n = chunk_read(q->head, buf, len);
if(n) {
nread += n;
*pnread += n;
buf += n;
len -= n;
}
prune_head(q);
}
if(nread == 0) {
*err = CURLE_AGAIN;
return -1;
}
return nread;
return (!*pnread) ? CURLE_AGAIN : CURLE_OK;
}
CURLcode Curl_bufq_cread(struct bufq *q, char *buf, size_t len,
size_t *pnread)
{
ssize_t n;
CURLcode result;
n = Curl_bufq_read(q, (unsigned char *)buf, len, &result);
*pnread = (n < 0) ? 0 : (size_t)n;
return result;
return Curl_bufq_read(q, (unsigned char *)buf, len, pnread);
}
bool Curl_bufq_peek(struct bufq *q,
@ -487,156 +465,139 @@ void Curl_bufq_skip(struct bufq *q, size_t amount)
}
}
ssize_t Curl_bufq_pass(struct bufq *q, Curl_bufq_writer *writer,
void *writer_ctx, CURLcode *err)
CURLcode Curl_bufq_pass(struct bufq *q, Curl_bufq_writer *writer,
void *writer_ctx, size_t *pwritten)
{
const unsigned char *buf;
size_t blen;
ssize_t nwritten = 0;
CURLcode result = CURLE_OK;
*pwritten = 0;
while(Curl_bufq_peek(q, &buf, &blen)) {
ssize_t chunk_written;
size_t chunk_written;
chunk_written = writer(writer_ctx, buf, blen, err);
if(chunk_written < 0) {
if(!nwritten || *err != CURLE_AGAIN) {
/* blocked on first write or real error, fail */
nwritten = -1;
result = writer(writer_ctx, buf, blen, &chunk_written);
if(result) {
if((result == CURLE_AGAIN) && *pwritten) {
/* blocked on subsequent write, report success */
result = CURLE_OK;
}
break;
}
if(!chunk_written) {
if(!nwritten) {
if(!*pwritten) {
/* treat as blocked */
*err = CURLE_AGAIN;
nwritten = -1;
result = CURLE_AGAIN;
}
break;
}
Curl_bufq_skip(q, (size_t)chunk_written);
nwritten += chunk_written;
*pwritten += chunk_written;
Curl_bufq_skip(q, chunk_written);
}
return nwritten;
return result;
}
ssize_t Curl_bufq_write_pass(struct bufq *q,
const unsigned char *buf, size_t len,
Curl_bufq_writer *writer, void *writer_ctx,
CURLcode *err)
CURLcode Curl_bufq_write_pass(struct bufq *q,
const unsigned char *buf, size_t len,
Curl_bufq_writer *writer, void *writer_ctx,
size_t *pwritten)
{
ssize_t nwritten = 0, n;
CURLcode result = CURLE_OK;
size_t n;
*err = CURLE_OK;
*pwritten = 0;
while(len) {
if(Curl_bufq_is_full(q)) {
/* try to make room in case we are full */
n = Curl_bufq_pass(q, writer, writer_ctx, err);
if(n < 0) {
if(*err != CURLE_AGAIN) {
result = Curl_bufq_pass(q, writer, writer_ctx, &n);
if(result) {
if(result != CURLE_AGAIN) {
/* real error, fail */
return -1;
return result;
}
/* would block, bufq is full, give up */
break;
}
}
/* Add whatever is remaining now to bufq */
n = Curl_bufq_write(q, buf, len, err);
if(n < 0) {
if(*err != CURLE_AGAIN) {
/* Add to bufq as much as there is room for */
result = Curl_bufq_write(q, buf, len, &n);
if(result) {
if(result != CURLE_AGAIN)
/* real error, fail */
return -1;
}
/* no room in bufq */
break;
return result;
if((result == CURLE_AGAIN) && *pwritten)
/* we did write successfully before */
result = CURLE_OK;
return result;
}
/* edge case of writer returning 0 (and len is >0)
* break or we might enter an infinite loop here */
if(n == 0)
else if(n == 0)
/* edge case of writer returning 0 (and len is >0)
* break or we might enter an infinite loop here */
break;
/* Maybe only part of `data` has been added, continue to loop */
buf += (size_t)n;
len -= (size_t)n;
nwritten += (size_t)n;
/* Track what we added to bufq */
buf += n;
len -= n;
*pwritten += n;
}
if(!nwritten && len) {
*err = CURLE_AGAIN;
return -1;
}
*err = CURLE_OK;
return nwritten;
return (!*pwritten && len) ? CURLE_AGAIN : CURLE_OK;
}
ssize_t Curl_bufq_sipn(struct bufq *q, size_t max_len,
Curl_bufq_reader *reader, void *reader_ctx,
CURLcode *err)
CURLcode Curl_bufq_sipn(struct bufq *q, size_t max_len,
Curl_bufq_reader *reader, void *reader_ctx,
size_t *pnread)
{
struct buf_chunk *tail = NULL;
ssize_t nread;
*err = CURLE_AGAIN;
*pnread = 0;
tail = get_non_full_tail(q);
if(!tail) {
if(q->chunk_count < q->max_chunks) {
*err = CURLE_OUT_OF_MEMORY;
return -1;
}
if(q->chunk_count < q->max_chunks)
return CURLE_OUT_OF_MEMORY;
/* full, blocked */
*err = CURLE_AGAIN;
return -1;
return CURLE_AGAIN;
}
nread = chunk_slurpn(tail, max_len, reader, reader_ctx, err);
if(nread < 0) {
return -1;
}
else if(nread == 0) {
/* eof */
*err = CURLE_OK;
}
return nread;
return chunk_slurpn(tail, max_len, reader, reader_ctx, pnread);
}
/**
* Read up to `max_len` bytes and append it to the end of the buffer queue.
* if `max_len` is 0, no limit is imposed and the call behaves exactly
* the same as `Curl_bufq_slurp()`.
* Returns the total amount of buf read (may be 0) or -1 on other
* reader errors.
* Note that even in case of a -1 chunks may have been read and
* Returns the total amount of buf read (may be 0) in `pnread` or error
* Note that even in case of an error chunks may have been read and
* the buffer queue will have different length than before.
*/
static ssize_t bufq_slurpn(struct bufq *q, size_t max_len,
Curl_bufq_reader *reader, void *reader_ctx,
CURLcode *err)
static CURLcode bufq_slurpn(struct bufq *q, size_t max_len,
Curl_bufq_reader *reader, void *reader_ctx,
size_t *pnread)
{
ssize_t nread = 0, n;
CURLcode result;
*err = CURLE_AGAIN;
*pnread = 0;
while(1) {
n = Curl_bufq_sipn(q, max_len, reader, reader_ctx, err);
if(n < 0) {
if(!nread || *err != CURLE_AGAIN) {
size_t n;
result = Curl_bufq_sipn(q, max_len, reader, reader_ctx, &n);
if(result) {
if(!*pnread || result != CURLE_AGAIN) {
/* blocked on first read or real error, fail */
nread = -1;
return result;
}
else
*err = CURLE_OK;
result = CURLE_OK;
break;
}
else if(n == 0) {
/* eof */
*err = CURLE_OK;
result = CURLE_OK;
break;
}
nread += (size_t)n;
*pnread += n;
if(max_len) {
DEBUGASSERT((size_t)n <= max_len);
max_len -= (size_t)n;
DEBUGASSERT(n <= max_len);
max_len -= n;
if(!max_len)
break;
}
@ -644,11 +605,11 @@ static ssize_t bufq_slurpn(struct bufq *q, size_t max_len,
if(q->tail && !chunk_is_full(q->tail))
break;
}
return nread;
return result;
}
ssize_t Curl_bufq_slurp(struct bufq *q, Curl_bufq_reader *reader,
void *reader_ctx, CURLcode *err)
CURLcode Curl_bufq_slurp(struct bufq *q, Curl_bufq_reader *reader,
void *reader_ctx, size_t *pnread)
{
return bufq_slurpn(q, 0, reader, reader_ctx, err);
return bufq_slurpn(q, 0, reader, reader_ctx, pnread);
}

View file

@ -163,12 +163,11 @@ bool Curl_bufq_is_full(const struct bufq *q);
/**
* Write buf to the end of the buffer queue. The buf is copied
* and the amount of copied bytes is returned.
* A return code of -1 indicates an error, setting `err` to the
* cause. An err of CURLE_AGAIN is returned if the buffer queue is full.
* CURLE_AGAIN is returned if the buffer queue is full.
*/
ssize_t Curl_bufq_write(struct bufq *q,
const unsigned char *buf, size_t len,
CURLcode *err);
CURLcode Curl_bufq_write(struct bufq *q,
const unsigned char *buf, size_t len,
size_t *pnwritten);
CURLcode Curl_bufq_cwrite(struct bufq *q,
const char *buf, size_t len,
@ -177,11 +176,9 @@ CURLcode Curl_bufq_cwrite(struct bufq *q,
/**
* Read buf from the start of the buffer queue. The buf is copied
* and the amount of copied bytes is returned.
* A return code of -1 indicates an error, setting `err` to the
* cause. An err of CURLE_AGAIN is returned if the buffer queue is empty.
*/
ssize_t Curl_bufq_read(struct bufq *q, unsigned char *buf, size_t len,
CURLcode *err);
CURLcode Curl_bufq_read(struct bufq *q, unsigned char *buf, size_t len,
size_t *pnread);
CURLcode Curl_bufq_cread(struct bufq *q, char *buf, size_t len,
size_t *pnread);
@ -208,9 +205,9 @@ bool Curl_bufq_peek_at(struct bufq *q, size_t offset,
*/
void Curl_bufq_skip(struct bufq *q, size_t amount);
typedef ssize_t Curl_bufq_writer(void *writer_ctx,
typedef CURLcode Curl_bufq_writer(void *writer_ctx,
const unsigned char *buf, size_t len,
CURLcode *err);
size_t *pwritten);
/**
* Passes the chunks in the buffer queue to the writer and returns
* the amount of buf written. A writer may return -1 and CURLE_AGAIN
@ -220,24 +217,23 @@ typedef ssize_t Curl_bufq_writer(void *writer_ctx,
* Note that in case of a -1 chunks may have been written and
* the buffer queue will have different length than before.
*/
ssize_t Curl_bufq_pass(struct bufq *q, Curl_bufq_writer *writer,
void *writer_ctx, CURLcode *err);
CURLcode Curl_bufq_pass(struct bufq *q, Curl_bufq_writer *writer,
void *writer_ctx, size_t *pwritten);
typedef ssize_t Curl_bufq_reader(void *reader_ctx,
unsigned char *buf, size_t len,
CURLcode *err);
typedef CURLcode Curl_bufq_reader(void *reader_ctx,
unsigned char *buf, size_t len,
size_t *pnread);
/**
* Read date and append it to the end of the buffer queue until the
* reader returns blocking or the queue is full. A reader returns
* -1 and CURLE_AGAIN to indicate blocking.
* Returns the total amount of buf read (may be 0) or -1 on other
* reader errors.
* Note that in case of a -1 chunks may have been read and
* CURLE_AGAIN to indicate blocking.
* Returns the total amount of buf read (may be 0) in `pnread` on success.
* Note that in case of an error chunks may have been read and
* the buffer queue will have different length than before.
*/
ssize_t Curl_bufq_slurp(struct bufq *q, Curl_bufq_reader *reader,
void *reader_ctx, CURLcode *err);
CURLcode Curl_bufq_slurp(struct bufq *q, Curl_bufq_reader *reader,
void *reader_ctx, size_t *pnread);
/**
* Read *once* up to `max_len` bytes and append it to the buffer.
@ -245,9 +241,9 @@ ssize_t Curl_bufq_slurp(struct bufq *q, Curl_bufq_reader *reader,
* Returns the total amount of buf read (may be 0) or -1 on other
* reader errors.
*/
ssize_t Curl_bufq_sipn(struct bufq *q, size_t max_len,
Curl_bufq_reader *reader, void *reader_ctx,
CURLcode *err);
CURLcode Curl_bufq_sipn(struct bufq *q, size_t max_len,
Curl_bufq_reader *reader, void *reader_ctx,
size_t *pnread);
/**
* Write buf to the end of the buffer queue.
@ -256,9 +252,9 @@ ssize_t Curl_bufq_sipn(struct bufq *q, size_t max_len,
* on or is placed into the buffer, depending on `len` and current
* amount buffered, chunk size, etc.
*/
ssize_t Curl_bufq_write_pass(struct bufq *q,
const unsigned char *buf, size_t len,
Curl_bufq_writer *writer, void *writer_ctx,
CURLcode *err);
CURLcode Curl_bufq_write_pass(struct bufq *q,
const unsigned char *buf, size_t len,
Curl_bufq_writer *writer, void *writer_ctx,
size_t *pwritten);
#endif /* HEADER_CURL_BUFQ_H */

View file

@ -224,41 +224,22 @@ static void drain_tunnel(struct Curl_cfilter *cf,
Curl_multi_mark_dirty(data);
}
static ssize_t proxy_nw_in_reader(void *reader_ctx,
unsigned char *buf, size_t buflen,
CURLcode *err)
{
struct Curl_cfilter *cf = reader_ctx;
if(cf) {
struct Curl_easy *data = CF_DATA_CURRENT(cf);
size_t nread;
*err = Curl_conn_cf_recv(cf->next, data, (char *)buf, buflen, &nread);
CURL_TRC_CF(data, cf, "[0] nw_in_reader(len=%zu) -> %d, %zu",
buflen, *err, nread);
return *err ? -1 : (ssize_t)nread;
}
*err = CURLE_FAILED_INIT;
return -1;
}
static ssize_t proxy_h2_nw_out_writer(void *writer_ctx,
const unsigned char *buf, size_t buflen,
CURLcode *err)
static CURLcode proxy_h2_nw_out_writer(void *writer_ctx,
const unsigned char *buf, size_t buflen,
size_t *pnwritten)
{
struct Curl_cfilter *cf = writer_ctx;
*pnwritten = 0;
if(cf) {
struct Curl_easy *data = CF_DATA_CURRENT(cf);
size_t nwritten;
*err = Curl_conn_cf_send(cf->next, data, (const char *)buf, buflen,
FALSE, &nwritten);
CURL_TRC_CF(data, cf, "[0] nw_out_writer(len=%zu) -> %zd, %d",
buflen, nwritten, *err);
return *err ? -1 : (ssize_t)nwritten;
CURLcode result;
result = Curl_conn_cf_send(cf->next, data, (const char *)buf, buflen,
FALSE, pnwritten);
CURL_TRC_CF(data, cf, "[0] nw_out_writer(len=%zu) -> %d, %zu",
buflen, result, *pnwritten);
return result;
}
*err = CURLE_FAILED_INIT;
return -1;
return CURLE_FAILED_INIT;
}
static int proxy_h2_client_new(struct Curl_cfilter *cf,
@ -403,16 +384,16 @@ static CURLcode proxy_h2_nw_out_flush(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_h2_proxy_ctx *ctx = cf->ctx;
ssize_t nwritten;
size_t nwritten;
CURLcode result;
(void)data;
if(Curl_bufq_is_empty(&ctx->outbufq))
return CURLE_OK;
nwritten = Curl_bufq_pass(&ctx->outbufq, proxy_h2_nw_out_writer, cf,
&result);
if(nwritten < 0) {
result = Curl_bufq_pass(&ctx->outbufq, proxy_h2_nw_out_writer, cf,
&nwritten);
if(result) {
if(result == CURLE_AGAIN) {
CURL_TRC_CF(data, cf, "[0] flush nw send buffer(%zu) -> EAGAIN",
Curl_bufq_len(&ctx->outbufq));
@ -468,7 +449,7 @@ static CURLcode proxy_h2_progress_ingress(struct Curl_cfilter *cf,
{
struct cf_h2_proxy_ctx *ctx = cf->ctx;
CURLcode result = CURLE_OK;
ssize_t nread;
size_t nread;
/* Process network input buffer fist */
if(!Curl_bufq_is_empty(&ctx->inbufq)) {
@ -485,10 +466,10 @@ static CURLcode proxy_h2_progress_ingress(struct Curl_cfilter *cf,
Curl_bufq_is_empty(&ctx->inbufq) && /* and we consumed our input */
!Curl_bufq_is_full(&ctx->tunnel.recvbuf)) {
nread = Curl_bufq_slurp(&ctx->inbufq, proxy_nw_in_reader, cf, &result);
CURL_TRC_CF(data, cf, "[0] read %zu bytes nw data -> %zd, %d",
Curl_bufq_len(&ctx->inbufq), nread, result);
if(nread < 0) {
result = Curl_cf_recv_bufq(cf->next, data, &ctx->inbufq, 0, &nread);
CURL_TRC_CF(data, cf, "[0] read %zu bytes nw data -> %d, %zu",
Curl_bufq_len(&ctx->inbufq), result, nread);
if(result) {
if(result != CURLE_AGAIN) {
failf(data, "Failed receiving HTTP2 data");
return result;
@ -536,17 +517,18 @@ static ssize_t on_session_send(nghttp2_session *h2,
struct Curl_cfilter *cf = userp;
struct cf_h2_proxy_ctx *ctx = cf->ctx;
struct Curl_easy *data = CF_DATA_CURRENT(cf);
ssize_t nwritten;
size_t nwritten;
CURLcode result = CURLE_OK;
(void)h2;
(void)flags;
DEBUGASSERT(data);
nwritten = Curl_bufq_write_pass(&ctx->outbufq, buf, blen,
proxy_h2_nw_out_writer, cf, &result);
if(nwritten < 0) {
result = Curl_bufq_write_pass(&ctx->outbufq, buf, blen,
proxy_h2_nw_out_writer, cf, &nwritten);
if(result) {
if(result == CURLE_AGAIN) {
ctx->nw_out_blocked = 1;
return NGHTTP2_ERR_WOULDBLOCK;
}
failf(data, "Failed sending HTTP2 data");
@ -556,7 +538,8 @@ static ssize_t on_session_send(nghttp2_session *h2,
if(!nwritten)
return NGHTTP2_ERR_WOULDBLOCK;
return nwritten;
return (nwritten > SSIZE_T_MAX) ?
NGHTTP2_ERR_CALLBACK_FAILURE : (ssize_t)nwritten;
}
#ifndef CURL_DISABLE_VERBOSE_STRINGS
@ -806,7 +789,7 @@ static ssize_t tunnel_send_callback(nghttp2_session *session,
struct Curl_easy *data = CF_DATA_CURRENT(cf);
struct tunnel_stream *ts;
CURLcode result;
ssize_t nread;
size_t nread;
(void)source;
(void)data;
@ -820,8 +803,8 @@ static ssize_t tunnel_send_callback(nghttp2_session *session,
return NGHTTP2_ERR_CALLBACK_FAILURE;
DEBUGASSERT(ts == &ctx->tunnel);
nread = Curl_bufq_read(&ts->sendbuf, buf, length, &result);
if(nread < 0) {
result = Curl_bufq_read(&ts->sendbuf, buf, length, &nread);
if(result) {
if(result != CURLE_AGAIN)
return NGHTTP2_ERR_CALLBACK_FAILURE;
return NGHTTP2_ERR_DEFERRED;
@ -831,7 +814,8 @@ static ssize_t tunnel_send_callback(nghttp2_session *session,
CURL_TRC_CF(data, cf, "[%d] tunnel_send_callback -> %zd",
ts->stream_id, nread);
return nread;
return (nread > SSIZE_T_MAX) ?
NGHTTP2_ERR_CALLBACK_FAILURE : (ssize_t)nread;
}
static int tunnel_recv_callback(nghttp2_session *session, uint8_t flags,
@ -840,7 +824,7 @@ static int tunnel_recv_callback(nghttp2_session *session, uint8_t flags,
{
struct Curl_cfilter *cf = userp;
struct cf_h2_proxy_ctx *ctx = cf->ctx;
ssize_t nwritten;
size_t nwritten;
CURLcode result;
(void)flags;
@ -850,14 +834,15 @@ static int tunnel_recv_callback(nghttp2_session *session, uint8_t flags,
if(stream_id != ctx->tunnel.stream_id)
return NGHTTP2_ERR_CALLBACK_FAILURE;
nwritten = Curl_bufq_write(&ctx->tunnel.recvbuf, mem, len, &result);
if(nwritten < 0) {
result = Curl_bufq_write(&ctx->tunnel.recvbuf, mem, len, &nwritten);
if(result) {
if(result != CURLE_AGAIN)
return NGHTTP2_ERR_CALLBACK_FAILURE;
#ifdef DEBUGBUILD
nwritten = 0;
#endif
}
/* tunnel.recbuf has soft limit, any success MUST add all data */
DEBUGASSERT((size_t)nwritten == len);
return 0;
}
@ -1302,16 +1287,9 @@ static CURLcode tunnel_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
CURLcode result = CURLE_AGAIN;
*pnread = 0;
if(!Curl_bufq_is_empty(&ctx->tunnel.recvbuf)) {
ssize_t nread = Curl_bufq_read(&ctx->tunnel.recvbuf,
(unsigned char *)buf, len, &result);
if(nread < 0)
goto out;
DEBUGASSERT(nread > 0);
*pnread = (size_t)nread;
}
if(!*pnread) {
if(!Curl_bufq_is_empty(&ctx->tunnel.recvbuf))
result = Curl_bufq_cread(&ctx->tunnel.recvbuf, buf, len, pnread);
else {
if(ctx->tunnel.closed) {
result = h2_handle_tunnel_close(cf, data, pnread);
}
@ -1325,7 +1303,6 @@ static CURLcode tunnel_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
result = CURLE_AGAIN;
}
out:
CURL_TRC_CF(data, cf, "[%d] tunnel_recv(len=%zu) -> %d, %zu",
ctx->tunnel.stream_id, len, result, *pnread);
return result;
@ -1387,7 +1364,6 @@ static CURLcode cf_h2_proxy_send(struct Curl_cfilter *cf,
struct cf_h2_proxy_ctx *ctx = cf->ctx;
struct cf_call_data save;
int rv;
ssize_t nwritten = 0;
CURLcode result, r2;
(void)eos;
@ -1404,11 +1380,9 @@ static CURLcode cf_h2_proxy_send(struct Curl_cfilter *cf,
goto out;
}
nwritten = Curl_bufq_write(&ctx->tunnel.sendbuf, buf, len, &result);
CURL_TRC_CF(data, cf, "cf_send(), bufq_write %d, %zd", result, nwritten);
if(nwritten >= 0)
*pnwritten = (size_t)nwritten;
else if(result && (result != CURLE_AGAIN))
result = Curl_bufq_write(&ctx->tunnel.sendbuf, buf, len, pnwritten);
CURL_TRC_CF(data, cf, "cf_send(), bufq_write %d, %zd", result, *pnwritten);
if(result && (result != CURLE_AGAIN))
goto out;
if(!Curl_bufq_is_empty(&ctx->tunnel.sendbuf)) {
@ -1513,11 +1487,11 @@ static bool proxy_h2_connisalive(struct Curl_cfilter *cf,
not in use by any other transfer, there should not be any data here,
only "protocol frames" */
CURLcode result;
ssize_t nread = -1;
size_t nread;
*input_pending = FALSE;
nread = Curl_bufq_slurp(&ctx->inbufq, proxy_nw_in_reader, cf, &result);
if(nread != -1) {
result = Curl_cf_recv_bufq(cf->next, data, &ctx->inbufq, 0, &nread);
if(!result) {
if(proxy_h2_process_pending_input(cf, data, &result) < 0)
/* immediate error, considered dead */
alive = FALSE;

View file

@ -261,6 +261,66 @@ CURLcode Curl_cf_send(struct Curl_easy *data, int num,
return CURLE_FAILED_INIT;
}
struct cf_io_ctx {
struct Curl_easy *data;
struct Curl_cfilter *cf;
};
static CURLcode cf_bufq_reader(void *writer_ctx,
unsigned char *buf, size_t blen,
size_t *pnread)
{
struct cf_io_ctx *io = writer_ctx;
return Curl_conn_cf_recv(io->cf, io->data, (char *)buf, blen, pnread);
}
CURLcode Curl_cf_recv_bufq(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct bufq *bufq,
size_t maxlen,
size_t *pnread)
{
struct cf_io_ctx io;
if(!cf || !data) {
*pnread = 0;
return CURLE_BAD_FUNCTION_ARGUMENT;
}
io.data = data;
io.cf = cf;
return Curl_bufq_sipn(bufq, maxlen, cf_bufq_reader, &io, pnread);
}
static CURLcode cf_bufq_writer(void *writer_ctx,
const unsigned char *buf, size_t buflen,
size_t *pnwritten)
{
struct cf_io_ctx *io = writer_ctx;
return Curl_conn_cf_send(io->cf, io->data, (const char *)buf,
buflen, FALSE, pnwritten);
}
CURLcode Curl_cf_send_bufq(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct bufq *bufq,
const unsigned char *buf, size_t blen,
size_t *pnwritten)
{
struct cf_io_ctx io;
if(!cf || !data) {
*pnwritten = 0;
return CURLE_BAD_FUNCTION_ARGUMENT;
}
io.data = data;
io.cf = cf;
if(buf && blen)
return Curl_bufq_write_pass(bufq, buf, blen, cf_bufq_writer, &io,
pnwritten);
else
return Curl_bufq_pass(bufq, cf_bufq_writer, &io, pnwritten);
}
CURLcode Curl_cf_create(struct Curl_cfilter **pcf,
const struct Curl_cftype *cft,
void *ctx)

View file

@ -26,6 +26,7 @@
#include "curlx/timediff.h"
struct bufq;
struct Curl_cfilter;
struct Curl_easy;
struct Curl_dns_entry;
@ -477,6 +478,28 @@ CURLcode Curl_cf_send(struct Curl_easy *data, int sockindex,
const void *buf, size_t len, bool eos,
size_t *pnwritten);
/**
* Receive bytes from connection filter `cf` into `bufq`.
* Convenience wrappter around `Curl_bufq_sipn()`,
* so users do not have to implement a callback.
*/
CURLcode Curl_cf_recv_bufq(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct bufq *bufq,
size_t maxlen,
size_t *pnread);
/**
* Send bytes in `bufq` using connection filter `cf`.
* A convenience wrapper around `Curl_bufq_write_pass()`,
* so users do not have to implement a callback.
*/
CURLcode Curl_cf_send_bufq(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct bufq *bufq,
const unsigned char *buf, size_t blen,
size_t *pnwritten);
/**
* Notify connection filters that they need to setup data for
* a transfer.

View file

@ -464,37 +464,6 @@ static int h2_client_new(struct Curl_cfilter *cf,
return rc;
}
static ssize_t nw_in_reader(void *reader_ctx,
unsigned char *buf, size_t buflen,
CURLcode *err)
{
struct Curl_cfilter *cf = reader_ctx;
struct Curl_easy *data = CF_DATA_CURRENT(cf);
size_t nread;
*err = Curl_conn_cf_recv(cf->next, data, (char *)buf, buflen, &nread);
return *err ? -1 : (ssize_t)nread;
}
static ssize_t nw_out_writer(void *writer_ctx,
const unsigned char *buf, size_t buflen,
CURLcode *err)
{
struct Curl_cfilter *cf = writer_ctx;
struct Curl_easy *data = CF_DATA_CURRENT(cf);
size_t nwritten;
if(!data) {
*err = CURLE_OK;
return 0;
}
*err = Curl_conn_cf_send(cf->next, data, (const char *)buf,
buflen, FALSE, &nwritten);
CURL_TRC_CF(data, cf, "[0] egress write -> %d, %zu", *err, nwritten);
return *err ? -1 : (ssize_t)nwritten;
}
static ssize_t send_callback(nghttp2_session *h2,
const uint8_t *mem, size_t length, int flags,
void *userp);
@ -715,12 +684,12 @@ static bool http2_connisalive(struct Curl_cfilter *cf, struct Curl_easy *data,
not in use by any other transfer, there should not be any data here,
only "protocol frames" */
CURLcode result;
ssize_t nread = -1;
size_t nread;
*input_pending = FALSE;
nread = Curl_bufq_slurp(&ctx->inbufq, nw_in_reader, cf, &result);
if(nread != -1) {
CURL_TRC_CF(data, cf, "%zd bytes stray data read before trying "
result = Curl_cf_recv_bufq(cf->next, data, &ctx->inbufq, 0, &nread);
if(!result) {
CURL_TRC_CF(data, cf, "%zu bytes stray data read before trying "
"h2 connection", nread);
if(h2_process_pending_input(cf, data, &result) < 0)
/* immediate error, considered dead */
@ -773,15 +742,16 @@ static CURLcode nw_out_flush(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_h2_ctx *ctx = cf->ctx;
ssize_t nwritten;
size_t nwritten;
CURLcode result;
(void)data;
if(Curl_bufq_is_empty(&ctx->outbufq))
return CURLE_OK;
nwritten = Curl_bufq_pass(&ctx->outbufq, nw_out_writer, cf, &result);
if(nwritten < 0) {
result = Curl_cf_send_bufq(cf->next, data, &ctx->outbufq, NULL, 0,
&nwritten);
if(result) {
if(result == CURLE_AGAIN) {
CURL_TRC_CF(data, cf, "flush nw send buffer(%zu) -> EAGAIN",
Curl_bufq_len(&ctx->outbufq));
@ -804,7 +774,7 @@ static ssize_t send_callback(nghttp2_session *h2,
struct Curl_cfilter *cf = userp;
struct cf_h2_ctx *ctx = cf->ctx;
struct Curl_easy *data = CF_DATA_CURRENT(cf);
ssize_t nwritten;
size_t nwritten;
CURLcode result = CURLE_OK;
(void)h2;
@ -812,11 +782,12 @@ static ssize_t send_callback(nghttp2_session *h2,
DEBUGASSERT(data);
if(!cf->connected)
nwritten = Curl_bufq_write(&ctx->outbufq, buf, blen, &result);
result = Curl_bufq_write(&ctx->outbufq, buf, blen, &nwritten);
else
nwritten = Curl_bufq_write_pass(&ctx->outbufq, buf, blen,
nw_out_writer, cf, &result);
if(nwritten < 0) {
result = Curl_cf_send_bufq(cf->next, data, &ctx->outbufq, buf, blen,
&nwritten);
if(result) {
if(result == CURLE_AGAIN) {
ctx->nw_out_blocked = 1;
return NGHTTP2_ERR_WOULDBLOCK;
@ -829,7 +800,8 @@ static ssize_t send_callback(nghttp2_session *h2,
ctx->nw_out_blocked = 1;
return NGHTTP2_ERR_WOULDBLOCK;
}
return nwritten;
return (nwritten > SSIZE_T_MAX) ?
NGHTTP2_ERR_CALLBACK_FAILURE : (ssize_t)nwritten;
}
@ -1770,6 +1742,7 @@ static ssize_t req_body_read_callback(nghttp2_session *session,
struct h2_stream_ctx *stream = NULL;
CURLcode result;
ssize_t nread;
size_t n;
(void)source;
(void)cf;
@ -1788,12 +1761,14 @@ static ssize_t req_body_read_callback(nghttp2_session *session,
if(!stream)
return NGHTTP2_ERR_CALLBACK_FAILURE;
nread = Curl_bufq_read(&stream->sendbuf, buf, length, &result);
if(nread < 0) {
result = Curl_bufq_read(&stream->sendbuf, buf, length, &n);
if(result) {
if(result != CURLE_AGAIN)
return NGHTTP2_ERR_CALLBACK_FAILURE;
nread = 0;
}
else
nread = (ssize_t)n;
CURL_TRC_CF(data_s, cf, "[%d] req_body_read(len=%zu) eos=%d -> %zd, %d",
stream_id, length, stream->body_eos, nread, result);
@ -2056,7 +2031,7 @@ static CURLcode h2_progress_ingress(struct Curl_cfilter *cf,
struct cf_h2_ctx *ctx = cf->ctx;
struct h2_stream_ctx *stream;
CURLcode result = CURLE_OK;
ssize_t nread;
size_t nread;
if(should_close_session(ctx)) {
CURL_TRC_CF(data, cf, "progress ingress, session is closed");
@ -2086,8 +2061,8 @@ static CURLcode h2_progress_ingress(struct Curl_cfilter *cf,
break;
}
nread = Curl_bufq_sipn(&ctx->inbufq, 0, nw_in_reader, cf, &result);
if(nread < 0) {
result = Curl_cf_recv_bufq(cf->next, data, &ctx->inbufq, 0, &nread);
if(result) {
if(result != CURLE_AGAIN) {
failf(data, "Failed receiving HTTP2 data: %d(%s)", result,
curl_easy_strerror(result));
@ -2101,9 +2076,8 @@ static CURLcode h2_progress_ingress(struct Curl_cfilter *cf,
break;
}
else {
CURL_TRC_CF(data, cf, "[0] ingress: read %zd bytes", nread);
data_max_bytes = (data_max_bytes > (size_t)nread) ?
(data_max_bytes - (size_t)nread) : 0;
CURL_TRC_CF(data, cf, "[0] ingress: read %zu bytes", nread);
data_max_bytes = (data_max_bytes > nread) ? (data_max_bytes - nread) : 0;
}
if(h2_process_pending_input(cf, data, &result))
@ -2197,7 +2171,7 @@ static ssize_t cf_h2_body_send(struct Curl_cfilter *cf,
CURLcode *err)
{
struct cf_h2_ctx *ctx = cf->ctx;
ssize_t nwritten;
size_t nwritten;
if(stream->closed) {
if(stream->resp_hds_complete) {
@ -2219,11 +2193,11 @@ static ssize_t cf_h2_body_send(struct Curl_cfilter *cf,
return -1;
}
nwritten = Curl_bufq_write(&stream->sendbuf, buf, blen, err);
if(nwritten < 0)
*err = Curl_bufq_write(&stream->sendbuf, buf, blen, &nwritten);
if(*err)
return -1;
if(eos && (blen == (size_t)nwritten))
if(eos && (blen == nwritten))
stream->body_eos = TRUE;
if(eos || !Curl_bufq_is_empty(&stream->sendbuf)) {
@ -2234,7 +2208,7 @@ static ssize_t cf_h2_body_send(struct Curl_cfilter *cf,
return -1;
}
}
return nwritten;
return (ssize_t)nwritten;
}
static CURLcode h2_submit(struct h2_stream_ctx **pstream,
@ -2971,17 +2945,17 @@ CURLcode Curl_http2_upgrade(struct Curl_easy *data,
/* Remaining data from the protocol switch reply is already using
* the switched protocol, ie. HTTP/2. We add that to the network
* inbufq. */
ssize_t copied;
size_t copied;
copied = Curl_bufq_write(&ctx->inbufq,
(const unsigned char *)mem, nread, &result);
if(copied < 0) {
result = Curl_bufq_write(&ctx->inbufq,
(const unsigned char *)mem, nread, &copied);
if(result) {
failf(data, "error on copying HTTP Upgrade response: %d", result);
return CURLE_RECV_ERROR;
}
if((size_t)copied < nread) {
if(copied < nread) {
failf(data, "connection buffer size could not take all data "
"from HTTP Upgrade response header: copied=%zd, datalen=%zu",
"from HTTP Upgrade response header: copied=%zu, datalen=%zu",
copied, nread);
return CURLE_HTTP2;
}

View file

@ -1992,14 +1992,12 @@ static CURLcode cr_mime_read(struct Curl_easy *data,
}
if(!Curl_bufq_is_empty(&ctx->tmpbuf)) {
ssize_t n = Curl_bufq_read(&ctx->tmpbuf, (unsigned char *)buf, blen,
&result);
if(n < 0) {
result = Curl_bufq_read(&ctx->tmpbuf, (unsigned char *)buf, blen, &nread);
if(result) {
ctx->errored = TRUE;
ctx->error_result = result;
return result;
}
nread = (size_t)n;
}
else if(blen <= 4) {
/* Curl_mime_read() may go into an infinite loop when reading
@ -2009,21 +2007,20 @@ static CURLcode cr_mime_read(struct Curl_easy *data,
CURL_TRC_READ(data, "cr_mime_read(len=%zu), small read, using tmp", blen);
nread = Curl_mime_read(tmp, 1, sizeof(tmp), ctx->part);
if(nread <= sizeof(tmp)) {
ssize_t n = Curl_bufq_write(&ctx->tmpbuf, (unsigned char *)tmp, nread,
&result);
if(n < 0) {
size_t n;
result = Curl_bufq_write(&ctx->tmpbuf, (unsigned char *)tmp, nread, &n);
if(result) {
ctx->errored = TRUE;
ctx->error_result = result;
return result;
}
/* stored it, read again */
n = Curl_bufq_read(&ctx->tmpbuf, (unsigned char *)buf, blen, &result);
if(n < 0) {
result = Curl_bufq_cread(&ctx->tmpbuf, buf, blen, &nread);
if(result) {
ctx->errored = TRUE;
ctx->error_result = result;
return result;
}
nread = (size_t)n;
}
}
else

View file

@ -338,20 +338,18 @@ static CURLcode req_flush(struct Curl_easy *data)
return CURLE_OK;
}
static ssize_t add_from_client(void *reader_ctx,
unsigned char *buf, size_t buflen,
CURLcode *err)
static CURLcode add_from_client(void *reader_ctx,
unsigned char *buf, size_t buflen,
size_t *pnread)
{
struct Curl_easy *data = reader_ctx;
size_t nread;
CURLcode result;
bool eos;
*err = Curl_client_read(data, (char *)buf, buflen, &nread, &eos);
if(*err)
return -1;
if(eos)
result = Curl_client_read(data, (char *)buf, buflen, pnread, &eos);
if(!result && eos)
data->req.eos_read = TRUE;
return (ssize_t)nread;
return result;
}
static CURLcode req_send_buffer_add(struct Curl_easy *data,
@ -359,13 +357,12 @@ static CURLcode req_send_buffer_add(struct Curl_easy *data,
size_t hds_len)
{
CURLcode result = CURLE_OK;
ssize_t n;
n = Curl_bufq_write(&data->req.sendbuf,
(const unsigned char *)buf, blen, &result);
if(n < 0)
size_t n;
result = Curl_bufq_cwrite(&data->req.sendbuf, buf, blen, &n);
if(result)
return result;
/* We rely on a SOFTLIMIT on sendbuf, so it can take all data in */
DEBUGASSERT((size_t)n == blen);
DEBUGASSERT(n == blen);
data->req.sendbuf_hds_len += hds_len;
return CURLE_OK;
}
@ -437,9 +434,10 @@ CURLcode Curl_req_send_more(struct Curl_easy *data)
!data->req.eos_read &&
!Curl_xfer_send_is_paused(data) &&
!Curl_bufq_is_full(&data->req.sendbuf)) {
ssize_t nread = Curl_bufq_sipn(&data->req.sendbuf, 0,
add_from_client, data, &result);
if(nread < 0 && result != CURLE_AGAIN)
size_t nread;
result = Curl_bufq_sipn(&data->req.sendbuf, 0,
add_from_client, data, &nread);
if(result && result != CURLE_AGAIN)
return result;
}

View file

@ -345,17 +345,16 @@ static CURLcode write_resp_raw(struct Curl_easy *data,
struct cf_msh3_ctx *ctx = h3_get_msh3_ctx(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
CURLcode result = CURLE_OK;
ssize_t nwritten;
size_t nwritten;
if(!stream)
return CURLE_RECV_ERROR;
nwritten = Curl_bufq_write(&stream->recvbuf, mem, memlen, &result);
if(nwritten < 0) {
result = Curl_bufq_write(&stream->recvbuf, mem, memlen, &nwritten);
if(result)
return result;
}
if((size_t)nwritten < memlen) {
if(nwritten < memlen) {
/* This MUST not happen. Our recbuf is dimensioned to hold the
* full max_stream_window and then some for this very reason. */
DEBUGASSERT(0);
@ -553,7 +552,6 @@ static CURLcode cf_msh3_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
{
struct cf_msh3_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
ssize_t nread = -1;
struct cf_call_data save;
CURLcode result = CURLE_OK;
@ -572,13 +570,11 @@ static CURLcode cf_msh3_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
}
if(!Curl_bufq_is_empty(&stream->recvbuf)) {
nread = Curl_bufq_read(&stream->recvbuf,
(unsigned char *)buf, len, &result);
CURL_TRC_CF(data, cf, "read recvbuf(len=%zu) -> %zd, %d",
len, nread, result);
if(nread < 0)
result = Curl_bufq_cread(&stream->recvbuf, buf, len, pnread);
CURL_TRC_CF(data, cf, "read recvbuf(len=%zu) -> %d, %zu",
len, result, *pnread);
if(result)
goto out;
*pnread = (size_t)nread;
if(stream->closed)
h3_drain_stream(cf, data);
}

View file

@ -1644,14 +1644,12 @@ static CURLcode cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
goto out;
}
else {
ssize_t sent;
sent = Curl_bufq_write(&stream->sendbuf, buf, len, &result);
result = Curl_bufq_write(&stream->sendbuf, buf, len, pnwritten);
CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] cf_send, add to "
"sendbuf(len=%zu) -> %zd, %d",
stream->id, len, sent, result);
if(sent < 0)
"sendbuf(len=%zu) -> %d, %zu",
stream->id, len, result, *pnwritten);
if(result)
goto out;
*pnwritten = (size_t)sent;
(void)nghttp3_conn_resume_stream(ctx->h3conn, stream->id);
}
@ -1730,9 +1728,9 @@ static CURLcode cf_progress_ingress(struct Curl_cfilter *cf,
* Read a network packet to send from ngtcp2 into `buf`.
* Return number of bytes written or -1 with *err set.
*/
static ssize_t read_pkt_to_send(void *userp,
unsigned char *buf, size_t buflen,
CURLcode *err)
static CURLcode read_pkt_to_send(void *userp,
unsigned char *buf, size_t buflen,
size_t *pnread)
{
struct pkt_io_ctx *x = userp;
struct cf_ngtcp2_ctx *ctx = x->cf->ctx;
@ -1742,7 +1740,9 @@ static ssize_t read_pkt_to_send(void *userp,
uint32_t flags;
int64_t stream_id;
int fin;
ssize_t nwritten = 0, n;
ssize_t n;
*pnread = 0;
veccnt = 0;
stream_id = -1;
fin = 0;
@ -1754,7 +1754,6 @@ static ssize_t read_pkt_to_send(void *userp,
* When ngtcp2 is happy (because it has no other frame that would fit
* or it has nothing more to send), it returns the total length
* of the assembled packet. This may be 0 if there was nothing to send. */
*err = CURLE_OK;
for(;;) {
if(ctx->h3conn && ngtcp2_conn_get_max_data_left(ctx->qconn)) {
@ -1764,8 +1763,7 @@ static ssize_t read_pkt_to_send(void *userp,
failf(x->data, "nghttp3_conn_writev_stream returned error: %s",
nghttp3_strerror((int)veccnt));
cf_ngtcp2_h3_err_set(x->cf, x->data, (int)veccnt);
*err = CURLE_SEND_ERROR;
return -1;
return CURLE_SEND_ERROR;
}
}
@ -1777,9 +1775,7 @@ static ssize_t read_pkt_to_send(void *userp,
(const ngtcp2_vec *)vec, veccnt, x->ts);
if(n == 0) {
/* nothing to send */
*err = CURLE_AGAIN;
nwritten = -1;
goto out;
return CURLE_AGAIN;
}
else if(n < 0) {
switch(n) {
@ -1811,9 +1807,7 @@ static ssize_t read_pkt_to_send(void *userp,
failf(x->data, "ngtcp2_conn_writev_stream returned error: %s",
ngtcp2_strerror((int)n));
cf_ngtcp2_err_set(x->cf, x->data, (int)n);
*err = CURLE_SEND_ERROR;
nwritten = -1;
goto out;
return CURLE_SEND_ERROR;
}
}
@ -1829,12 +1823,10 @@ static ssize_t read_pkt_to_send(void *userp,
if(n > 0) {
/* packet assembled, leave */
nwritten = n;
goto out;
*pnread = (size_t)n;
return CURLE_OK;
}
}
out:
return nwritten;
}
static CURLcode cf_progress_egress(struct Curl_cfilter *cf,
@ -1842,7 +1834,7 @@ static CURLcode cf_progress_egress(struct Curl_cfilter *cf,
struct pkt_io_ctx *pktx)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
ssize_t nread;
size_t nread;
size_t max_payload_size, path_max_payload_size, max_pktcnt;
size_t pktcnt = 0;
size_t gsolen = 0; /* this disables gso until we have a clue */
@ -1887,9 +1879,9 @@ static CURLcode cf_progress_egress(struct Curl_cfilter *cf,
for(;;) {
/* add the next packet to send, if any, to our buffer */
nread = Curl_bufq_sipn(&ctx->q.sendbuf, max_payload_size,
read_pkt_to_send, pktx, &curlcode);
if(nread < 0) {
curlcode = Curl_bufq_sipn(&ctx->q.sendbuf, max_payload_size,
read_pkt_to_send, pktx, &nread);
if(curlcode) {
if(curlcode != CURLE_AGAIN)
return curlcode;
/* Nothing more to add, flush and leave */
@ -1908,10 +1900,10 @@ static CURLcode cf_progress_egress(struct Curl_cfilter *cf,
if(pktcnt == 0) {
/* first packet in buffer. This is either of a known, "good"
* payload size or it is a PMTUD. We will see. */
gsolen = (size_t)nread;
gsolen = nread;
}
else if((size_t)nread > gsolen ||
(gsolen > path_max_payload_size && (size_t)nread != gsolen)) {
else if(nread > gsolen ||
(gsolen > path_max_payload_size && nread != gsolen)) {
/* The just added packet is a PMTUD *or* the one(s) before the
* just added were PMTUD and the last one is smaller.
* Flush the buffer before the last add. */
@ -1928,7 +1920,7 @@ static CURLcode cf_progress_egress(struct Curl_cfilter *cf,
continue;
}
if(++pktcnt >= max_pktcnt || (size_t)nread < gsolen) {
if(++pktcnt >= max_pktcnt || nread < gsolen) {
/* Reached MAX_PKT_BURST *or*
* the capacity of our buffer *or*
* last add was shorter than the previous ones, flush */
@ -2102,8 +2094,9 @@ static CURLcode cf_ngtcp2_shutdown(struct Curl_cfilter *cf,
/* Ignore amount written. sendbuf was empty and has always room for
* NGTCP2_MAX_UDP_PAYLOAD_SIZE. It can only completely fail, in which
* case `result` is set non zero. */
(void)Curl_bufq_write(&ctx->q.sendbuf, (const unsigned char *)buffer,
(size_t)nwritten, &result);
size_t n;
result = Curl_bufq_write(&ctx->q.sendbuf, (const unsigned char *)buffer,
(size_t)nwritten, &n);
if(result) {
CURL_TRC_CF(data, cf, "error %d adding shutdown packets to sendbuf, "
"aborting shutdown", result);

View file

@ -766,21 +766,20 @@ static CURLcode write_resp_raw(struct Curl_cfilter *cf,
struct cf_osslq_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
CURLcode result = CURLE_OK;
ssize_t nwritten;
size_t nwritten;
(void)cf;
if(!stream) {
return CURLE_RECV_ERROR;
}
nwritten = Curl_bufq_write(&stream->recvbuf, mem, memlen, &result);
if(nwritten < 0) {
result = Curl_bufq_write(&stream->recvbuf, mem, memlen, &nwritten);
if(result)
return result;
}
if(!flow)
stream->recv_buf_nonflow += (size_t)nwritten;
if((size_t)nwritten < memlen) {
if(nwritten < memlen) {
/* This MUST not happen. Our recbuf is dimensioned to hold the
* full max_stream_window and then some for this very reason. */
DEBUGASSERT(0);
@ -1242,27 +1241,24 @@ struct h3_quic_recv_ctx {
struct cf_osslq_stream *s;
};
static ssize_t h3_quic_recv(void *reader_ctx,
static CURLcode h3_quic_recv(void *reader_ctx,
unsigned char *buf, size_t len,
CURLcode *err)
size_t *pnread)
{
struct h3_quic_recv_ctx *x = reader_ctx;
size_t nread;
int rv;
*err = CURLE_OK;
rv = SSL_read_ex(x->s->ssl, buf, len, &nread);
rv = SSL_read_ex(x->s->ssl, buf, len, pnread);
if(rv <= 0) {
int detail = SSL_get_error(x->s->ssl, rv);
if(detail == SSL_ERROR_WANT_READ || detail == SSL_ERROR_WANT_WRITE) {
*err = CURLE_AGAIN;
return -1;
return CURLE_AGAIN;
}
else if(detail == SSL_ERROR_ZERO_RETURN) {
CURL_TRC_CF(x->data, x->cf, "[%" FMT_PRId64 "] h3_quic_recv -> EOS",
x->s->id);
x->s->recvd_eos = TRUE;
return 0;
return CURLE_OK;
}
else if(SSL_get_stream_read_state(x->s->ssl) ==
SSL_STREAM_STATE_RESET_REMOTE) {
@ -1275,14 +1271,13 @@ static ssize_t h3_quic_recv(void *reader_ctx,
x->s->reset = TRUE;
}
x->s->recvd_eos = TRUE;
return 0;
return CURLE_OK;
}
else {
*err = cf_osslq_ssl_err(x->cf, x->data, detail, CURLE_RECV_ERROR);
return -1;
return cf_osslq_ssl_err(x->cf, x->data, detail, CURLE_RECV_ERROR);
}
}
return (ssize_t)nread;
return CURLE_OK;
}
static CURLcode cf_osslq_stream_recv(struct cf_osslq_stream *s,
@ -1292,6 +1287,7 @@ static CURLcode cf_osslq_stream_recv(struct cf_osslq_stream *s,
struct cf_osslq_ctx *ctx = cf->ctx;
CURLcode result = CURLE_OK;
ssize_t nread;
size_t n;
struct h3_quic_recv_ctx x;
bool eagain = FALSE;
size_t total_recv_len = 0;
@ -1307,8 +1303,8 @@ static CURLcode cf_osslq_stream_recv(struct cf_osslq_stream *s,
(total_recv_len < H3_STREAM_CHUNK_SIZE)) {
if(Curl_bufq_is_empty(&s->recvbuf) && !s->recvd_eos) {
while(!eagain && !s->recvd_eos && !Curl_bufq_is_full(&s->recvbuf)) {
nread = Curl_bufq_sipn(&s->recvbuf, 0, h3_quic_recv, &x, &result);
if(nread < 0) {
result = Curl_bufq_sipn(&s->recvbuf, 0, h3_quic_recv, &x, &n);
if(result) {
if(result != CURLE_AGAIN)
goto out;
result = CURLE_OK;
@ -2045,14 +2041,12 @@ static CURLcode cf_osslq_send(struct Curl_cfilter *cf, struct Curl_easy *data,
goto out;
}
else {
nwritten = Curl_bufq_write(&stream->sendbuf, buf, len, &result);
result = Curl_bufq_write(&stream->sendbuf, buf, len, pnwritten);
CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] cf_send, add to "
"sendbuf(len=%zu) -> %zd, %d",
stream->s.id, len, nwritten, result);
if(nwritten < 0) {
"sendbuf(len=%zu) -> %d, %zu",
stream->s.id, len, result, *pnwritten);
if(result)
goto out;
}
*pnwritten = (size_t)nwritten;
(void)nghttp3_conn_resume_stream(ctx->h3.conn, stream->s.id);
}
@ -2099,7 +2093,6 @@ static CURLcode cf_osslq_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
{
struct cf_osslq_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
ssize_t nread = -1;
struct cf_call_data save;
CURLcode result = CURLE_OK, r2;
@ -2117,14 +2110,12 @@ static CURLcode cf_osslq_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
}
if(!Curl_bufq_is_empty(&stream->recvbuf)) {
nread = Curl_bufq_read(&stream->recvbuf,
(unsigned char *)buf, len, &result);
if(nread < 0) {
result = Curl_bufq_cread(&stream->recvbuf, buf, len, pnread);
if(result) {
CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] read recvbuf(len=%zu) "
"-> %zd, %d", stream->s.id, len, nread, result);
"-> %d, %zu", stream->s.id, len, result, *pnread);
goto out;
}
*pnread = (size_t)nread;
}
r2 = cf_progress_ingress(cf, data);
@ -2135,14 +2126,12 @@ static CURLcode cf_osslq_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
/* recvbuf had nothing before, maybe after progressing ingress? */
if(!*pnread && !Curl_bufq_is_empty(&stream->recvbuf)) {
nread = Curl_bufq_read(&stream->recvbuf,
(unsigned char *)buf, len, &result);
if(nread < 0) {
result = Curl_bufq_cread(&stream->recvbuf, buf, len, pnread);
if(result) {
CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] read recvbuf(len=%zu) "
"-> %zd, %d", stream->s.id, len, nread, result);
"-> %d, %zu", stream->s.id, len, result, *pnread);
goto out;
}
*pnread = (size_t)nread;
}
if(*pnread) {

View file

@ -327,16 +327,16 @@ static CURLcode write_resp_raw(struct Curl_cfilter *cf,
struct cf_quiche_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
CURLcode result = CURLE_OK;
ssize_t nwritten;
size_t nwritten;
(void)cf;
if(!stream)
return CURLE_RECV_ERROR;
nwritten = Curl_bufq_write(&stream->recvbuf, mem, memlen, &result);
if(nwritten < 0)
result = Curl_bufq_write(&stream->recvbuf, mem, memlen, &nwritten);
if(result)
return result;
if((size_t)nwritten < memlen) {
if(nwritten < memlen) {
/* This MUST not happen. Our recbuf is dimensioned to hold the
* full max_stream_window and then some for this very reason. */
DEBUGASSERT(0);
@ -390,30 +390,26 @@ static int cb_each_header(uint8_t *name, size_t name_len,
return result;
}
static ssize_t stream_resp_read(void *reader_ctx,
unsigned char *buf, size_t len,
CURLcode *err)
static CURLcode stream_resp_read(void *reader_ctx,
unsigned char *buf, size_t len,
size_t *pnread)
{
struct cb_ctx *x = reader_ctx;
struct cf_quiche_ctx *ctx = x->cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, x->data);
ssize_t nread;
if(!stream) {
*err = CURLE_RECV_ERROR;
return -1;
}
*pnread = 0;
if(!stream)
return CURLE_RECV_ERROR;
nread = quiche_h3_recv_body(ctx->h3c, ctx->qconn, stream->id,
buf, len);
nread = quiche_h3_recv_body(ctx->h3c, ctx->qconn, stream->id, buf, len);
if(nread >= 0) {
*err = CURLE_OK;
return nread;
}
else {
*err = CURLE_AGAIN;
return -1;
*pnread = (size_t)nread;
return CURLE_OK;
}
else
return CURLE_AGAIN;
}
static CURLcode cf_recv_body(struct Curl_cfilter *cf,
@ -421,7 +417,7 @@ static CURLcode cf_recv_body(struct Curl_cfilter *cf,
{
struct cf_quiche_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
ssize_t nwritten;
size_t nread;
struct cb_ctx cb_ctx;
CURLcode result = CURLE_OK;
@ -437,12 +433,12 @@ static CURLcode cf_recv_body(struct Curl_cfilter *cf,
cb_ctx.cf = cf;
cb_ctx.data = data;
nwritten = Curl_bufq_slurp(&stream->recvbuf,
stream_resp_read, &cb_ctx, &result);
result = Curl_bufq_slurp(&stream->recvbuf,
stream_resp_read, &cb_ctx, &nread);
if(nwritten < 0 && result != CURLE_AGAIN) {
CURL_TRC_CF(data, cf, "[%"FMT_PRIu64"] recv_body error %zd",
stream->id, nwritten);
if(result && result != CURLE_AGAIN) {
CURL_TRC_CF(data, cf, "[%"FMT_PRIu64"] recv_body error %zu",
stream->id, nread);
failf(data, "Error %d in HTTP/3 response body for stream[%"FMT_PRIu64"]",
result, stream->id);
stream->closed = TRUE;
@ -714,27 +710,25 @@ struct read_ctx {
quiche_send_info send_info;
};
static ssize_t read_pkt_to_send(void *userp,
unsigned char *buf, size_t buflen,
CURLcode *err)
static CURLcode read_pkt_to_send(void *userp,
unsigned char *buf, size_t buflen,
size_t *pnread)
{
struct read_ctx *x = userp;
struct cf_quiche_ctx *ctx = x->cf->ctx;
ssize_t nwritten;
ssize_t n;
nwritten = quiche_conn_send(ctx->qconn, buf, buflen, &x->send_info);
if(nwritten == QUICHE_ERR_DONE) {
*err = CURLE_AGAIN;
return -1;
}
*pnread = 0;
n = quiche_conn_send(ctx->qconn, buf, buflen, &x->send_info);
if(n == QUICHE_ERR_DONE)
return CURLE_AGAIN;
if(nwritten < 0) {
failf(x->data, "quiche_conn_send returned %zd", nwritten);
*err = CURLE_SEND_ERROR;
return -1;
if(n < 0) {
failf(x->data, "quiche_conn_send returned %zd", n);
return CURLE_SEND_ERROR;
}
*err = CURLE_OK;
return nwritten;
*pnread = (size_t)n;
return CURLE_OK;
}
/*
@ -745,7 +739,7 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_quiche_ctx *ctx = cf->ctx;
ssize_t nread;
size_t nread;
CURLcode result;
curl_int64_t expiry_ns;
curl_int64_t timeout_ns;
@ -783,9 +777,9 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
gsolen = quiche_conn_max_send_udp_payload_size(ctx->qconn);
for(;;) {
/* add the next packet to send, if any, to our buffer */
nread = Curl_bufq_sipn(&ctx->q.sendbuf, 0,
read_pkt_to_send, &readx, &result);
if(nread < 0) {
result = Curl_bufq_sipn(&ctx->q.sendbuf, 0,
read_pkt_to_send, &readx, &nread);
if(result) {
if(result != CURLE_AGAIN)
return result;
/* Nothing more to add, flush and leave */
@ -801,7 +795,7 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
}
++pkt_count;
if((size_t)nread < gsolen || pkt_count >= MAX_PKT_BURST) {
if(nread < gsolen || pkt_count >= MAX_PKT_BURST) {
result = vquic_send(cf, data, &ctx->q, gsolen);
if(result) {
if(result == CURLE_AGAIN) {
@ -823,37 +817,31 @@ out:
return result;
}
static ssize_t recv_closed_stream(struct Curl_cfilter *cf,
static CURLcode recv_closed_stream(struct Curl_cfilter *cf,
struct Curl_easy *data,
CURLcode *err)
size_t *pnread)
{
struct cf_quiche_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
ssize_t nread = -1;
CURLcode result = CURLE_OK;
DEBUGASSERT(stream);
*pnread = 0;
if(stream->reset) {
failf(data,
"HTTP/3 stream %" FMT_PRIu64 " reset by server", stream->id);
*err = data->req.bytecount ? CURLE_PARTIAL_FILE : CURLE_HTTP3;
result = data->req.bytecount ? CURLE_PARTIAL_FILE : CURLE_HTTP3;
CURL_TRC_CF(data, cf, "[%" FMT_PRIu64 "] cf_recv, was reset -> %d",
stream->id, *err);
stream->id, result);
}
else if(!stream->resp_got_header) {
failf(data,
"HTTP/3 stream %" FMT_PRIu64 " was closed cleanly, but before "
"getting all response header fields, treated as error",
stream->id);
/* *err = CURLE_PARTIAL_FILE; */
*err = CURLE_HTTP3;
CURL_TRC_CF(data, cf, "[%" FMT_PRIu64 "] cf_recv, closed incomplete"
" -> %d", stream->id, *err);
result = CURLE_HTTP3;
}
else {
*err = CURLE_OK;
nread = 0;
}
return nread;
return result;
}
static CURLcode cf_quiche_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
@ -861,7 +849,6 @@ static CURLcode cf_quiche_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
{
struct cf_quiche_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
ssize_t nread = -1;
CURLcode result = CURLE_OK, r2;
*pnread = 0;
@ -872,13 +859,11 @@ static CURLcode cf_quiche_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
if(!Curl_bufq_is_empty(&stream->recvbuf)) {
nread = Curl_bufq_read(&stream->recvbuf,
(unsigned char *)buf, len, &result);
result = Curl_bufq_cread(&stream->recvbuf, buf, len, pnread);
CURL_TRC_CF(data, cf, "[%" FMT_PRIu64 "] read recvbuf(len=%zu) "
"-> %zd, %d", stream->id, len, nread, result);
if(nread < 0)
"-> %d, %zu", stream->id, len, result, *pnread);
if(result)
goto out;
*pnread = (size_t)nread;
}
if(cf_process_ingress(cf, data)) {
@ -888,14 +873,12 @@ static CURLcode cf_quiche_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
}
/* recvbuf had nothing before, maybe after progressing ingress? */
if(nread < 0 && !Curl_bufq_is_empty(&stream->recvbuf)) {
nread = Curl_bufq_read(&stream->recvbuf,
(unsigned char *)buf, len, &result);
if(!*pnread && !Curl_bufq_is_empty(&stream->recvbuf)) {
result = Curl_bufq_cread(&stream->recvbuf, buf, len, pnread);
CURL_TRC_CF(data, cf, "[%" FMT_PRIu64 "] read recvbuf(len=%zu) "
"-> %zd, %d", stream->id, len, nread, result);
if(nread < 0)
"-> %d, %zu", stream->id, len, result, *pnread);
if(result)
goto out;
*pnread = (size_t)nread;
}
if(*pnread) {
@ -903,16 +886,14 @@ static CURLcode cf_quiche_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
Curl_multi_mark_dirty(data);
}
else {
if(stream->closed) {
nread = recv_closed_stream(cf, data, &result);
goto out;
}
if(stream->closed)
result = recv_closed_stream(cf, data, pnread);
else if(quiche_conn_is_draining(ctx->qconn)) {
failf(data, "QUIC connection is draining");
result = CURLE_HTTP3;
goto out;
}
result = CURLE_AGAIN;
else
result = CURLE_AGAIN;
}
out:
@ -922,7 +903,7 @@ out:
result = r2;
}
if(*pnread > 0)
ctx->data_recvd += nread;
ctx->data_recvd += *pnread;
CURL_TRC_CF(data, cf, "[%"FMT_PRIu64"] cf_recv(total=%"
FMT_OFF_T ") -> %d, %zu",
stream->id, ctx->data_recvd, result, *pnread);

View file

@ -1346,7 +1346,7 @@ static CURLcode ssl_cf_set_earlydata(struct Curl_cfilter *cf,
const void *buf, size_t blen)
{
struct ssl_connect_data *connssl = cf->ctx;
ssize_t nwritten = 0;
size_t nwritten = 0;
CURLcode result = CURLE_OK;
DEBUGASSERT(connssl->earlydata_state == ssl_earlydata_await);
@ -1354,10 +1354,10 @@ static CURLcode ssl_cf_set_earlydata(struct Curl_cfilter *cf,
if(blen) {
if(blen > connssl->earlydata_max)
blen = connssl->earlydata_max;
nwritten = Curl_bufq_write(&connssl->earlydata, buf, blen, &result);
result = Curl_bufq_write(&connssl->earlydata, buf, blen, &nwritten);
CURL_TRC_CF(data, cf, "ssl_cf_set_earlydata(len=%zu) -> %zd",
blen, nwritten);
if(nwritten < 0)
if(result)
return result;
}
return CURLE_OK;

View file

@ -653,11 +653,11 @@ static CURLcode ws_cw_write(struct Curl_easy *data,
}
if(nbytes) {
ssize_t nwritten;
nwritten = Curl_bufq_write(&ctx->buf, (const unsigned char *)buf,
nbytes, &result);
if(nwritten < 0) {
infof(data, "[WS] error adding data to buffer %d", result);
size_t nwritten;
result = Curl_bufq_write(&ctx->buf, (const unsigned char *)buf,
nbytes, &nwritten);
if(result) {
infof(data, "WS: error adding data to buffer %d", result);
return result;
}
}
@ -756,8 +756,7 @@ static ssize_t ws_enc_write_head(struct Curl_easy *data,
{
unsigned char firstbyte = 0;
unsigned char head[14];
size_t hlen;
ssize_t n;
size_t hlen, n;
if(payload_len < 0) {
failf(data, "[WS] starting new frame with negative payload length %"
@ -837,16 +836,16 @@ static ssize_t ws_enc_write_head(struct Curl_easy *data,
/* reset for payload to come */
enc->xori = 0;
n = Curl_bufq_write(out, head, hlen, err);
if(n < 0)
*err = Curl_bufq_write(out, head, hlen, &n);
if(*err)
return -1;
if((size_t)n != hlen) {
if(n != hlen) {
/* We use a bufq with SOFT_LIMIT, writing should always succeed */
DEBUGASSERT(0);
*err = CURLE_SEND_ERROR;
return -1;
}
return n;
return (ssize_t)n;
}
static ssize_t ws_enc_write_payload(struct ws_encoder *enc,
@ -854,8 +853,7 @@ static ssize_t ws_enc_write_payload(struct ws_encoder *enc,
const unsigned char *buf, size_t buflen,
struct bufq *out, CURLcode *err)
{
ssize_t n;
size_t i, len;
size_t i, len, n;
if(Curl_bufq_is_full(out)) {
*err = CURLE_AGAIN;
@ -869,8 +867,8 @@ static ssize_t ws_enc_write_payload(struct ws_encoder *enc,
for(i = 0; i < len; ++i) {
unsigned char c = buf[i] ^ enc->mask[enc->xori];
n = Curl_bufq_write(out, &c, 1, err);
if(n < 0) {
*err = Curl_bufq_write(out, &c, 1, &n);
if(*err) {
if((*err != CURLE_AGAIN) || !i)
return -1;
break;
@ -1054,15 +1052,16 @@ CURLcode Curl_ws_accept(struct Curl_easy *data,
}
if(data->set.connect_only) {
ssize_t nwritten;
size_t nwritten;
/* In CONNECT_ONLY setup, the payloads from `mem` need to be received
* when using `curl_ws_recv` later on after this transfer is already
* marked as DONE. */
nwritten = Curl_bufq_write(&ws->recvbuf, (const unsigned char *)mem,
nread, &result);
if(nwritten < 0)
result = Curl_bufq_write(&ws->recvbuf, (const unsigned char *)mem,
nread, &nwritten);
if(result)
return result;
CURL_TRC_WS(data, "%zu bytes payload", nread);
DEBUGASSERT(nread == nwritten);
infof(data, "%zu bytes websocket payload", nread);
}
else { /* !connect_only */
/* And pass any additional data to the writers */
@ -1137,17 +1136,12 @@ static ssize_t ws_client_collect(const unsigned char *buf, size_t buflen,
return nwritten;
}
static ssize_t nw_in_recv(void *reader_ctx,
unsigned char *buf, size_t buflen,
CURLcode *err)
static CURLcode nw_in_recv(void *reader_ctx,
unsigned char *buf, size_t buflen,
size_t *pnread)
{
struct Curl_easy *data = reader_ctx;
size_t nread;
*err = curl_easy_recv(data, buf, buflen, &nread);
if(*err)
return -1;
return (ssize_t)nread;
return curl_easy_recv(data, buf, buflen, pnread);
}
CURL_EXTERN CURLcode curl_ws_recv(CURL *d, void *buffer,
@ -1192,10 +1186,10 @@ CURL_EXTERN CURLcode curl_ws_recv(CURL *d, void *buffer,
/* receive more when our buffer is empty */
if(Curl_bufq_is_empty(&ws->recvbuf)) {
ssize_t n = Curl_bufq_slurp(&ws->recvbuf, nw_in_recv, data, &result);
if(n < 0) {
size_t n;
result = Curl_bufq_slurp(&ws->recvbuf, nw_in_recv, data, &n);
if(result)
return result;
}
else if(n == 0) {
/* connection closed */
infof(data, "[WS] connection expectedly closed?");

View file

@ -86,7 +86,8 @@ static void check_bufq(size_t pool_spares,
struct bufc_pool pool;
size_t max_len = chunk_size * max_chunks;
CURLcode result;
ssize_t n, i;
ssize_t i;
size_t n2;
size_t nwritten, nread;
if(pool_spares > 0) {
@ -104,18 +105,17 @@ static void check_bufq(size_t pool_spares,
fail_unless(q.spare == NULL, "init: spare not NULL");
fail_unless(Curl_bufq_len(&q) == 0, "init: bufq length != 0");
n = Curl_bufq_write(&q, test_data, wsize, &result);
fail_unless(n >= 0, "write: negative size returned");
fail_unless((size_t)n <= wsize, "write: wrong size returned");
result = Curl_bufq_write(&q, test_data, wsize, &n2);
fail_unless(n2 <= wsize, "write: wrong size returned");
fail_unless(result == CURLE_OK, "write: wrong result returned");
/* write empty bufq full */
nwritten = 0;
Curl_bufq_reset(&q);
while(!Curl_bufq_is_full(&q)) {
n = Curl_bufq_write(&q, test_data, wsize, &result);
if(n >= 0) {
nwritten += (size_t)n;
result = Curl_bufq_write(&q, test_data, wsize, &n2);
if(!result) {
nwritten += n2;
}
else if(result != CURLE_AGAIN) {
fail_unless(result == CURLE_AGAIN, "write-loop: unexpected result");
@ -132,9 +132,9 @@ static void check_bufq(size_t pool_spares,
/* read full bufq empty */
nread = 0;
while(!Curl_bufq_is_empty(&q)) {
n = Curl_bufq_read(&q, test_data, rsize, &result);
if(n >= 0) {
nread += (size_t)n;
result = Curl_bufq_read(&q, test_data, rsize, &n2);
if(!result) {
nread += n2;
}
else if(result != CURLE_AGAIN) {
fail_unless(result == CURLE_AGAIN, "read-loop: unexpected result");
@ -153,13 +153,13 @@ static void check_bufq(size_t pool_spares,
}
for(i = 0; i < 1000; ++i) {
n = Curl_bufq_write(&q, test_data, wsize, &result);
if(n < 0 && result != CURLE_AGAIN) {
result = Curl_bufq_write(&q, test_data, wsize, &n2);
if(result && result != CURLE_AGAIN) {
fail_unless(result == CURLE_AGAIN, "rw-loop: unexpected write result");
break;
}
n = Curl_bufq_read(&q, test_data, rsize, &result);
if(n < 0 && result != CURLE_AGAIN) {
result = Curl_bufq_read(&q, test_data, rsize, &n2);
if(result && result != CURLE_AGAIN) {
fail_unless(result == CURLE_AGAIN, "rw-loop: unexpected read result");
break;
}
@ -170,12 +170,12 @@ static void check_bufq(size_t pool_spares,
Curl_bufq_init2(&q, chunk_size, max_chunks, (opts|BUFQ_OPT_SOFT_LIMIT));
nwritten = 0;
while(!Curl_bufq_is_full(&q)) {
n = Curl_bufq_write(&q, test_data, wsize, &result);
if(n < 0 || (size_t)n != wsize) {
fail_unless(n > 0 && (size_t)n == wsize, "write should be complete");
result = Curl_bufq_write(&q, test_data, wsize, &n2);
if(result || n2 != wsize) {
fail_unless(!result && n2 == wsize, "write should be complete");
break;
}
nwritten += (size_t)n;
nwritten += n2;
}
if(nwritten < max_len) {
curl_mfprintf(stderr, "%zu bytes written, but max_len=%zu\n",
@ -184,18 +184,18 @@ static void check_bufq(size_t pool_spares,
fail_if(TRUE, "write: bufq full but nwritten wrong");
}
/* do one more write on a full bufq, should work */
n = Curl_bufq_write(&q, test_data, wsize, &result);
fail_unless(n > 0 && (size_t)n == wsize, "write should be complete");
nwritten += (size_t)n;
result = Curl_bufq_write(&q, test_data, wsize, &n2);
fail_unless(!result && n2 == wsize, "write should be complete");
nwritten += n2;
/* see that we get all out again */
nread = 0;
while(!Curl_bufq_is_empty(&q)) {
n = Curl_bufq_read(&q, test_data, rsize, &result);
if(n <= 0) {
fail_unless(n > 0, "read-loop: unexpected fail");
result = Curl_bufq_read(&q, test_data, rsize, &n2);
if(result) {
fail_unless(result, "read-loop: unexpected fail");
break;
}
nread += (size_t)n;
nread += n2;
}
fail_unless(nread == nwritten, "did not get the same out as put in");
@ -210,13 +210,13 @@ static CURLcode test_unit2601(char *arg)
UNITTEST_BEGIN_SIMPLE
struct bufq q;
ssize_t n;
size_t n;
CURLcode result;
unsigned char buf[16*1024];
Curl_bufq_init(&q, 8*1024, 12);
n = Curl_bufq_read(&q, buf, 128, &result);
fail_unless(n < 0 && result == CURLE_AGAIN, "read empty fail");
result = Curl_bufq_read(&q, buf, 128, &n);
fail_unless(result && result == CURLE_AGAIN, "read empty fail");
Curl_bufq_free(&q);
check_bufq(0, 1024, 4, 128, 128, BUFQ_OPT_NONE);