mirror of
https://github.com/curl/curl.git
synced 2026-04-15 00:51:42 +03:00
quiche: use client writer
Instead of buffering response body data until it is received by the transfer loop, write the response data directly to the client. Use a connection wide scratch buffer to get the response body from quiche. Eliminates need for maintaining individual buffers for each stream. Fixes #19803 Reported-by: Stanislav Fort Closes #19806
This commit is contained in:
parent
4f2374810a
commit
b30c1b97b9
1 changed files with 185 additions and 200 deletions
|
|
@ -89,8 +89,9 @@ struct cf_quiche_ctx {
|
|||
uint8_t scid[QUICHE_MAX_CONN_ID_LEN];
|
||||
struct curltime started_at; /* time the current attempt started */
|
||||
struct curltime handshake_at; /* time connect handshake finished */
|
||||
struct bufc_pool stream_bufcp; /* chunk pool for streams */
|
||||
struct uint_hash streams; /* hash `data->mid` to `stream_ctx` */
|
||||
struct dynbuf scratch; /* temp buffer for header construction */
|
||||
struct bufq writebuf; /* temp buffer for writing bodies */
|
||||
curl_off_t data_recvd;
|
||||
BIT(initialized);
|
||||
BIT(goaway); /* got GOAWAY from server */
|
||||
|
|
@ -119,9 +120,10 @@ static void cf_quiche_ctx_init(struct cf_quiche_ctx *ctx)
|
|||
debug_log_init = 1;
|
||||
}
|
||||
#endif
|
||||
Curl_bufcp_init(&ctx->stream_bufcp, H3_STREAM_CHUNK_SIZE,
|
||||
H3_STREAM_POOL_SPARES);
|
||||
curlx_dyn_init(&ctx->scratch, CURL_MAX_HTTP_HEADER);
|
||||
Curl_uint32_hash_init(&ctx->streams, 63, h3_stream_hash_free);
|
||||
Curl_bufq_init2(&ctx->writebuf, H3_STREAM_CHUNK_SIZE, H3_STREAM_RECV_CHUNKS,
|
||||
BUFQ_OPT_SOFT_LIMIT);
|
||||
ctx->data_recvd = 0;
|
||||
ctx->initialized = TRUE;
|
||||
}
|
||||
|
|
@ -134,8 +136,9 @@ static void cf_quiche_ctx_free(struct cf_quiche_ctx *ctx)
|
|||
Curl_vquic_tls_cleanup(&ctx->tls);
|
||||
Curl_ssl_peer_cleanup(&ctx->peer);
|
||||
vquic_ctx_free(&ctx->q);
|
||||
Curl_bufcp_free(&ctx->stream_bufcp);
|
||||
Curl_uint32_hash_destroy(&ctx->streams);
|
||||
curlx_dyn_free(&ctx->scratch);
|
||||
Curl_bufq_free(&ctx->writebuf);
|
||||
}
|
||||
curlx_free(ctx);
|
||||
}
|
||||
|
|
@ -168,9 +171,10 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
|
|||
*/
|
||||
struct h3_stream_ctx {
|
||||
uint64_t id; /* HTTP/3 protocol stream identifier */
|
||||
struct bufq recvbuf; /* h3 response */
|
||||
struct h1_req_parser h1; /* h1 request parsing */
|
||||
uint64_t error3; /* HTTP/3 stream error code */
|
||||
int status_code; /* HTTP status code */
|
||||
CURLcode xfer_result; /* result from cf_quiche_write_(hd/body) */
|
||||
BIT(opened); /* TRUE after stream has been opened */
|
||||
BIT(closed); /* TRUE on stream close */
|
||||
BIT(reset); /* TRUE on stream reset */
|
||||
|
|
@ -182,7 +186,6 @@ struct h3_stream_ctx {
|
|||
|
||||
static void h3_stream_ctx_free(struct h3_stream_ctx *stream)
|
||||
{
|
||||
Curl_bufq_free(&stream->recvbuf);
|
||||
Curl_h1_req_parse_free(&stream->h1);
|
||||
curlx_free(stream);
|
||||
}
|
||||
|
|
@ -252,6 +255,7 @@ static bool cf_quiche_do_expire(struct Curl_cfilter *cf,
|
|||
(void)stream;
|
||||
(void)user_data;
|
||||
CURL_TRC_CF(sdata, cf, "conn closed, mark as dirty");
|
||||
stream->xfer_result = CURLE_SEND_ERROR;
|
||||
Curl_multi_mark_dirty(sdata);
|
||||
return TRUE;
|
||||
}
|
||||
|
|
@ -270,8 +274,6 @@ static CURLcode h3_data_setup(struct Curl_cfilter *cf,
|
|||
return CURLE_OUT_OF_MEMORY;
|
||||
|
||||
stream->id = -1;
|
||||
Curl_bufq_initp(&stream->recvbuf, &ctx->stream_bufcp,
|
||||
H3_STREAM_RECV_CHUNKS, BUFQ_OPT_SOFT_LIMIT);
|
||||
Curl_h1_req_parse_init(&stream->h1, H1_PARSE_DEFAULT_MAX_LINE_LEN);
|
||||
|
||||
if(!Curl_uint32_hash_set(&ctx->streams, data->mid, stream)) {
|
||||
|
|
@ -282,28 +284,38 @@ static CURLcode h3_data_setup(struct Curl_cfilter *cf,
|
|||
return CURLE_OK;
|
||||
}
|
||||
|
||||
static void cf_quiche_stream_close(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data,
|
||||
struct h3_stream_ctx *stream)
|
||||
{
|
||||
struct cf_quiche_ctx *ctx = cf->ctx;
|
||||
CURLcode result;
|
||||
|
||||
if(ctx->qconn && !stream->closed) {
|
||||
quiche_conn_stream_shutdown(ctx->qconn, stream->id,
|
||||
QUICHE_SHUTDOWN_READ, CURL_H3_NO_ERROR);
|
||||
if(!stream->send_closed) {
|
||||
quiche_conn_stream_shutdown(ctx->qconn, stream->id,
|
||||
QUICHE_SHUTDOWN_WRITE, CURL_H3_NO_ERROR);
|
||||
stream->send_closed = TRUE;
|
||||
}
|
||||
stream->closed = TRUE;
|
||||
result = cf_flush_egress(cf, data);
|
||||
if(result)
|
||||
CURL_TRC_CF(data, cf, "[%" PRIu64 "] stream close, flush egress -> %d",
|
||||
stream->id, result);
|
||||
}
|
||||
}
|
||||
|
||||
static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
|
||||
{
|
||||
struct cf_quiche_ctx *ctx = cf->ctx;
|
||||
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
|
||||
CURLcode result;
|
||||
|
||||
(void)cf;
|
||||
if(stream) {
|
||||
CURL_TRC_CF(data, cf, "[%" PRIu64 "] easy handle is done", stream->id);
|
||||
if(ctx->qconn && !stream->closed) {
|
||||
quiche_conn_stream_shutdown(ctx->qconn, stream->id,
|
||||
QUICHE_SHUTDOWN_READ, CURL_H3_NO_ERROR);
|
||||
if(!stream->send_closed) {
|
||||
quiche_conn_stream_shutdown(ctx->qconn, stream->id,
|
||||
QUICHE_SHUTDOWN_WRITE, CURL_H3_NO_ERROR);
|
||||
stream->send_closed = TRUE;
|
||||
}
|
||||
stream->closed = TRUE;
|
||||
result = cf_flush_egress(cf, data);
|
||||
if(result)
|
||||
CURL_TRC_CF(data, cf, "data_done, flush egress -> %d", result);
|
||||
}
|
||||
cf_quiche_stream_close(cf, data, stream);
|
||||
Curl_uint32_hash_remove(&ctx->streams, data->mid);
|
||||
}
|
||||
}
|
||||
|
|
@ -316,39 +328,29 @@ static void cf_quiche_expire_conn_closed(struct Curl_cfilter *cf,
|
|||
cf_quiche_for_all_streams(cf, data->multi, cf_quiche_do_expire, NULL);
|
||||
}
|
||||
|
||||
/*
|
||||
* write_resp_raw() copies response data in raw format to the `data`'s
|
||||
* receive buffer. If not enough space is available, it appends to the
|
||||
* `data`'s overflow buffer.
|
||||
*/
|
||||
static CURLcode write_resp_raw(struct Curl_cfilter *cf,
|
||||
static void cf_quiche_write_hd(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data,
|
||||
const void *mem, size_t memlen)
|
||||
struct h3_stream_ctx *stream,
|
||||
const char *buf, size_t blen, bool eos)
|
||||
{
|
||||
struct cf_quiche_ctx *ctx = cf->ctx;
|
||||
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
|
||||
CURLcode result = CURLE_OK;
|
||||
size_t nwritten;
|
||||
|
||||
(void)cf;
|
||||
if(!stream)
|
||||
return CURLE_RECV_ERROR;
|
||||
result = Curl_bufq_write(&stream->recvbuf, mem, memlen, &nwritten);
|
||||
if(result)
|
||||
return result;
|
||||
|
||||
if(nwritten < memlen) {
|
||||
/* This MUST not happen. Our recbuf is dimensioned to hold the
|
||||
* full max_stream_window and then some for this reason. */
|
||||
DEBUGASSERT(0);
|
||||
return CURLE_RECV_ERROR;
|
||||
/* This function returns no error intentionally, but records
|
||||
* the result at the stream, skipping further writes once the
|
||||
* `result` of the transfer is known.
|
||||
* The stream is subsequently cancelled "higher up" in the filter's
|
||||
* send/recv callbacks. Closing the stream here leads to SEND/RECV
|
||||
* errors in other places that then overwrite the transfer's result. */
|
||||
if(!stream->xfer_result) {
|
||||
stream->xfer_result = Curl_xfer_write_resp_hd(data, buf, blen, eos);
|
||||
if(stream->xfer_result)
|
||||
CURL_TRC_CF(data, cf, "[%" PRId64 "] error %d writing %zu "
|
||||
"bytes of headers", stream->id, stream->xfer_result, blen);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
struct cb_ctx {
|
||||
struct Curl_cfilter *cf;
|
||||
struct Curl_easy *data;
|
||||
struct h3_stream_ctx *stream;
|
||||
};
|
||||
|
||||
static int cb_each_header(uint8_t *name, size_t name_len,
|
||||
|
|
@ -356,39 +358,59 @@ static int cb_each_header(uint8_t *name, size_t name_len,
|
|||
void *argp)
|
||||
{
|
||||
struct cb_ctx *x = argp;
|
||||
struct cf_quiche_ctx *ctx = x->cf->ctx;
|
||||
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, x->data);
|
||||
struct Curl_cfilter *cf = x->cf;
|
||||
struct Curl_easy *data = x->data;
|
||||
struct h3_stream_ctx *stream = x->stream;
|
||||
struct cf_quiche_ctx *ctx = cf->ctx;
|
||||
CURLcode result;
|
||||
|
||||
if(!stream)
|
||||
return CURLE_OK;
|
||||
if(!stream || stream->xfer_result)
|
||||
return 1; /* abort iteration */
|
||||
|
||||
if((name_len == 7) && !strncmp(HTTP_PSEUDO_STATUS, (char *)name, 7)) {
|
||||
CURL_TRC_CF(x->data, x->cf, "[%" PRIu64 "] status: %.*s",
|
||||
stream->id, (int)value_len, value);
|
||||
result = write_resp_raw(x->cf, x->data, "HTTP/3 ", sizeof("HTTP/3 ") - 1);
|
||||
curlx_dyn_reset(&ctx->scratch);
|
||||
result = Curl_http_decode_status(&stream->status_code,
|
||||
(const char *)value, value_len);
|
||||
if(!result)
|
||||
result = write_resp_raw(x->cf, x->data, value, value_len);
|
||||
result = curlx_dyn_addn(&ctx->scratch, STRCONST("HTTP/3 "));
|
||||
if(!result)
|
||||
result = write_resp_raw(x->cf, x->data, " \r\n", 3);
|
||||
result = curlx_dyn_addn(&ctx->scratch,
|
||||
(const char *)value, value_len);
|
||||
if(!result)
|
||||
result = curlx_dyn_addn(&ctx->scratch, STRCONST(" \r\n"));
|
||||
if(!result)
|
||||
cf_quiche_write_hd(cf, data, stream, curlx_dyn_ptr(&ctx->scratch),
|
||||
curlx_dyn_len(&ctx->scratch), FALSE);
|
||||
CURL_TRC_CF(data, cf, "[%" PRId64 "] status: %s",
|
||||
stream->id, curlx_dyn_ptr(&ctx->scratch));
|
||||
}
|
||||
else {
|
||||
CURL_TRC_CF(x->data, x->cf, "[%" PRIu64 "] header: %.*s: %.*s",
|
||||
/* store as an HTTP1-style header */
|
||||
CURL_TRC_CF(data, cf, "[%" PRId64 "] header: %.*s: %.*s",
|
||||
stream->id, (int)name_len, name,
|
||||
(int)value_len, value);
|
||||
result = write_resp_raw(x->cf, x->data, name, name_len);
|
||||
curlx_dyn_reset(&ctx->scratch);
|
||||
result = curlx_dyn_addn(&ctx->scratch,
|
||||
(const char *)name, name_len);
|
||||
if(!result)
|
||||
result = write_resp_raw(x->cf, x->data, ": ", 2);
|
||||
result = curlx_dyn_addn(&ctx->scratch, STRCONST(": "));
|
||||
if(!result)
|
||||
result = write_resp_raw(x->cf, x->data, value, value_len);
|
||||
result = curlx_dyn_addn(&ctx->scratch,
|
||||
(const char *)value, value_len);
|
||||
if(!result)
|
||||
result = write_resp_raw(x->cf, x->data, "\r\n", 2);
|
||||
result = curlx_dyn_addn(&ctx->scratch, STRCONST("\r\n"));
|
||||
if(!result)
|
||||
cf_quiche_write_hd(cf, data, stream, curlx_dyn_ptr(&ctx->scratch),
|
||||
curlx_dyn_len(&ctx->scratch), FALSE);
|
||||
}
|
||||
|
||||
if(result) {
|
||||
CURL_TRC_CF(x->data, x->cf, "[%" PRIu64 "] on header error %d",
|
||||
stream->id, result);
|
||||
if(!stream->xfer_result)
|
||||
stream->xfer_result = result;
|
||||
}
|
||||
return result;
|
||||
return stream->xfer_result ? 1 : 0;
|
||||
}
|
||||
|
||||
static CURLcode stream_resp_read(void *reader_ctx,
|
||||
|
|
@ -410,94 +432,99 @@ static CURLcode stream_resp_read(void *reader_ctx,
|
|||
return CURLE_OK;
|
||||
}
|
||||
|
||||
static CURLcode cf_recv_body(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data)
|
||||
static void cf_quiche_flush_body(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data,
|
||||
struct h3_stream_ctx *stream)
|
||||
{
|
||||
struct cf_quiche_ctx *ctx = cf->ctx;
|
||||
const uint8_t *buf;
|
||||
size_t blen;
|
||||
|
||||
while(stream && !stream->xfer_result) {
|
||||
if(Curl_bufq_peek(&ctx->writebuf, &buf, &blen)) {
|
||||
stream->xfer_result = Curl_xfer_write_resp(
|
||||
data, (const char *)buf, blen, FALSE);
|
||||
Curl_bufq_skip(&ctx->writebuf, blen);
|
||||
if(stream->xfer_result) {
|
||||
CURL_TRC_CF(data, cf, "[%" PRId64 "] error %d writing %zu bytes"
|
||||
" of data", stream->id, stream->xfer_result, blen);
|
||||
}
|
||||
}
|
||||
else
|
||||
break;
|
||||
}
|
||||
Curl_bufq_reset(&ctx->writebuf);
|
||||
}
|
||||
|
||||
static void cf_quiche_recv_body(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data,
|
||||
struct h3_stream_ctx *stream)
|
||||
{
|
||||
struct cf_quiche_ctx *ctx = cf->ctx;
|
||||
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
|
||||
size_t nread;
|
||||
struct cb_ctx cb_ctx;
|
||||
CURLcode result = CURLE_OK;
|
||||
|
||||
if(!stream)
|
||||
return CURLE_RECV_ERROR;
|
||||
|
||||
if(!stream->resp_hds_complete) {
|
||||
result = write_resp_raw(cf, data, "\r\n", 2);
|
||||
if(result)
|
||||
return result;
|
||||
stream->resp_hds_complete = TRUE;
|
||||
}
|
||||
return;
|
||||
|
||||
/* Even when the transfer has already errored, we need to receive
|
||||
* the data from quiche, as quiche will otherwise get stuck and
|
||||
* raise events to receive over and over again. */
|
||||
cb_ctx.cf = cf;
|
||||
cb_ctx.data = data;
|
||||
result = Curl_bufq_slurp(&stream->recvbuf,
|
||||
stream_resp_read, &cb_ctx, &nread);
|
||||
|
||||
if(result && result != CURLE_AGAIN) {
|
||||
CURL_TRC_CF(data, cf, "[%" PRIu64 "] recv_body error %zu",
|
||||
stream->id, nread);
|
||||
failf(data, "Error %d in HTTP/3 response body for stream[%" PRIu64 "]",
|
||||
result, stream->id);
|
||||
stream->closed = TRUE;
|
||||
stream->reset = TRUE;
|
||||
stream->send_closed = TRUE;
|
||||
return result;
|
||||
cb_ctx.stream = stream;
|
||||
Curl_bufq_reset(&ctx->writebuf);
|
||||
while(!result) {
|
||||
result = Curl_bufq_slurp(&ctx->writebuf,
|
||||
stream_resp_read, &cb_ctx, &nread);
|
||||
if(!result)
|
||||
cf_quiche_flush_body(cf, data, stream);
|
||||
else if(result == CURLE_AGAIN)
|
||||
break;
|
||||
else if(result) {
|
||||
CURL_TRC_CF(data, cf, "[%" PRIu64 "] recv_body error %d",
|
||||
stream->id, result);
|
||||
failf(data, "[%" PRIu64 "] Error %d in HTTP/3 response body for stream",
|
||||
stream->id, result);
|
||||
stream->closed = TRUE;
|
||||
stream->reset = TRUE;
|
||||
stream->send_closed = TRUE;
|
||||
if(!stream->xfer_result)
|
||||
stream->xfer_result = result;
|
||||
}
|
||||
}
|
||||
return CURLE_OK;
|
||||
cf_quiche_flush_body(cf, data, stream);
|
||||
}
|
||||
|
||||
#ifdef DEBUGBUILD
|
||||
static const char *cf_ev_name(quiche_h3_event *ev)
|
||||
{
|
||||
switch(quiche_h3_event_type(ev)) {
|
||||
case QUICHE_H3_EVENT_HEADERS:
|
||||
return "HEADERS";
|
||||
case QUICHE_H3_EVENT_DATA:
|
||||
return "DATA";
|
||||
case QUICHE_H3_EVENT_RESET:
|
||||
return "RESET";
|
||||
case QUICHE_H3_EVENT_FINISHED:
|
||||
return "FINISHED";
|
||||
case QUICHE_H3_EVENT_GOAWAY:
|
||||
return "GOAWAY";
|
||||
default:
|
||||
return "Unknown";
|
||||
}
|
||||
}
|
||||
#else
|
||||
#define cf_ev_name(x) ""
|
||||
#endif
|
||||
|
||||
static CURLcode h3_process_event(struct Curl_cfilter *cf,
|
||||
static void cf_quiche_process_ev(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data,
|
||||
struct h3_stream_ctx *stream,
|
||||
quiche_h3_event *ev)
|
||||
{
|
||||
struct cb_ctx cb_ctx;
|
||||
CURLcode result = CURLE_OK;
|
||||
int rc;
|
||||
|
||||
if(!stream)
|
||||
return CURLE_OK;
|
||||
return;
|
||||
|
||||
switch(quiche_h3_event_type(ev)) {
|
||||
case QUICHE_H3_EVENT_HEADERS:
|
||||
case QUICHE_H3_EVENT_HEADERS: {
|
||||
struct cb_ctx cb_ctx;
|
||||
stream->resp_got_header = TRUE;
|
||||
cb_ctx.cf = cf;
|
||||
cb_ctx.data = data;
|
||||
rc = quiche_h3_event_for_each_header(ev, cb_each_header, &cb_ctx);
|
||||
if(rc) {
|
||||
failf(data, "Error %d in HTTP/3 response header for stream[%" PRIu64 "]",
|
||||
rc, stream->id);
|
||||
return CURLE_RECV_ERROR;
|
||||
}
|
||||
cb_ctx.stream = stream;
|
||||
quiche_h3_event_for_each_header(ev, cb_each_header, &cb_ctx);
|
||||
CURL_TRC_CF(data, cf, "[%" PRIu64 "] <- [HEADERS]", stream->id);
|
||||
Curl_multi_mark_dirty(data);
|
||||
break;
|
||||
|
||||
}
|
||||
case QUICHE_H3_EVENT_DATA:
|
||||
if(!stream->closed) {
|
||||
result = cf_recv_body(cf, data);
|
||||
if(!stream->resp_hds_complete) {
|
||||
stream->resp_hds_complete = TRUE;
|
||||
cf_quiche_write_hd(cf, data, stream, "\r\n", 2, FALSE);
|
||||
}
|
||||
cf_quiche_recv_body(cf, data, stream);
|
||||
CURL_TRC_CF(data, cf, "[%" PRIu64 "] <- [DATA]", stream->id);
|
||||
Curl_multi_mark_dirty(data);
|
||||
break;
|
||||
|
||||
case QUICHE_H3_EVENT_RESET:
|
||||
|
|
@ -505,17 +532,17 @@ static CURLcode h3_process_event(struct Curl_cfilter *cf,
|
|||
stream->closed = TRUE;
|
||||
stream->reset = TRUE;
|
||||
stream->send_closed = TRUE;
|
||||
Curl_multi_mark_dirty(data);
|
||||
break;
|
||||
|
||||
case QUICHE_H3_EVENT_FINISHED:
|
||||
CURL_TRC_CF(data, cf, "[%" PRIu64 "] CLOSED", stream->id);
|
||||
if(!stream->resp_hds_complete) {
|
||||
result = write_resp_raw(cf, data, "\r\n", 2);
|
||||
if(result)
|
||||
return result;
|
||||
stream->resp_hds_complete = TRUE;
|
||||
cf_quiche_write_hd(cf, data, stream, "\r\n", 2, TRUE);
|
||||
}
|
||||
stream->closed = TRUE;
|
||||
Curl_multi_mark_dirty(data);
|
||||
break;
|
||||
|
||||
case QUICHE_H3_EVENT_GOAWAY:
|
||||
|
|
@ -527,20 +554,6 @@ static CURLcode h3_process_event(struct Curl_cfilter *cf,
|
|||
stream->id, quiche_h3_event_type(ev));
|
||||
break;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
static CURLcode cf_quiche_ev_process(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data,
|
||||
struct h3_stream_ctx *stream,
|
||||
quiche_h3_event *ev)
|
||||
{
|
||||
CURLcode result = h3_process_event(cf, data, stream, ev);
|
||||
Curl_multi_mark_dirty(data);
|
||||
if(result)
|
||||
CURL_TRC_CF(data, cf, "error processing event %s for [%" PRIu64 "] -> %d",
|
||||
cf_ev_name(ev), stream->id, result);
|
||||
return result;
|
||||
}
|
||||
|
||||
struct cf_quich_disp_ctx {
|
||||
|
|
@ -548,7 +561,6 @@ struct cf_quich_disp_ctx {
|
|||
struct Curl_cfilter *cf;
|
||||
struct Curl_multi *multi;
|
||||
quiche_h3_event *ev;
|
||||
CURLcode result;
|
||||
};
|
||||
|
||||
static bool cf_quiche_disp_event(uint32_t mid, void *val, void *user_data)
|
||||
|
|
@ -559,7 +571,7 @@ static bool cf_quiche_disp_event(uint32_t mid, void *val, void *user_data)
|
|||
if(stream->id == dctx->stream_id) {
|
||||
struct Curl_easy *sdata = Curl_multi_get_easy(dctx->multi, mid);
|
||||
if(sdata)
|
||||
dctx->result = cf_quiche_ev_process(dctx->cf, sdata, stream, dctx->ev);
|
||||
cf_quiche_process_ev(dctx->cf, sdata, stream, dctx->ev);
|
||||
return FALSE; /* stop iterating */
|
||||
}
|
||||
return TRUE;
|
||||
|
|
@ -583,23 +595,22 @@ static CURLcode cf_poll_events(struct Curl_cfilter *cf,
|
|||
return CURLE_HTTP3;
|
||||
}
|
||||
else {
|
||||
struct cf_quich_disp_ctx dctx;
|
||||
dctx.stream_id = (uint64_t)rv;
|
||||
dctx.cf = cf;
|
||||
dctx.multi = data->multi;
|
||||
dctx.ev = ev;
|
||||
dctx.result = CURLE_OK;
|
||||
stream = H3_STREAM_CTX(ctx, data);
|
||||
if(stream && stream->id == dctx.stream_id) {
|
||||
if(stream && stream->id == (uint64_t)rv) {
|
||||
/* event for calling transfer */
|
||||
CURLcode result = cf_quiche_ev_process(cf, data, stream, ev);
|
||||
cf_quiche_process_ev(cf, data, stream, ev);
|
||||
quiche_h3_event_free(ev);
|
||||
if(result)
|
||||
return result;
|
||||
if(stream->xfer_result)
|
||||
return stream->xfer_result;
|
||||
}
|
||||
else {
|
||||
/* another transfer, do not return errors, as they are not for
|
||||
* the calling transfer */
|
||||
struct cf_quich_disp_ctx dctx;
|
||||
dctx.stream_id = (uint64_t)rv;
|
||||
dctx.cf = cf;
|
||||
dctx.multi = data->multi;
|
||||
dctx.ev = ev;
|
||||
Curl_uint32_hash_visit(&ctx->streams, cf_quiche_disp_event, &dctx);
|
||||
quiche_h3_event_free(ev);
|
||||
}
|
||||
|
|
@ -844,63 +855,46 @@ static CURLcode recv_closed_stream(struct Curl_cfilter *cf,
|
|||
}
|
||||
|
||||
static CURLcode cf_quiche_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
|
||||
char *buf, size_t len, size_t *pnread)
|
||||
char *buf, size_t blen, size_t *pnread)
|
||||
{
|
||||
struct cf_quiche_ctx *ctx = cf->ctx;
|
||||
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
|
||||
CURLcode result = CURLE_OK;
|
||||
|
||||
*pnread = 0;
|
||||
(void)buf;
|
||||
(void)blen;
|
||||
vquic_ctx_update_time(&ctx->q);
|
||||
|
||||
if(!stream)
|
||||
return CURLE_RECV_ERROR;
|
||||
|
||||
if(!Curl_bufq_is_empty(&stream->recvbuf)) {
|
||||
result = Curl_bufq_cread(&stream->recvbuf, buf, len, pnread);
|
||||
CURL_TRC_CF(data, cf, "[%" PRIu64 "] read recvbuf(len=%zu) "
|
||||
"-> %d, %zu", stream->id, len, result, *pnread);
|
||||
if(result)
|
||||
goto out;
|
||||
}
|
||||
|
||||
result = cf_process_ingress(cf, data);
|
||||
if(result) {
|
||||
CURL_TRC_CF(data, cf, "cf_recv, error on ingress");
|
||||
goto out;
|
||||
}
|
||||
|
||||
/* recvbuf had nothing before, maybe after progressing ingress? */
|
||||
if(!*pnread && !Curl_bufq_is_empty(&stream->recvbuf)) {
|
||||
result = Curl_bufq_cread(&stream->recvbuf, buf, len, pnread);
|
||||
CURL_TRC_CF(data, cf, "[%" PRIu64 "] read recvbuf(len=%zu) "
|
||||
"-> %d, %zu", stream->id, len, result, *pnread);
|
||||
if(result)
|
||||
goto out;
|
||||
if(stream->xfer_result) {
|
||||
cf_quiche_stream_close(cf, data, stream);
|
||||
result = stream->xfer_result;
|
||||
goto out;
|
||||
}
|
||||
|
||||
if(*pnread) {
|
||||
if(stream->closed)
|
||||
Curl_multi_mark_dirty(data);
|
||||
}
|
||||
else {
|
||||
if(stream->closed)
|
||||
result = recv_closed_stream(cf, data, pnread);
|
||||
else if(quiche_conn_is_draining(ctx->qconn)) {
|
||||
failf(data, "QUIC connection is draining");
|
||||
result = CURLE_HTTP3;
|
||||
}
|
||||
else
|
||||
result = CURLE_AGAIN;
|
||||
else if(stream->closed)
|
||||
result = recv_closed_stream(cf, data, pnread);
|
||||
else if(quiche_conn_is_draining(ctx->qconn)) {
|
||||
failf(data, "QUIC connection is draining");
|
||||
result = CURLE_HTTP3;
|
||||
}
|
||||
else
|
||||
result = CURLE_AGAIN;
|
||||
|
||||
out:
|
||||
result = Curl_1st_err(result, cf_flush_egress(cf, data));
|
||||
if(*pnread > 0)
|
||||
ctx->data_recvd += *pnread;
|
||||
CURL_TRC_CF(data, cf, "[%" PRIu64 "] cf_recv(total=%"
|
||||
FMT_OFF_T ") -> %d, %zu",
|
||||
stream->id, ctx->data_recvd, result, *pnread);
|
||||
CURL_TRC_CF(data, cf, "[%" PRIu64 "] cf_recv(len=%zu) -> %d, %zu, total=%"
|
||||
FMT_OFF_T, stream->id, blen, result, *pnread, ctx->data_recvd);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
@ -1092,6 +1086,10 @@ static CURLcode cf_quiche_send(struct Curl_cfilter *cf, struct Curl_easy *data,
|
|||
goto out;
|
||||
stream = H3_STREAM_CTX(ctx, data);
|
||||
}
|
||||
else if(stream->xfer_result) {
|
||||
cf_quiche_stream_close(cf, data, stream);
|
||||
result = stream->xfer_result;
|
||||
}
|
||||
else if(stream->closed) {
|
||||
if(stream->resp_hds_complete) {
|
||||
/* sending request body on a stream that has been closed by the
|
||||
|
|
@ -1165,19 +1163,6 @@ static CURLcode cf_quiche_adjust_pollset(struct Curl_cfilter *cf,
|
|||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
* Called from transfer.c:data_pending to know if we should keep looping
|
||||
* to receive more data from the connection.
|
||||
*/
|
||||
static bool cf_quiche_data_pending(struct Curl_cfilter *cf,
|
||||
const struct Curl_easy *data)
|
||||
{
|
||||
struct cf_quiche_ctx *ctx = cf->ctx;
|
||||
const struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
|
||||
(void)cf;
|
||||
return stream && !Curl_bufq_is_empty(&stream->recvbuf);
|
||||
}
|
||||
|
||||
static CURLcode h3_data_pause(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data,
|
||||
bool pause)
|
||||
|
|
@ -1608,7 +1593,7 @@ struct Curl_cftype Curl_cft_http3 = {
|
|||
cf_quiche_close,
|
||||
cf_quiche_shutdown,
|
||||
cf_quiche_adjust_pollset,
|
||||
cf_quiche_data_pending,
|
||||
Curl_cf_def_data_pending,
|
||||
cf_quiche_send,
|
||||
cf_quiche_recv,
|
||||
cf_quiche_cntrl,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue