lib: unify recv/send function signatures

cfilter/conn: change send/recv function signatures. Unify the
calling/return conventions in our send/receive handling.

Curl_conn_recv(), adjust pnread type

Parameter `pnread` was a `ssize_t *`, but `size_t *` is better since the
function returns any error in its `CURLcode` return value.

Closes #17546
This commit is contained in:
Stefan Eissing 2025-06-11 10:18:15 +02:00 committed by Daniel Stenberg
parent 3934431421
commit 20c90ba298
No known key found for this signature in database
GPG key ID: 5CC908FDB71E12C2
37 changed files with 1114 additions and 1219 deletions

View file

@ -238,18 +238,17 @@ static ssize_t proxy_nw_in_reader(void *reader_ctx,
CURLcode *err)
{
struct Curl_cfilter *cf = reader_ctx;
ssize_t nread;
if(cf) {
struct Curl_easy *data = CF_DATA_CURRENT(cf);
nread = Curl_conn_cf_recv(cf->next, data, (char *)buf, buflen, err);
CURL_TRC_CF(data, cf, "[0] nw_in_reader(len=%zu) -> %zd, %d",
buflen, nread, *err);
size_t nread;
*err = Curl_conn_cf_recv(cf->next, data, (char *)buf, buflen, &nread);
CURL_TRC_CF(data, cf, "[0] nw_in_reader(len=%zu) -> %d, %zu",
buflen, *err, nread);
return *err ? -1 : (ssize_t)nread;
}
else {
nread = 0;
}
return nread;
*err = CURLE_FAILED_INIT;
return -1;
}
static ssize_t proxy_h2_nw_out_writer(void *writer_ctx,
@ -257,19 +256,18 @@ static ssize_t proxy_h2_nw_out_writer(void *writer_ctx,
CURLcode *err)
{
struct Curl_cfilter *cf = writer_ctx;
ssize_t nwritten;
if(cf) {
struct Curl_easy *data = CF_DATA_CURRENT(cf);
nwritten = Curl_conn_cf_send(cf->next, data, (const char *)buf, buflen,
FALSE, err);
size_t nwritten;
*err = Curl_conn_cf_send(cf->next, data, (const char *)buf, buflen,
FALSE, &nwritten);
CURL_TRC_CF(data, cf, "[0] nw_out_writer(len=%zu) -> %zd, %d",
buflen, nwritten, *err);
return *err ? -1 : (ssize_t)nwritten;
}
else {
nwritten = 0;
}
return nwritten;
*err = CURLE_FAILED_INIT;
return -1;
}
static int proxy_h2_client_new(struct Curl_cfilter *cf,
@ -1277,212 +1275,203 @@ static void cf_h2_proxy_adjust_pollset(struct Curl_cfilter *cf,
}
}
static ssize_t h2_handle_tunnel_close(struct Curl_cfilter *cf,
struct Curl_easy *data,
CURLcode *err)
static CURLcode h2_handle_tunnel_close(struct Curl_cfilter *cf,
struct Curl_easy *data,
size_t *pnread)
{
struct cf_h2_proxy_ctx *ctx = cf->ctx;
ssize_t rv = 0;
*pnread = 0;
if(ctx->tunnel.error == NGHTTP2_REFUSED_STREAM) {
CURL_TRC_CF(data, cf, "[%d] REFUSED_STREAM, try again on a new "
"connection", ctx->tunnel.stream_id);
connclose(cf->conn, "REFUSED_STREAM"); /* do not use this anymore */
*err = CURLE_RECV_ERROR; /* trigger Curl_retry_request() later */
return -1;
return CURLE_RECV_ERROR; /* trigger Curl_retry_request() later */
}
else if(ctx->tunnel.error != NGHTTP2_NO_ERROR) {
failf(data, "HTTP/2 stream %u was not closed cleanly: %s (err %u)",
ctx->tunnel.stream_id, nghttp2_http2_strerror(ctx->tunnel.error),
ctx->tunnel.error);
*err = CURLE_HTTP2_STREAM;
return -1;
return CURLE_HTTP2_STREAM;
}
else if(ctx->tunnel.reset) {
failf(data, "HTTP/2 stream %u was reset", ctx->tunnel.stream_id);
*err = CURLE_RECV_ERROR;
return -1;
return CURLE_RECV_ERROR;
}
*err = CURLE_OK;
rv = 0;
CURL_TRC_CF(data, cf, "[%d] handle_tunnel_close -> %zd, %d",
ctx->tunnel.stream_id, rv, *err);
return rv;
CURL_TRC_CF(data, cf, "[%d] handle_tunnel_close -> 0",
ctx->tunnel.stream_id);
return CURLE_OK;
}
static ssize_t tunnel_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
char *buf, size_t len, CURLcode *err)
static CURLcode tunnel_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
char *buf, size_t len, size_t *pnread)
{
struct cf_h2_proxy_ctx *ctx = cf->ctx;
ssize_t nread = -1;
CURLcode result = CURLE_AGAIN;
*err = CURLE_AGAIN;
*pnread = 0;
if(!Curl_bufq_is_empty(&ctx->tunnel.recvbuf)) {
nread = Curl_bufq_read(&ctx->tunnel.recvbuf,
(unsigned char *)buf, len, err);
ssize_t nread = Curl_bufq_read(&ctx->tunnel.recvbuf,
(unsigned char *)buf, len, &result);
if(nread < 0)
goto out;
DEBUGASSERT(nread > 0);
*pnread = (size_t)nread;
}
if(nread < 0) {
if(!*pnread) {
if(ctx->tunnel.closed) {
nread = h2_handle_tunnel_close(cf, data, err);
result = h2_handle_tunnel_close(cf, data, pnread);
}
else if(ctx->tunnel.reset ||
(ctx->conn_closed && Curl_bufq_is_empty(&ctx->inbufq)) ||
(ctx->rcvd_goaway &&
ctx->last_stream_id < ctx->tunnel.stream_id)) {
*err = CURLE_RECV_ERROR;
nread = -1;
result = CURLE_RECV_ERROR;
}
}
else if(nread == 0) {
*err = CURLE_AGAIN;
nread = -1;
else
result = CURLE_AGAIN;
}
out:
CURL_TRC_CF(data, cf, "[%d] tunnel_recv(len=%zu) -> %zd, %d",
ctx->tunnel.stream_id, len, nread, *err);
return nread;
CURL_TRC_CF(data, cf, "[%d] tunnel_recv(len=%zu) -> %d, %zu",
ctx->tunnel.stream_id, len, result, *pnread);
return result;
}
static ssize_t cf_h2_proxy_recv(struct Curl_cfilter *cf,
struct Curl_easy *data,
char *buf, size_t len, CURLcode *err)
static CURLcode cf_h2_proxy_recv(struct Curl_cfilter *cf,
struct Curl_easy *data,
char *buf, size_t len,
size_t *pnread)
{
struct cf_h2_proxy_ctx *ctx = cf->ctx;
ssize_t nread = -1;
struct cf_call_data save;
CURLcode result;
CURLcode result, r2;
if(ctx->tunnel.state != H2_TUNNEL_ESTABLISHED) {
*err = CURLE_RECV_ERROR;
return -1;
}
*pnread = 0;
CF_DATA_SAVE(save, cf, data);
if(ctx->tunnel.state != H2_TUNNEL_ESTABLISHED) {
result = CURLE_RECV_ERROR;
goto out;
}
if(Curl_bufq_is_empty(&ctx->tunnel.recvbuf)) {
*err = proxy_h2_progress_ingress(cf, data);
if(*err)
result = proxy_h2_progress_ingress(cf, data);
if(result)
goto out;
}
nread = tunnel_recv(cf, data, buf, len, err);
result = tunnel_recv(cf, data, buf, len, pnread);
if(nread > 0) {
CURL_TRC_CF(data, cf, "[%d] increase window by %zd",
ctx->tunnel.stream_id, nread);
nghttp2_session_consume(ctx->h2, ctx->tunnel.stream_id, (size_t)nread);
if(!result) {
CURL_TRC_CF(data, cf, "[%d] increase window by %zu",
ctx->tunnel.stream_id, *pnread);
nghttp2_session_consume(ctx->h2, ctx->tunnel.stream_id, *pnread);
}
result = proxy_h2_progress_egress(cf, data);
if(result && (result != CURLE_AGAIN)) {
*err = result;
nread = -1;
}
r2 = proxy_h2_progress_egress(cf, data);
if(r2 && (r2 != CURLE_AGAIN))
result = r2;
out:
if(!Curl_bufq_is_empty(&ctx->tunnel.recvbuf) &&
(nread >= 0 || *err == CURLE_AGAIN)) {
(!result || (result == CURLE_AGAIN))) {
/* data pending and no fatal error to report. Need to trigger
* draining to avoid stalling when no socket events happen. */
drain_tunnel(cf, data, &ctx->tunnel);
}
CURL_TRC_CF(data, cf, "[%d] cf_recv(len=%zu) -> %zd %d",
ctx->tunnel.stream_id, len, nread, *err);
CURL_TRC_CF(data, cf, "[%d] cf_recv(len=%zu) -> %d, %zu",
ctx->tunnel.stream_id, len, result, *pnread);
CF_DATA_RESTORE(cf, save);
return nread;
return result;
}
static ssize_t cf_h2_proxy_send(struct Curl_cfilter *cf,
struct Curl_easy *data,
const void *buf, size_t len, bool eos,
CURLcode *err)
static CURLcode cf_h2_proxy_send(struct Curl_cfilter *cf,
struct Curl_easy *data,
const void *buf, size_t len, bool eos,
size_t *pnwritten)
{
struct cf_h2_proxy_ctx *ctx = cf->ctx;
struct cf_call_data save;
int rv;
ssize_t nwritten;
CURLcode result;
ssize_t nwritten = 0;
CURLcode result, r2;
(void)eos;
if(ctx->tunnel.state != H2_TUNNEL_ESTABLISHED) {
*err = CURLE_SEND_ERROR;
return -1;
}
*pnwritten = 0;
CF_DATA_SAVE(save, cf, data);
if(ctx->tunnel.closed) {
nwritten = -1;
*err = CURLE_SEND_ERROR;
if(ctx->tunnel.state != H2_TUNNEL_ESTABLISHED) {
result = CURLE_SEND_ERROR;
goto out;
}
else {
nwritten = Curl_bufq_write(&ctx->tunnel.sendbuf, buf, len, err);
if(nwritten < 0 && (*err != CURLE_AGAIN))
goto out;
if(ctx->tunnel.closed) {
result = CURLE_SEND_ERROR;
goto out;
}
nwritten = Curl_bufq_write(&ctx->tunnel.sendbuf, buf, len, &result);
CURL_TRC_CF(data, cf, "cf_send(), bufq_write %d, %zd", result, nwritten);
if(nwritten >= 0)
*pnwritten = (size_t)nwritten;
else if(result && (result != CURLE_AGAIN))
goto out;
if(!Curl_bufq_is_empty(&ctx->tunnel.sendbuf)) {
/* req body data is buffered, resume the potentially suspended stream */
rv = nghttp2_session_resume_data(ctx->h2, ctx->tunnel.stream_id);
if(nghttp2_is_fatal(rv)) {
*err = CURLE_SEND_ERROR;
nwritten = -1;
result = CURLE_SEND_ERROR;
goto out;
}
}
result = proxy_h2_progress_ingress(cf, data);
if(result) {
*err = result;
nwritten = -1;
r2 = proxy_h2_progress_ingress(cf, data);
if(r2 && (r2 != CURLE_AGAIN)) {
result = r2;
goto out;
}
/* Call the nghttp2 send loop and flush to write ALL buffered data,
* headers and/or request body completely out to the network */
result = proxy_h2_progress_egress(cf, data);
if(result && (result != CURLE_AGAIN)) {
*err = result;
nwritten = -1;
r2 = proxy_h2_progress_egress(cf, data);
if(r2 && (r2 != CURLE_AGAIN)) {
result = r2;
goto out;
}
if(proxy_h2_should_close_session(ctx)) {
if(!result && proxy_h2_should_close_session(ctx)) {
/* nghttp2 thinks this session is done. If the stream has not been
* closed, this is an error state for out transfer */
if(ctx->tunnel.closed) {
*err = CURLE_SEND_ERROR;
nwritten = -1;
result = CURLE_SEND_ERROR;
}
else {
CURL_TRC_CF(data, cf, "[0] send: nothing to do in this session");
*err = CURLE_HTTP2;
nwritten = -1;
result = CURLE_HTTP2;
}
}
out:
if(!Curl_bufq_is_empty(&ctx->tunnel.recvbuf) &&
(nwritten >= 0 || *err == CURLE_AGAIN)) {
(!result || (result == CURLE_AGAIN))) {
/* data pending and no fatal error to report. Need to trigger
* draining to avoid stalling when no socket events happen. */
drain_tunnel(cf, data, &ctx->tunnel);
}
CURL_TRC_CF(data, cf, "[%d] cf_send(len=%zu) -> %zd, %d, "
CURL_TRC_CF(data, cf, "[%d] cf_send(len=%zu) -> %d, %zu, "
"h2 windows %d-%d (stream-conn), buffers %zu-%zu (stream-conn)",
ctx->tunnel.stream_id, len, nwritten, *err,
ctx->tunnel.stream_id, len, result, *pnwritten,
nghttp2_session_get_stream_remote_window_size(
ctx->h2, ctx->tunnel.stream_id),
nghttp2_session_get_remote_window_size(ctx->h2),
Curl_bufq_len(&ctx->tunnel.sendbuf),
Curl_bufq_len(&ctx->outbufq));
CF_DATA_RESTORE(cf, save);
return nwritten;
return result;
}
static CURLcode cf_h2_proxy_flush(struct Curl_cfilter *cf,