ratelimit: redesign

Description of how this works in `docs/internal/RATELIMITS.ms`.

Notable implementation changes:
- KEEP_SEND_PAUSE/KEEP_SEND_HOLD and KEEP_RECV_PAUSE/KEEP_RECV_HOLD
  no longer exist. Pausing is down via blocked the new rlimits.
- KEEP_SEND_TIMED no longer exists. Pausing "100-continue" transfers
  is done in the new `Curl_http_perform_pollset()` method.
- HTTP/2 rate limiting implemented via window updates. When
  transfer initiaiting connection has a ratelimit, adjust the
  initial window size
- HTTP/3 ngtcp2 rate limitin implemnented via ack updates
- HTTP/3 quiche does not seem to support this via its API
- the default progress-meter has been improved for accuracy
  in "current speed" results.

pytest speed tests have been improved.

Closes #19384
This commit is contained in:
Stefan Eissing 2025-11-11 14:26:48 +01:00 committed by Daniel Stenberg
parent bfde781121
commit 24b36fdd15
No known key found for this signature in database
GPG key ID: 5CC908FDB71E12C2
48 changed files with 1146 additions and 675 deletions

View file

@ -63,6 +63,7 @@ INTERNALDOCS = \
internals/MULTI-EV.md \ internals/MULTI-EV.md \
internals/NEW-PROTOCOL.md \ internals/NEW-PROTOCOL.md \
internals/PORTING.md \ internals/PORTING.md \
internals/RATELIMITS.md \
internals/README.md \ internals/README.md \
internals/SCORECARD.md \ internals/SCORECARD.md \
internals/SPLAY.md \ internals/SPLAY.md \

View file

@ -0,0 +1,100 @@
<!--
Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
SPDX-License-Identifier: curl
-->
# Rate Limiting Transfers
Rate limiting a transfer means that no more than "n bytes per second"
shall be sent or received. It can be set individually for both directions
via `CURLOPT_MAX_RECV_SPEED_LARGE` and `CURLOPT_MAX_SEND_SPEED_LARGE`. These
options may be adjusted for an ongoing transfer.
### Implementation Base
`ratelimit.[ch]` implements `struct Curl_rlimit` and functions to manage
such limits. It has the following properties:
* `rate_per_sec`: how many "tokens" can be used per second, 0 for infinite.
* `tokens`: the currently available tokens to consume
* `burst_per_sec`: an upper limit on tokens available
* `ts`: the microsecond timestamp of the last tokens update
* `spare_us`: elapsed microseconds that have not counted yet for a token update
* `blocked`: if the limit is blocked
Tokens can be *drained* from an `rlimit`. This reduces `tokens`, even to
negative values. To enforce the limits, tokens should not be drained
further when they reach 0, but such things may happen.
An `rlimit`can be asked how long to wait until `tokens` are positive again.
This is given in milliseconds. When token are available, this wait
time is 0.
Ideally a user of `rlimit` would consume the available tokens to 0, then
get a wait times of 1000ms, after which the set rate of tokens has
regenerated. Rinse and repeat.
Should a user drain twice the amount of the rate, tokens are negative
and the wait time is 2 seconds. The `spare_us` account for the
time that has passed for the consumption. When a user takes 250ms to
consume the rate, the wait time is then 750ms.
When a user drains nothing for two seconds, the available tokens would
grow to twice the rate, unless a burst rate is set.
Finally, an `rlimit` may be set to `blocked` and later unblocked again.
A blocked `rlimit` has no tokens available. This works also when the rate
is unlimited (`rate_per_sec` set to 0).
### Downloads
`rlimit` is in `data->progress.dl.rlimit`. `setopt.c` initializes it whenever
the application sets `CURLOPT_MAX_RECV_SPEED_LARGE`. This may be done
in the middle of a transfer.
`rlimit` tokens are drained in the "protocol" client writer. Checks for
capacity depend on the protocol:
* HTTP and other plain protocols: `transfer.c:sendrecv_dl()` reads only
up to capacity.
* HTTP/2: capacity is used to adjust a stream's window size. Since all
streams start with `64kb`, `rlimit` takes a few seconds to take effect.
* HTTP/3: ngtcp2 acknowledges stream data according to capacity. It
keeps track of bytes not acknowledged yet. This has the same effect as HTTP/2
window sizes.
(The quiche API does not offer control of `ACK`s and `rlimits` for download
do not work in that backend.)
### Uploads
`rlimit` is in `data->progress.ul.rlimit`. `setopt.c` initializes it whenever
the application sets `CURLOPT_MAX_SEND_SPEED_LARGE`. This may be done
in the middle of a transfer.
The upload capacity is checked in `Curl_client_read()` and readers are
only asked to read bytes up to the `rlimit` capacity. This limits upload
of data for all protocols in the same way.
### Pause/Unpause
Pausing of up-/downloads sets the corresponding `rlimit` to blocked. Unpausing
removes that block.
### Suspending transfers
While obeying the `rlimit` for up-/download leads to the desired transfer
rates, the other issue that needs care is CPU consumption.
`rlimits` are inspected when computing the "pollset" of a transfer. When
a transfer wants to send, but not send tokens are available, the `POLLOUT`
is removed from the pollset. Same for receiving.
For a transfer that is, due to `rlimit`, not able to progress, the pollset
is then empty. No socket events are monitored, no CPU activity
happens. For paused transfers, this is sufficient.
Draining `rlimit` happens when a transfer is in `PERFORM` state and
exhausted limits cause the timer `TOOFAST` to be set. When the fires,
the transfer runs again and `rlimit`s are re-evaluated.

View file

@ -234,6 +234,7 @@ LIB_CFILES = \
progress.c \ progress.c \
psl.c \ psl.c \
rand.c \ rand.c \
ratelimit.c \
rename.c \ rename.c \
request.c \ request.c \
rtsp.c \ rtsp.c \
@ -249,7 +250,6 @@ LIB_CFILES = \
socks.c \ socks.c \
socks_gssapi.c \ socks_gssapi.c \
socks_sspi.c \ socks_sspi.c \
speedcheck.c \
splay.c \ splay.c \
strcase.c \ strcase.c \
strdup.c \ strdup.c \
@ -366,6 +366,7 @@ LIB_HFILES = \
progress.h \ progress.h \
psl.h \ psl.h \
rand.h \ rand.h \
ratelimit.h \
rename.h \ rename.h \
request.h \ request.h \
rtsp.h \ rtsp.h \
@ -383,7 +384,6 @@ LIB_HFILES = \
sockaddr.h \ sockaddr.h \
socketpair.h \ socketpair.h \
socks.h \ socks.h \
speedcheck.h \
splay.h \ splay.h \
strcase.h \ strcase.h \
strdup.h \ strdup.h \

View file

@ -384,8 +384,8 @@ static CURLcode recv_CONNECT_resp(struct Curl_cfilter *cf,
/* socket buffer drained, return */ /* socket buffer drained, return */
return CURLE_OK; return CURLE_OK;
if(Curl_pgrsUpdate(data)) if(!result)
return CURLE_ABORTED_BY_CALLBACK; result = Curl_pgrsUpdate(data);
if(result) { if(result) {
ts->keepon = KEEPON_DONE; ts->keepon = KEEPON_DONE;
@ -565,10 +565,8 @@ static CURLcode H1_CONNECT(struct Curl_cfilter *cf,
/* read what is there */ /* read what is there */
CURL_TRC_CF(data, cf, "CONNECT receive"); CURL_TRC_CF(data, cf, "CONNECT receive");
result = recv_CONNECT_resp(cf, data, ts, &done); result = recv_CONNECT_resp(cf, data, ts, &done);
if(Curl_pgrsUpdate(data)) { if(!result)
result = CURLE_ABORTED_BY_CALLBACK; result = Curl_pgrsUpdate(data);
goto out;
}
/* error or not complete yet. return for more multi-multi */ /* error or not complete yet. return for more multi-multi */
if(result || !done) if(result || !done)
goto out; goto out;
@ -671,8 +669,7 @@ out:
/* The real request will follow the CONNECT, reset request partially */ /* The real request will follow the CONNECT, reset request partially */
Curl_req_soft_reset(&data->req, data); Curl_req_soft_reset(&data->req, data);
Curl_client_reset(data); Curl_client_reset(data);
Curl_pgrsSetUploadCounter(data, 0); Curl_pgrsReset(data);
Curl_pgrsSetDownloadCounter(data, 0);
tunnel_free(cf, data); tunnel_free(cf, data);
} }

View file

@ -60,7 +60,6 @@
#include "sendf.h" #include "sendf.h"
#include "escape.h" #include "escape.h"
#include "file.h" #include "file.h"
#include "speedcheck.h"
#include "multiif.h" #include "multiif.h"
#include "transfer.h" #include "transfer.h"
#include "url.h" #include "url.h"
@ -415,13 +414,10 @@ static CURLcode file_upload(struct Curl_easy *data,
Curl_pgrsSetUploadCounter(data, bytecount); Curl_pgrsSetUploadCounter(data, bytecount);
if(Curl_pgrsUpdate(data)) result = Curl_pgrsCheck(data);
result = CURLE_ABORTED_BY_CALLBACK;
else
result = Curl_speedcheck(data, curlx_now());
} }
if(!result && Curl_pgrsUpdate(data)) if(!result)
result = CURLE_ABORTED_BY_CALLBACK; result = Curl_pgrsUpdate(data);
out: out:
close(fd); close(fd);
@ -620,10 +616,7 @@ static CURLcode file_do(struct Curl_easy *data, bool *done)
if(result) if(result)
goto out; goto out;
if(Curl_pgrsUpdate(data)) result = Curl_pgrsCheck(data);
result = CURLE_ABORTED_BY_CALLBACK;
else
result = Curl_speedcheck(data, curlx_now());
if(result) if(result)
goto out; goto out;
} }
@ -657,8 +650,8 @@ static CURLcode file_do(struct Curl_easy *data, bool *done)
#endif #endif
} }
if(Curl_pgrsUpdate(data)) if(!result)
result = CURLE_ABORTED_BY_CALLBACK; result = Curl_pgrsUpdate(data);
out: out:
Curl_multi_xfer_buf_release(data, xfer_buf); Curl_multi_xfer_buf_release(data, xfer_buf);

View file

@ -65,7 +65,6 @@
#include "sockaddr.h" /* required for Curl_sockaddr_storage */ #include "sockaddr.h" /* required for Curl_sockaddr_storage */
#include "multiif.h" #include "multiif.h"
#include "url.h" #include "url.h"
#include "speedcheck.h"
#include "curlx/warnless.h" #include "curlx/warnless.h"
#include "http_proxy.h" #include "http_proxy.h"
#include "socks.h" #include "socks.h"
@ -675,8 +674,7 @@ static CURLcode getftpresponse(struct Curl_easy *data,
return CURLE_RECV_ERROR; return CURLE_RECV_ERROR;
} }
else if(ev == 0) { else if(ev == 0) {
if(Curl_pgrsUpdate(data)) result = Curl_pgrsUpdate(data);
return CURLE_ABORTED_BY_CALLBACK;
continue; /* just continue in our loop for the timeout duration */ continue; /* just continue in our loop for the timeout duration */
} }
} }
@ -4344,10 +4342,7 @@ CURLcode ftp_regular_transfer(struct Curl_easy *data,
bool connected = FALSE; bool connected = FALSE;
data->req.size = -1; /* make sure this is unknown at this point */ data->req.size = -1; /* make sure this is unknown at this point */
Curl_pgrsSetUploadCounter(data, 0); Curl_pgrsReset(data);
Curl_pgrsSetDownloadCounter(data, 0);
Curl_pgrsSetUploadSize(data, -1);
Curl_pgrsSetDownloadSize(data, -1);
ftpc->ctl_valid = TRUE; /* starts good */ ftpc->ctl_valid = TRUE; /* starts good */

View file

@ -129,9 +129,9 @@ const struct Curl_handler Curl_handler_http = {
ZERO_NULL, /* connecting */ ZERO_NULL, /* connecting */
ZERO_NULL, /* doing */ ZERO_NULL, /* doing */
ZERO_NULL, /* proto_pollset */ ZERO_NULL, /* proto_pollset */
Curl_http_do_pollset, /* doing_pollset */ Curl_http_doing_pollset, /* doing_pollset */
ZERO_NULL, /* domore_pollset */ ZERO_NULL, /* domore_pollset */
ZERO_NULL, /* perform_pollset */ Curl_http_perform_pollset, /* perform_pollset */
ZERO_NULL, /* disconnect */ ZERO_NULL, /* disconnect */
Curl_http_write_resp, /* write_resp */ Curl_http_write_resp, /* write_resp */
Curl_http_write_resp_hd, /* write_resp_hd */ Curl_http_write_resp_hd, /* write_resp_hd */
@ -159,9 +159,9 @@ const struct Curl_handler Curl_handler_https = {
NULL, /* connecting */ NULL, /* connecting */
ZERO_NULL, /* doing */ ZERO_NULL, /* doing */
NULL, /* proto_pollset */ NULL, /* proto_pollset */
Curl_http_do_pollset, /* doing_pollset */ Curl_http_doing_pollset, /* doing_pollset */
ZERO_NULL, /* domore_pollset */ ZERO_NULL, /* domore_pollset */
ZERO_NULL, /* perform_pollset */ Curl_http_perform_pollset, /* perform_pollset */
ZERO_NULL, /* disconnect */ ZERO_NULL, /* disconnect */
Curl_http_write_resp, /* write_resp */ Curl_http_write_resp, /* write_resp */
Curl_http_write_resp_hd, /* write_resp_hd */ Curl_http_write_resp_hd, /* write_resp_hd */
@ -1560,13 +1560,30 @@ CURLcode Curl_http_connect(struct Curl_easy *data, bool *done)
/* this returns the socket to wait for in the DO and DOING state for the multi /* this returns the socket to wait for in the DO and DOING state for the multi
interface and then we are always _sending_ a request and thus we wait for interface and then we are always _sending_ a request and thus we wait for
the single socket to become writable only */ the single socket to become writable only */
CURLcode Curl_http_do_pollset(struct Curl_easy *data, CURLcode Curl_http_doing_pollset(struct Curl_easy *data,
struct easy_pollset *ps) struct easy_pollset *ps)
{ {
/* write mode */ /* write mode */
return Curl_pollset_add_out(data, ps, data->conn->sock[FIRSTSOCKET]); return Curl_pollset_add_out(data, ps, data->conn->sock[FIRSTSOCKET]);
} }
CURLcode Curl_http_perform_pollset(struct Curl_easy *data,
struct easy_pollset *ps)
{
struct connectdata *conn = data->conn;
CURLcode result = CURLE_OK;
if(CURL_WANT_RECV(data)) {
result = Curl_pollset_add_in(data, ps, conn->sock[FIRSTSOCKET]);
}
/* on a "Expect: 100-continue" timed wait, do not poll for outgoing */
if(!result && Curl_req_want_send(data) && !http_exp100_is_waiting(data)) {
result = Curl_pollset_add_out(data, ps, conn->sock[FIRSTSOCKET]);
}
return result;
}
/* /*
* Curl_http_done() gets called after a single HTTP request has been * Curl_http_done() gets called after a single HTTP request has been
* performed. * performed.
@ -4872,8 +4889,6 @@ static void http_exp100_continue(struct Curl_easy *data,
struct cr_exp100_ctx *ctx = reader->ctx; struct cr_exp100_ctx *ctx = reader->ctx;
if(ctx->state > EXP100_SEND_DATA) { if(ctx->state > EXP100_SEND_DATA) {
ctx->state = EXP100_SEND_DATA; ctx->state = EXP100_SEND_DATA;
data->req.keepon |= KEEP_SEND;
data->req.keepon &= ~KEEP_SEND_TIMED;
Curl_expire_done(data, EXPIRE_100_TIMEOUT); Curl_expire_done(data, EXPIRE_100_TIMEOUT);
} }
} }
@ -4903,8 +4918,6 @@ static CURLcode cr_exp100_read(struct Curl_easy *data,
ctx->state = EXP100_AWAITING_CONTINUE; ctx->state = EXP100_AWAITING_CONTINUE;
ctx->start = curlx_now(); ctx->start = curlx_now();
Curl_expire(data, data->set.expect_100_timeout, EXPIRE_100_TIMEOUT); Curl_expire(data, data->set.expect_100_timeout, EXPIRE_100_TIMEOUT);
data->req.keepon &= ~KEEP_SEND;
data->req.keepon |= KEEP_SEND_TIMED;
*nread = 0; *nread = 0;
*eos = FALSE; *eos = FALSE;
return CURLE_OK; return CURLE_OK;
@ -4917,8 +4930,6 @@ static CURLcode cr_exp100_read(struct Curl_easy *data,
ms = curlx_timediff_ms(curlx_now(), ctx->start); ms = curlx_timediff_ms(curlx_now(), ctx->start);
if(ms < data->set.expect_100_timeout) { if(ms < data->set.expect_100_timeout) {
DEBUGF(infof(data, "cr_exp100_read, AWAITING_CONTINUE, not expired")); DEBUGF(infof(data, "cr_exp100_read, AWAITING_CONTINUE, not expired"));
data->req.keepon &= ~KEEP_SEND;
data->req.keepon |= KEEP_SEND_TIMED;
*nread = 0; *nread = 0;
*eos = FALSE; *eos = FALSE;
return CURLE_OK; return CURLE_OK;
@ -4938,7 +4949,6 @@ static void cr_exp100_done(struct Curl_easy *data,
{ {
struct cr_exp100_ctx *ctx = reader->ctx; struct cr_exp100_ctx *ctx = reader->ctx;
ctx->state = premature ? EXP100_FAILED : EXP100_SEND_DATA; ctx->state = premature ? EXP100_FAILED : EXP100_SEND_DATA;
data->req.keepon &= ~KEEP_SEND_TIMED;
Curl_expire_done(data, EXPIRE_100_TIMEOUT); Curl_expire_done(data, EXPIRE_100_TIMEOUT);
} }

View file

@ -115,8 +115,10 @@ CURLcode Curl_http_setup_conn(struct Curl_easy *data,
CURLcode Curl_http(struct Curl_easy *data, bool *done); CURLcode Curl_http(struct Curl_easy *data, bool *done);
CURLcode Curl_http_done(struct Curl_easy *data, CURLcode, bool premature); CURLcode Curl_http_done(struct Curl_easy *data, CURLcode, bool premature);
CURLcode Curl_http_connect(struct Curl_easy *data, bool *done); CURLcode Curl_http_connect(struct Curl_easy *data, bool *done);
CURLcode Curl_http_do_pollset(struct Curl_easy *data, CURLcode Curl_http_doing_pollset(struct Curl_easy *data,
struct easy_pollset *ps); struct easy_pollset *ps);
CURLcode Curl_http_perform_pollset(struct Curl_easy *data,
struct easy_pollset *ps);
CURLcode Curl_http_write_resp(struct Curl_easy *data, CURLcode Curl_http_write_resp(struct Curl_easy *data,
const char *buf, size_t blen, const char *buf, size_t blen,
bool is_eos); bool is_eos);

View file

@ -98,33 +98,6 @@
#define H2_SETTINGS_IV_LEN 3 #define H2_SETTINGS_IV_LEN 3
#define H2_BINSETTINGS_LEN 80 #define H2_BINSETTINGS_LEN 80
static size_t populate_settings(nghttp2_settings_entry *iv,
struct Curl_easy *data)
{
iv[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
iv[0].value = Curl_multi_max_concurrent_streams(data->multi);
iv[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
iv[1].value = H2_STREAM_WINDOW_SIZE_INITIAL;
iv[2].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
iv[2].value = data->multi->push_cb != NULL;
return 3;
}
static ssize_t populate_binsettings(uint8_t *binsettings,
struct Curl_easy *data)
{
nghttp2_settings_entry iv[H2_SETTINGS_IV_LEN];
size_t ivlen;
ivlen = populate_settings(iv, data);
/* this returns number of bytes it wrote or a negative number on error. */
return nghttp2_pack_settings_payload(binsettings, H2_BINSETTINGS_LEN,
iv, ivlen);
}
struct cf_h2_ctx { struct cf_h2_ctx {
nghttp2_session *h2; nghttp2_session *h2;
/* The easy handle used in the current filter call, cleared at return */ /* The easy handle used in the current filter call, cleared at return */
@ -137,6 +110,7 @@ struct cf_h2_ctx {
struct uint_hash streams; /* hash of `data->mid` to `h2_stream_ctx` */ struct uint_hash streams; /* hash of `data->mid` to `h2_stream_ctx` */
size_t drain_total; /* sum of all stream's UrlState drain */ size_t drain_total; /* sum of all stream's UrlState drain */
uint32_t initial_win_size; /* current initial window size (settings) */
uint32_t max_concurrent_streams; uint32_t max_concurrent_streams;
uint32_t goaway_error; /* goaway error code from server */ uint32_t goaway_error; /* goaway error code from server */
int32_t remote_max_sid; /* max id processed by server */ int32_t remote_max_sid; /* max id processed by server */
@ -204,6 +178,60 @@ static void cf_h2_ctx_close(struct cf_h2_ctx *ctx)
} }
} }
static uint32_t cf_h2_initial_win_size(struct Curl_easy *data)
{
#if NGHTTP2_HAS_SET_LOCAL_WINDOW_SIZE
/* If the transfer has a rate-limit lower than the default initial
* stream window size, use that. It needs to be at least 8k or servers
* may be unhappy. */
if(data->progress.dl.rlimit.rate_per_step &&
(data->progress.dl.rlimit.rate_per_step < H2_STREAM_WINDOW_SIZE_INITIAL))
return CURLMAX((uint32_t)data->progress.dl.rlimit.rate_per_step, 8192);
#endif
return H2_STREAM_WINDOW_SIZE_INITIAL;
}
static size_t populate_settings(nghttp2_settings_entry *iv,
struct Curl_easy *data,
struct cf_h2_ctx *ctx)
{
iv[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
iv[0].value = Curl_multi_max_concurrent_streams(data->multi);
iv[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
iv[1].value = cf_h2_initial_win_size(data);
if(ctx)
ctx->initial_win_size = iv[1].value;
iv[2].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
iv[2].value = data->multi->push_cb != NULL;
return 3;
}
static ssize_t populate_binsettings(uint8_t *binsettings,
struct Curl_easy *data)
{
nghttp2_settings_entry iv[H2_SETTINGS_IV_LEN];
size_t ivlen;
ivlen = populate_settings(iv, data, NULL);
/* this returns number of bytes it wrote or a negative number on error. */
return nghttp2_pack_settings_payload(binsettings, H2_BINSETTINGS_LEN,
iv, ivlen);
}
static CURLcode cf_h2_update_settings(struct cf_h2_ctx *ctx,
uint32_t initial_win_size)
{
nghttp2_settings_entry entry;
entry.settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
entry.value = initial_win_size;
if(nghttp2_submit_settings(ctx->h2, NGHTTP2_FLAG_NONE, &entry, 1))
return CURLE_SEND_ERROR;
ctx->initial_win_size = initial_win_size;
return CURLE_OK;
}
static CURLcode nw_out_flush(struct Curl_cfilter *cf, static CURLcode nw_out_flush(struct Curl_cfilter *cf,
struct Curl_easy *data); struct Curl_easy *data);
@ -296,16 +324,18 @@ static void h2_stream_hash_free(unsigned int id, void *stream)
static int32_t cf_h2_get_desired_local_win(struct Curl_cfilter *cf, static int32_t cf_h2_get_desired_local_win(struct Curl_cfilter *cf,
struct Curl_easy *data) struct Curl_easy *data)
{ {
curl_off_t avail = Curl_rlimit_avail(&data->progress.dl.rlimit,
curlx_now());
(void)cf; (void)cf;
if(data->set.max_recv_speed && data->set.max_recv_speed < INT32_MAX) { if(avail < CURL_OFF_T_MAX) { /* limit in place */
/* The transfer should only receive `max_recv_speed` bytes per second. if(avail <= 0)
* We restrict the stream's local window size, so that the server cannot return 0;
* send us "too much" at a time. else if(avail < INT32_MAX)
* This gets less precise the higher the latency. */ return (int32_t)avail;
return (int32_t)data->set.max_recv_speed;
} }
#ifdef DEBUGBUILD #ifdef DEBUGBUILD
else { {
struct cf_h2_ctx *ctx = cf->ctx; struct cf_h2_ctx *ctx = cf->ctx;
CURL_TRC_CF(data, cf, "stream_win_max=%d", ctx->stream_win_max); CURL_TRC_CF(data, cf, "stream_win_max=%d", ctx->stream_win_max);
return ctx->stream_win_max; return ctx->stream_win_max;
@ -580,7 +610,7 @@ static CURLcode cf_h2_ctx_open(struct Curl_cfilter *cf,
nghttp2_settings_entry iv[H2_SETTINGS_IV_LEN]; nghttp2_settings_entry iv[H2_SETTINGS_IV_LEN];
size_t ivlen; size_t ivlen;
ivlen = populate_settings(iv, data); ivlen = populate_settings(iv, data, ctx);
rc = nghttp2_submit_settings(ctx->h2, NGHTTP2_FLAG_NONE, rc = nghttp2_submit_settings(ctx->h2, NGHTTP2_FLAG_NONE,
iv, ivlen); iv, ivlen);
if(rc) { if(rc) {
@ -2007,6 +2037,9 @@ static CURLcode stream_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
(void)len; (void)len;
*pnread = 0; *pnread = 0;
if(!stream->xfer_result)
stream->xfer_result = cf_h2_update_local_win(cf, data, stream);
if(stream->xfer_result) { if(stream->xfer_result) {
CURL_TRC_CF(data, cf, "[%d] xfer write failed", stream->id); CURL_TRC_CF(data, cf, "[%d] xfer write failed", stream->id);
result = stream->xfer_result; result = stream->xfer_result;
@ -2239,6 +2272,7 @@ static CURLcode h2_submit(struct h2_stream_ctx **pstream,
nghttp2_priority_spec pri_spec; nghttp2_priority_spec pri_spec;
size_t nwritten; size_t nwritten;
CURLcode result = CURLE_OK; CURLcode result = CURLE_OK;
uint32_t initial_win_size;
*pnwritten = 0; *pnwritten = 0;
Curl_dynhds_init(&h2_headers, 0, DYN_HTTP_REQUEST); Curl_dynhds_init(&h2_headers, 0, DYN_HTTP_REQUEST);
@ -2276,6 +2310,15 @@ static CURLcode h2_submit(struct h2_stream_ctx **pstream,
if(!nghttp2_session_check_request_allowed(ctx->h2)) if(!nghttp2_session_check_request_allowed(ctx->h2))
CURL_TRC_CF(data, cf, "send request NOT allowed (via nghttp2)"); CURL_TRC_CF(data, cf, "send request NOT allowed (via nghttp2)");
/* Check the initial windows size of the transfer (rate-limits?) and
* send an updated settings on changes from previous value. */
initial_win_size = cf_h2_initial_win_size(data);
if(initial_win_size != ctx->initial_win_size) {
result = cf_h2_update_settings(ctx, initial_win_size);
if(result)
goto out;
}
switch(data->state.httpreq) { switch(data->state.httpreq) {
case HTTPREQ_POST: case HTTPREQ_POST:
case HTTPREQ_POST_FORM: case HTTPREQ_POST_FORM:

View file

@ -1938,10 +1938,7 @@ static CURLcode imap_regular_transfer(struct Curl_easy *data,
data->req.size = -1; data->req.size = -1;
/* Set the progress data */ /* Set the progress data */
Curl_pgrsSetUploadCounter(data, 0); Curl_pgrsReset(data);
Curl_pgrsSetDownloadCounter(data, 0);
Curl_pgrsSetUploadSize(data, -1);
Curl_pgrsSetDownloadSize(data, -1);
/* Carry out the perform */ /* Carry out the perform */
result = imap_perform(data, &connected, dophase_done); result = imap_perform(data, &connected, dophase_done);

View file

@ -509,7 +509,7 @@ static CURLcode ldap_do(struct Curl_easy *data, bool *done)
goto quit; goto quit;
} }
Curl_pgrsSetDownloadCounter(data, 0); Curl_pgrsReset(data);
rc = ldap_search_s(server, ludp->lud_dn, rc = ldap_search_s(server, ludp->lud_dn,
(curl_ldap_num_t)ludp->lud_scope, (curl_ldap_num_t)ludp->lud_scope,
ludp->lud_filter, ludp->lud_attrs, 0, &ldapmsg); ludp->lud_filter, ludp->lud_attrs, 0, &ldapmsg);

View file

@ -43,7 +43,6 @@
#include "select.h" #include "select.h"
#include "curlx/warnless.h" #include "curlx/warnless.h"
#include "curlx/wait.h" #include "curlx/wait.h"
#include "speedcheck.h"
#include "conncache.h" #include "conncache.h"
#include "multihandle.h" #include "multihandle.h"
#include "sigpipe.h" #include "sigpipe.h"
@ -923,79 +922,145 @@ void Curl_attach_connection(struct Curl_easy *data,
conn->handler->attach(data, conn); conn->handler->attach(data, conn);
} }
/* adjust pollset for rate limits/pauses */
static CURLcode multi_adjust_pollset(struct Curl_easy *data,
struct easy_pollset *ps)
{
CURLcode result = CURLE_OK;
if(ps->n) {
struct curltime now = curlx_now();
bool send_blocked, recv_blocked;
recv_blocked = (Curl_rlimit_avail(&data->progress.dl.rlimit, now) <= 0);
send_blocked = (Curl_rlimit_avail(&data->progress.ul.rlimit, now) <= 0);
if(send_blocked || recv_blocked) {
int i;
for(i = 0; i <= SECONDARYSOCKET; ++i) {
curl_socket_t sock = data->conn->sock[i];
if(sock == CURL_SOCKET_BAD)
continue;
if(recv_blocked && Curl_pollset_want_recv(data, ps, sock)) {
result = Curl_pollset_remove_in(data, ps, sock);
if(result)
break;
}
if(send_blocked && Curl_pollset_want_send(data, ps, sock)) {
result = Curl_pollset_remove_out(data, ps, sock);
if(result)
break;
}
}
}
/* Not blocked and wanting to receive. If there is data pending
* in the connection filters, make transfer run again. */
if(!recv_blocked &&
((Curl_pollset_want_recv(data, ps, data->conn->sock[FIRSTSOCKET]) &&
Curl_conn_data_pending(data, FIRSTSOCKET)) ||
(Curl_pollset_want_recv(data, ps, data->conn->sock[SECONDARYSOCKET]) &&
Curl_conn_data_pending(data, SECONDARYSOCKET)))) {
CURL_TRC_M(data, "pollset[] has POLLIN, but there is still "
"buffered input -> mark as dirty");
Curl_multi_mark_dirty(data);
}
}
return result;
}
static CURLcode mstate_connecting_pollset(struct Curl_easy *data, static CURLcode mstate_connecting_pollset(struct Curl_easy *data,
struct easy_pollset *ps) struct easy_pollset *ps)
{ {
if(data->conn) { struct connectdata *conn = data->conn;
curl_socket_t sockfd = Curl_conn_get_first_socket(data); curl_socket_t sockfd;
if(sockfd != CURL_SOCKET_BAD) { CURLcode result = CURLE_OK;
/* Default is to wait to something from the server */
return Curl_pollset_change(data, ps, sockfd, CURL_POLL_IN, 0); if(Curl_xfer_recv_is_paused(data))
} return CURLE_OK;
/* If a socket is set, receiving is default. If the socket
* has not been determined yet (eyeballing), always ask the
* connection filters for what to monitor. */
sockfd = Curl_conn_get_first_socket(data);
if(sockfd != CURL_SOCKET_BAD) {
result = Curl_pollset_change(data, ps, sockfd, CURL_POLL_IN, 0);
if(!result)
result = multi_adjust_pollset(data, ps);
} }
return CURLE_OK; if(!result)
result = Curl_conn_adjust_pollset(data, conn, ps);
return result;
} }
static CURLcode mstate_protocol_pollset(struct Curl_easy *data, static CURLcode mstate_protocol_pollset(struct Curl_easy *data,
struct easy_pollset *ps) struct easy_pollset *ps)
{ {
struct connectdata *conn = data->conn; struct connectdata *conn = data->conn;
if(conn) { CURLcode result = CURLE_OK;
curl_socket_t sockfd;
if(conn->handler->proto_pollset) if(conn->handler->proto_pollset)
return conn->handler->proto_pollset(data, ps); result = conn->handler->proto_pollset(data, ps);
sockfd = conn->sock[FIRSTSOCKET]; else {
curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
if(sockfd != CURL_SOCKET_BAD) { if(sockfd != CURL_SOCKET_BAD) {
/* Default is to wait to something from the server */ /* Default is to wait to something from the server */
return Curl_pollset_change(data, ps, sockfd, CURL_POLL_IN, 0); result = Curl_pollset_change(data, ps, sockfd, CURL_POLL_IN, 0);
} }
} }
return CURLE_OK; if(!result)
result = multi_adjust_pollset(data, ps);
if(!result)
result = Curl_conn_adjust_pollset(data, conn, ps);
return result;
} }
static CURLcode mstate_do_pollset(struct Curl_easy *data, static CURLcode mstate_do_pollset(struct Curl_easy *data,
struct easy_pollset *ps) struct easy_pollset *ps)
{ {
struct connectdata *conn = data->conn; struct connectdata *conn = data->conn;
if(conn) { CURLcode result = CURLE_OK;
if(conn->handler->doing_pollset)
return conn->handler->doing_pollset(data, ps); if(conn->handler->doing_pollset)
else if(CONN_SOCK_IDX_VALID(conn->send_idx)) { result = conn->handler->doing_pollset(data, ps);
/* Default is that we want to send something to the server */ else if(CONN_SOCK_IDX_VALID(conn->send_idx)) {
return Curl_pollset_add_out( /* Default is that we want to send something to the server */
data, ps, conn->sock[conn->send_idx]); result = Curl_pollset_add_out(data, ps, conn->sock[conn->send_idx]);
}
} }
return CURLE_OK; if(!result)
result = multi_adjust_pollset(data, ps);
if(!result)
result = Curl_conn_adjust_pollset(data, conn, ps);
return result;
} }
static CURLcode mstate_domore_pollset(struct Curl_easy *data, static CURLcode mstate_domore_pollset(struct Curl_easy *data,
struct easy_pollset *ps) struct easy_pollset *ps)
{ {
struct connectdata *conn = data->conn; struct connectdata *conn = data->conn;
if(conn) { CURLcode result = CURLE_OK;
if(conn->handler->domore_pollset)
return conn->handler->domore_pollset(data, ps); if(conn->handler->domore_pollset)
else if(CONN_SOCK_IDX_VALID(conn->send_idx)) { result = conn->handler->domore_pollset(data, ps);
/* Default is that we want to send something to the server */ else if(CONN_SOCK_IDX_VALID(conn->send_idx)) {
return Curl_pollset_add_out( /* Default is that we want to send something to the server */
data, ps, conn->sock[conn->send_idx]); result = Curl_pollset_add_out(data, ps, conn->sock[conn->send_idx]);
}
} }
return CURLE_OK; if(!result)
result = multi_adjust_pollset(data, ps);
if(!result)
result = Curl_conn_adjust_pollset(data, conn, ps);
return result;
} }
static CURLcode mstate_perform_pollset(struct Curl_easy *data, static CURLcode mstate_perform_pollset(struct Curl_easy *data,
struct easy_pollset *ps) struct easy_pollset *ps)
{ {
struct connectdata *conn = data->conn; struct connectdata *conn = data->conn;
if(!conn) CURLcode result = CURLE_OK;
return CURLE_OK;
else if(conn->handler->perform_pollset) if(conn->handler->perform_pollset)
return conn->handler->perform_pollset(data, ps); result = conn->handler->perform_pollset(data, ps);
else { else {
/* Default is to obey the data->req.keepon flags for send/recv */ /* Default is to obey the data->req.keepon flags for send/recv */
CURLcode result = CURLE_OK;
if(CURL_WANT_RECV(data) && CONN_SOCK_IDX_VALID(conn->recv_idx)) { if(CURL_WANT_RECV(data) && CONN_SOCK_IDX_VALID(conn->recv_idx)) {
result = Curl_pollset_add_in( result = Curl_pollset_add_in(
data, ps, conn->sock[conn->recv_idx]); data, ps, conn->sock[conn->recv_idx]);
@ -1006,19 +1071,21 @@ static CURLcode mstate_perform_pollset(struct Curl_easy *data,
result = Curl_pollset_add_out( result = Curl_pollset_add_out(
data, ps, conn->sock[conn->send_idx]); data, ps, conn->sock[conn->send_idx]);
} }
return result;
} }
if(!result)
result = multi_adjust_pollset(data, ps);
if(!result)
result = Curl_conn_adjust_pollset(data, conn, ps);
return result;
} }
/* Initializes `poll_set` with the current socket poll actions needed /* Initializes `poll_set` with the current socket poll actions needed
* for transfer `data`. */ * for transfer `data`. */
CURLMcode Curl_multi_pollset(struct Curl_easy *data, CURLMcode Curl_multi_pollset(struct Curl_easy *data,
struct easy_pollset *ps, struct easy_pollset *ps)
const char *caller)
{ {
CURLMcode mresult = CURLM_OK; CURLMcode mresult = CURLM_OK;
CURLcode result = CURLE_OK; CURLcode result = CURLE_OK;
bool expect_sockets = TRUE;
/* If the transfer has no connection, this is fine. Happens when /* If the transfer has no connection, this is fine. Happens when
called via curl_multi_remove_handle() => Curl_multi_ev_assess() => called via curl_multi_remove_handle() => Curl_multi_ev_assess() =>
@ -1033,70 +1100,49 @@ CURLMcode Curl_multi_pollset(struct Curl_easy *data,
case MSTATE_SETUP: case MSTATE_SETUP:
case MSTATE_CONNECT: case MSTATE_CONNECT:
/* nothing to poll for yet */ /* nothing to poll for yet */
expect_sockets = FALSE;
break; break;
case MSTATE_RESOLVING: case MSTATE_RESOLVING:
result = Curl_resolv_pollset(data, ps); result = Curl_resolv_pollset(data, ps);
/* connection filters are not involved in this phase. It is OK if we get no
* sockets to wait for. Resolving can wake up from other sources. */
expect_sockets = FALSE;
break; break;
case MSTATE_CONNECTING: case MSTATE_CONNECTING:
case MSTATE_TUNNELING: case MSTATE_TUNNELING:
if(!Curl_xfer_recv_is_paused(data)) { result = mstate_connecting_pollset(data, ps);
result = mstate_connecting_pollset(data, ps);
if(!result)
result = Curl_conn_adjust_pollset(data, data->conn, ps);
}
else
expect_sockets = FALSE;
break; break;
case MSTATE_PROTOCONNECT: case MSTATE_PROTOCONNECT:
case MSTATE_PROTOCONNECTING: case MSTATE_PROTOCONNECTING:
result = mstate_protocol_pollset(data, ps); result = mstate_protocol_pollset(data, ps);
if(!result)
result = Curl_conn_adjust_pollset(data, data->conn, ps);
break; break;
case MSTATE_DO: case MSTATE_DO:
case MSTATE_DOING: case MSTATE_DOING:
result = mstate_do_pollset(data, ps); result = mstate_do_pollset(data, ps);
if(!result)
result = Curl_conn_adjust_pollset(data, data->conn, ps);
break; break;
case MSTATE_DOING_MORE: case MSTATE_DOING_MORE:
result = mstate_domore_pollset(data, ps); result = mstate_domore_pollset(data, ps);
if(!result)
result = Curl_conn_adjust_pollset(data, data->conn, ps);
break; break;
case MSTATE_DID: /* same as PERFORMING in regard to polling */ case MSTATE_DID: /* same as PERFORMING in regard to polling */
case MSTATE_PERFORMING: case MSTATE_PERFORMING:
result = mstate_perform_pollset(data, ps); result = mstate_perform_pollset(data, ps);
if(!result)
result = Curl_conn_adjust_pollset(data, data->conn, ps);
break; break;
case MSTATE_RATELIMITING: case MSTATE_RATELIMITING:
/* we need to let time pass, ignore socket(s) */ /* we need to let time pass, ignore socket(s) */
expect_sockets = FALSE;
break; break;
case MSTATE_DONE: case MSTATE_DONE:
case MSTATE_COMPLETED: case MSTATE_COMPLETED:
case MSTATE_MSGSENT: case MSTATE_MSGSENT:
/* nothing more to poll for */ /* nothing more to poll for */
expect_sockets = FALSE;
break; break;
default: default:
failf(data, "multi_getsock: unexpected multi state %d", data->mstate); failf(data, "multi_getsock: unexpected multi state %d", data->mstate);
DEBUGASSERT(0); DEBUGASSERT(0);
expect_sockets = FALSE;
break; break;
} }
@ -1110,39 +1156,27 @@ CURLMcode Curl_multi_pollset(struct Curl_easy *data,
goto out; goto out;
} }
/* Unblocked and waiting to receive with buffered input.
* Make transfer run again at next opportunity. */
if(!Curl_xfer_is_blocked(data) && !Curl_xfer_is_too_fast(data) &&
((Curl_pollset_want_read(data, ps, data->conn->sock[FIRSTSOCKET]) &&
Curl_conn_data_pending(data, FIRSTSOCKET)) ||
(Curl_pollset_want_read(data, ps, data->conn->sock[SECONDARYSOCKET]) &&
Curl_conn_data_pending(data, SECONDARYSOCKET)))) {
CURL_TRC_M(data, "%s pollset[] has POLLIN, but there is still "
"buffered input to consume -> mark as dirty", caller);
Curl_multi_mark_dirty(data);
}
#ifndef CURL_DISABLE_VERBOSE_STRINGS #ifndef CURL_DISABLE_VERBOSE_STRINGS
if(CURL_TRC_M_is_verbose(data)) { if(CURL_TRC_M_is_verbose(data)) {
size_t timeout_count = Curl_llist_count(&data->state.timeoutlist); size_t timeout_count = Curl_llist_count(&data->state.timeoutlist);
switch(ps->n) { switch(ps->n) {
case 0: case 0:
CURL_TRC_M(data, "%s pollset[], timeouts=%zu, paused %d/%d (r/w)", CURL_TRC_M(data, "pollset[], timeouts=%zu, paused %d/%d (r/w)",
caller, timeout_count, timeout_count,
Curl_xfer_send_is_paused(data), Curl_xfer_send_is_paused(data),
Curl_xfer_recv_is_paused(data)); Curl_xfer_recv_is_paused(data));
break; break;
case 1: case 1:
CURL_TRC_M(data, "%s pollset[fd=%" FMT_SOCKET_T " %s%s], timeouts=%zu", CURL_TRC_M(data, "pollset[fd=%" FMT_SOCKET_T " %s%s], timeouts=%zu",
caller, ps->sockets[0], ps->sockets[0],
(ps->actions[0] & CURL_POLL_IN) ? "IN" : "", (ps->actions[0] & CURL_POLL_IN) ? "IN" : "",
(ps->actions[0] & CURL_POLL_OUT) ? "OUT" : "", (ps->actions[0] & CURL_POLL_OUT) ? "OUT" : "",
timeout_count); timeout_count);
break; break;
case 2: case 2:
CURL_TRC_M(data, "%s pollset[fd=%" FMT_SOCKET_T " %s%s, " CURL_TRC_M(data, "pollset[fd=%" FMT_SOCKET_T " %s%s, "
"fd=%" FMT_SOCKET_T " %s%s], timeouts=%zu", "fd=%" FMT_SOCKET_T " %s%s], timeouts=%zu",
caller, ps->sockets[0], ps->sockets[0],
(ps->actions[0] & CURL_POLL_IN) ? "IN" : "", (ps->actions[0] & CURL_POLL_IN) ? "IN" : "",
(ps->actions[0] & CURL_POLL_OUT) ? "OUT" : "", (ps->actions[0] & CURL_POLL_OUT) ? "OUT" : "",
ps->sockets[1], ps->sockets[1],
@ -1151,27 +1185,14 @@ CURLMcode Curl_multi_pollset(struct Curl_easy *data,
timeout_count); timeout_count);
break; break;
default: default:
CURL_TRC_M(data, "%s pollset[fds=%u], timeouts=%zu", CURL_TRC_M(data, "pollset[fds=%u], timeouts=%zu",
caller, ps->n, timeout_count); ps->n, timeout_count);
break; break;
} }
CURL_TRC_EASY_TIMERS(data); CURL_TRC_EASY_TIMERS(data);
} }
#endif #endif
if(expect_sockets && !ps->n && data->multi &&
!Curl_uint_bset_contains(&data->multi->dirty, data->mid) &&
!Curl_llist_count(&data->state.timeoutlist) &&
!Curl_cwriter_is_paused(data) && !Curl_creader_is_paused(data) &&
Curl_conn_is_ip_connected(data, FIRSTSOCKET)) {
/* We expected sockets for POLL monitoring, but none are set.
* We are not dirty (and run anyway).
* We are not waiting on any timer.
* None of the READ/WRITE directions are paused.
* We are connected to the server on IP level, at least. */
infof(data, "WARNING: no socket in pollset or timer, transfer may stall!");
DEBUGASSERT(0);
}
out: out:
return mresult; return mresult;
} }
@ -1205,7 +1226,7 @@ CURLMcode curl_multi_fdset(CURLM *m,
continue; continue;
} }
Curl_multi_pollset(data, &ps, "curl_multi_fdset"); Curl_multi_pollset(data, &ps);
for(i = 0; i < ps.n; i++) { for(i = 0; i < ps.n; i++) {
if(!FDSET_SOCK(ps.sockets[i])) if(!FDSET_SOCK(ps.sockets[i]))
/* pretend it does not exist */ /* pretend it does not exist */
@ -1268,7 +1289,7 @@ CURLMcode curl_multi_waitfds(CURLM *m,
Curl_uint_bset_remove(&multi->dirty, mid); Curl_uint_bset_remove(&multi->dirty, mid);
continue; continue;
} }
Curl_multi_pollset(data, &ps, "curl_multi_waitfds"); Curl_multi_pollset(data, &ps);
need += Curl_waitfds_add_ps(&cwfds, &ps); need += Curl_waitfds_add_ps(&cwfds, &ps);
} }
while(Curl_uint_bset_next(&multi->process, mid, &mid)); while(Curl_uint_bset_next(&multi->process, mid, &mid));
@ -1354,7 +1375,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
Curl_uint_bset_remove(&multi->dirty, mid); Curl_uint_bset_remove(&multi->dirty, mid);
continue; continue;
} }
Curl_multi_pollset(data, &ps, "multi_wait"); Curl_multi_pollset(data, &ps);
if(Curl_pollfds_add_ps(&cpfds, &ps)) { if(Curl_pollfds_add_ps(&cpfds, &ps)) {
result = CURLM_OUT_OF_MEMORY; result = CURLM_OUT_OF_MEMORY;
goto out; goto out;
@ -1907,35 +1928,28 @@ static CURLcode multi_follow(struct Curl_easy *data,
} }
static CURLcode mspeed_check(struct Curl_easy *data, static CURLcode mspeed_check(struct Curl_easy *data,
struct curltime *nowp) struct curltime now)
{ {
timediff_t recv_wait_ms = 0; timediff_t recv_wait_ms = 0;
timediff_t send_wait_ms = 0; timediff_t send_wait_ms = 0;
/* check if over send speed */ /* check if our send/recv limits require idle waits */
if(data->set.max_send_speed) send_wait_ms = Curl_rlimit_wait_ms(&data->progress.ul.rlimit, now);
send_wait_ms = Curl_pgrsLimitWaitTime(&data->progress.ul, recv_wait_ms = Curl_rlimit_wait_ms(&data->progress.dl.rlimit, now);
data->set.max_send_speed,
*nowp);
/* check if over recv speed */
if(data->set.max_recv_speed)
recv_wait_ms = Curl_pgrsLimitWaitTime(&data->progress.dl,
data->set.max_recv_speed,
*nowp);
if(send_wait_ms || recv_wait_ms) { if(send_wait_ms || recv_wait_ms) {
if(data->mstate != MSTATE_RATELIMITING) { if(data->mstate != MSTATE_RATELIMITING) {
Curl_ratelimit(data, *nowp);
multistate(data, MSTATE_RATELIMITING); multistate(data, MSTATE_RATELIMITING);
} }
Curl_expire(data, CURLMAX(send_wait_ms, recv_wait_ms), EXPIRE_TOOFAST); Curl_expire(data, CURLMAX(send_wait_ms, recv_wait_ms), EXPIRE_TOOFAST);
Curl_multi_clear_dirty(data); Curl_multi_clear_dirty(data);
CURL_TRC_M(data, "[RLIMIT] waiting %" FMT_TIMEDIFF_T "ms",
CURLMAX(send_wait_ms, recv_wait_ms));
return CURLE_AGAIN; return CURLE_AGAIN;
} }
else if(data->mstate != MSTATE_PERFORMING) { else if(data->mstate != MSTATE_PERFORMING) {
CURL_TRC_M(data, "[RLIMIT] wait over, continue");
multistate(data, MSTATE_PERFORMING); multistate(data, MSTATE_PERFORMING);
Curl_ratelimit(data, *nowp);
} }
return CURLE_OK; return CURLE_OK;
} }
@ -1951,7 +1965,7 @@ static CURLMcode state_performing(struct Curl_easy *data,
CURLcode result = *resultp = CURLE_OK; CURLcode result = *resultp = CURLE_OK;
*stream_errorp = FALSE; *stream_errorp = FALSE;
if(mspeed_check(data, nowp) == CURLE_AGAIN) if(mspeed_check(data, *nowp) == CURLE_AGAIN)
return CURLM_OK; return CURLM_OK;
/* read/write data if it is ready to do so */ /* read/write data if it is ready to do so */
@ -2073,7 +2087,8 @@ static CURLMcode state_performing(struct Curl_easy *data,
} }
} }
else { /* not errored, not done */ else { /* not errored, not done */
mspeed_check(data, nowp); *nowp = curlx_now();
mspeed_check(data, *nowp);
} }
free(newurl); free(newurl);
*resultp = result; *resultp = result;
@ -2228,10 +2243,7 @@ static CURLMcode state_ratelimiting(struct Curl_easy *data,
CURLMcode rc = CURLM_OK; CURLMcode rc = CURLM_OK;
DEBUGASSERT(data->conn); DEBUGASSERT(data->conn);
/* if both rates are within spec, resume transfer */ /* if both rates are within spec, resume transfer */
if(Curl_pgrsUpdate(data)) result = Curl_pgrsCheck(data);
result = CURLE_ABORTED_BY_CALLBACK;
else
result = Curl_speedcheck(data, *nowp);
if(result) { if(result) {
if(!(data->conn->handler->flags & PROTOPT_DUAL) && if(!(data->conn->handler->flags & PROTOPT_DUAL) &&
@ -2242,7 +2254,7 @@ static CURLMcode state_ratelimiting(struct Curl_easy *data,
multi_done(data, result, TRUE); multi_done(data, result, TRUE);
} }
else { else {
if(!mspeed_check(data, nowp)) if(!mspeed_check(data, *nowp))
rc = CURLM_CALL_MULTI_PERFORM; rc = CURLM_CALL_MULTI_PERFORM;
} }
*resultp = result; *resultp = result;
@ -2387,6 +2399,8 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
(HTTP/2), or the full connection for older protocols */ (HTTP/2), or the full connection for older protocols */
bool stream_error = FALSE; bool stream_error = FALSE;
rc = CURLM_OK; rc = CURLM_OK;
/* update at start for continuous increase when looping */
*nowp = curlx_now();
if(multi_ischanged(multi, TRUE)) { if(multi_ischanged(multi, TRUE)) {
CURL_TRC_M(data, "multi changed, check CONNECT_PEND queue"); CURL_TRC_M(data, "multi changed, check CONNECT_PEND queue");
@ -2704,16 +2718,18 @@ statemachine_end:
rc = CURLM_CALL_MULTI_PERFORM; rc = CURLM_CALL_MULTI_PERFORM;
} }
/* if there is still a connection to use, call the progress function */ /* if there is still a connection to use, call the progress function */
else if(data->conn && Curl_pgrsUpdate(data)) { else if(data->conn) {
/* aborted due to progress callback return code must close the result = Curl_pgrsUpdate(data);
connection */ if(result) {
result = CURLE_ABORTED_BY_CALLBACK; /* aborted due to progress callback return code must close the
streamclose(data->conn, "Aborted by callback"); connection */
streamclose(data->conn, "Aborted by callback");
/* if not yet in DONE state, go there, otherwise COMPLETED */ /* if not yet in DONE state, go there, otherwise COMPLETED */
multistate(data, (data->mstate < MSTATE_DONE) ? multistate(data, (data->mstate < MSTATE_DONE) ?
MSTATE_DONE : MSTATE_COMPLETED); MSTATE_DONE : MSTATE_COMPLETED);
rc = CURLM_CALL_MULTI_PERFORM; rc = CURLM_CALL_MULTI_PERFORM;
}
} }
} }

View file

@ -508,7 +508,7 @@ static CURLMcode mev_assess(struct Curl_multi *multi,
} }
} }
else else
Curl_multi_pollset(data, &ps, "ev assess"); Curl_multi_pollset(data, &ps);
last_ps = mev_get_last_pollset(data, conn); last_ps = mev_get_last_pollset(data, conn);
if(!last_ps && ps.n) { if(!last_ps && ps.n) {

View file

@ -73,8 +73,7 @@ CURLMcode Curl_multi_add_perform(struct Curl_multi *multi,
unsigned int Curl_multi_max_concurrent_streams(struct Curl_multi *multi); unsigned int Curl_multi_max_concurrent_streams(struct Curl_multi *multi);
CURLMcode Curl_multi_pollset(struct Curl_easy *data, CURLMcode Curl_multi_pollset(struct Curl_easy *data,
struct easy_pollset *ps, struct easy_pollset *ps);
const char *caller);
/** /**
* Borrow the transfer buffer from the multi, suitable * Borrow the transfer buffer from the multi, suitable

View file

@ -33,7 +33,6 @@
#include "sendf.h" #include "sendf.h"
#include "select.h" #include "select.h"
#include "progress.h" #include "progress.h"
#include "speedcheck.h"
#include "pingpong.h" #include "pingpong.h"
#include "multiif.h" #include "multiif.h"
#include "vtls/vtls.h" #include "vtls/vtls.h"
@ -122,11 +121,7 @@ CURLcode Curl_pp_statemach(struct Curl_easy *data,
if(block) { if(block) {
/* if we did not wait, we do not have to spend time on this now */ /* if we did not wait, we do not have to spend time on this now */
if(Curl_pgrsUpdate(data)) result = Curl_pgrsCheck(data);
result = CURLE_ABORTED_BY_CALLBACK;
else
result = Curl_speedcheck(data, curlx_now());
if(result) if(result)
return result; return result;
} }

View file

@ -1503,10 +1503,7 @@ static CURLcode pop3_regular_transfer(struct Curl_easy *data,
data->req.size = -1; data->req.size = -1;
/* Set the progress data */ /* Set the progress data */
Curl_pgrsSetUploadCounter(data, 0); Curl_pgrsReset(data);
Curl_pgrsSetDownloadCounter(data, 0);
Curl_pgrsSetUploadSize(data, -1);
Curl_pgrsSetDownloadSize(data, -1);
/* Carry out the perform */ /* Carry out the perform */
result = pop3_perform(data, &connected, dophase_done); result = pop3_perform(data, &connected, dophase_done);

View file

@ -28,6 +28,7 @@
#include "sendf.h" #include "sendf.h"
#include "multiif.h" #include "multiif.h"
#include "progress.h" #include "progress.h"
#include "transfer.h"
#include "curlx/timeval.h" #include "curlx/timeval.h"
/* check rate limits within this many recent milliseconds, at minimum. */ /* check rate limits within this many recent milliseconds, at minimum. */
@ -92,6 +93,55 @@ static char *max6data(curl_off_t bytes, char *max6)
} }
#endif #endif
static void pgrs_speedinit(struct Curl_easy *data)
{
memset(&data->state.keeps_speed, 0, sizeof(struct curltime));
}
/*
* @unittest: 1606
*/
UNITTEST CURLcode pgrs_speedcheck(struct Curl_easy *data,
struct curltime *pnow)
{
if(!data->set.low_speed_time || !data->set.low_speed_limit ||
Curl_xfer_recv_is_paused(data) || Curl_xfer_send_is_paused(data))
/* A paused transfer is not qualified for speed checks */
return CURLE_OK;
if((data->progress.current_speed >= 0) && data->set.low_speed_time) {
if(data->progress.current_speed < data->set.low_speed_limit) {
if(!data->state.keeps_speed.tv_sec)
/* under the limit at this moment */
data->state.keeps_speed = *pnow;
else {
/* how long has it been under the limit */
timediff_t howlong = curlx_timediff_ms(*pnow, data->state.keeps_speed);
if(howlong >= data->set.low_speed_time * 1000) {
/* too long */
failf(data,
"Operation too slow. "
"Less than %ld bytes/sec transferred the last %ld seconds",
data->set.low_speed_limit,
data->set.low_speed_time);
return CURLE_OPERATION_TIMEDOUT;
}
}
}
else
/* faster right now */
data->state.keeps_speed.tv_sec = 0;
}
if(data->set.low_speed_limit)
/* if low speed limit is enabled, set the expire timer to make this
connection's speed get checked again in a second */
Curl_expire(data, 1000, EXPIRE_SPEEDCHECK);
return CURLE_OK;
}
/* /*
New proposed interface, 9th of February 2000: New proposed interface, 9th of February 2000:
@ -119,10 +169,19 @@ int Curl_pgrsDone(struct Curl_easy *data)
* hidden */ * hidden */
curl_mfprintf(data->set.err, "\n"); curl_mfprintf(data->set.err, "\n");
data->progress.speeder_c = 0; /* reset the progress meter display */
return 0; return 0;
} }
void Curl_pgrsReset(struct Curl_easy *data)
{
Curl_pgrsSetUploadCounter(data, 0);
Curl_pgrsSetDownloadCounter(data, 0);
Curl_pgrsSetUploadSize(data, -1);
Curl_pgrsSetDownloadSize(data, -1);
data->progress.speeder_c = 0; /* reset speed records */
pgrs_speedinit(data);
}
/* reset the known transfer sizes */ /* reset the known transfer sizes */
void Curl_pgrsResetTransferSizes(struct Curl_easy *data) void Curl_pgrsResetTransferSizes(struct Curl_easy *data)
{ {
@ -130,6 +189,14 @@ void Curl_pgrsResetTransferSizes(struct Curl_easy *data)
Curl_pgrsSetUploadSize(data, -1); Curl_pgrsSetUploadSize(data, -1);
} }
void Curl_pgrsRecvPause(struct Curl_easy *data, bool enable)
{
if(!enable) {
data->progress.speeder_c = 0; /* reset speed records */
pgrs_speedinit(data); /* reset low speed measurements */
}
}
/* /*
* *
* Curl_pgrsTimeWas(). Store the timestamp time at the given label. * Curl_pgrsTimeWas(). Store the timestamp time at the given label.
@ -228,72 +295,11 @@ void Curl_pgrsStartNow(struct Curl_easy *data)
p->speeder_c = 0; /* reset the progress meter display */ p->speeder_c = 0; /* reset the progress meter display */
p->start = curlx_now(); p->start = curlx_now();
p->is_t_startransfer_set = FALSE; p->is_t_startransfer_set = FALSE;
p->ul.limit.start = p->start;
p->dl.limit.start = p->start;
p->ul.limit.start_size = 0;
p->dl.limit.start_size = 0;
p->dl.cur_size = 0; p->dl.cur_size = 0;
p->ul.cur_size = 0; p->ul.cur_size = 0;
/* the sizes are unknown at start */ /* the sizes are unknown at start */
p->dl_size_known = FALSE; p->dl_size_known = FALSE;
p->ul_size_known = FALSE; p->ul_size_known = FALSE;
Curl_ratelimit(data, p->start);
}
/*
* This is used to handle speed limits, calculating how many milliseconds to
* wait until we are back under the speed limit, if needed.
*
* The way it works is by having a "starting point" (time & amount of data
* transferred by then) used in the speed computation, to be used instead of
* the start of the transfer. This starting point is regularly moved as
* transfer goes on, to keep getting accurate values (instead of average over
* the entire transfer).
*
* This function takes the current amount of data transferred, the amount at
* the starting point, the limit (in bytes/s), the time of the starting point
* and the current time.
*
* Returns 0 if no waiting is needed or when no waiting is needed but the
* starting point should be reset (to current); or the number of milliseconds
* to wait to get back under the speed limit.
*/
timediff_t Curl_pgrsLimitWaitTime(struct pgrs_dir *d,
curl_off_t bytes_per_sec,
struct curltime now)
{
curl_off_t bytes = d->cur_size - d->limit.start_size;
timediff_t should_ms;
timediff_t took_ms;
/* no limit or we did not get to any bytes yet */
if(!bytes_per_sec || !bytes)
return 0;
/* The time it took us to have `bytes` */
took_ms = curlx_timediff_ceil_ms(now, d->limit.start);
/* The time it *should* have taken us to have `bytes`
* when obeying the bytes_per_sec speed_limit. */
if(bytes < CURL_OFF_T_MAX/1000) {
/* (1000 * bytes / (bytes / sec)) = 1000 * sec = ms */
should_ms = (timediff_t) (1000 * bytes / bytes_per_sec);
}
else {
/* large `bytes`, first calc the seconds it should have taken.
* if that is small enough, convert to milliseconds. */
should_ms = (timediff_t) (bytes / bytes_per_sec);
if(should_ms < TIMEDIFF_T_MAX/1000)
should_ms *= 1000;
else
should_ms = TIMEDIFF_T_MAX;
}
if(took_ms < should_ms) {
/* when gotten to `bytes` too fast, wait the difference */
return should_ms - took_ms;
}
return 0;
} }
/* /*
@ -304,28 +310,6 @@ void Curl_pgrsSetDownloadCounter(struct Curl_easy *data, curl_off_t size)
data->progress.dl.cur_size = size; data->progress.dl.cur_size = size;
} }
/*
* Update the timestamp and sizestamp to use for rate limit calculations.
*/
void Curl_ratelimit(struct Curl_easy *data, struct curltime now)
{
/* do not set a new stamp unless the time since last update is long enough */
if(data->set.max_recv_speed) {
if(curlx_timediff_ms(now, data->progress.dl.limit.start) >=
MIN_RATE_LIMIT_PERIOD) {
data->progress.dl.limit.start = now;
data->progress.dl.limit.start_size = data->progress.dl.cur_size;
}
}
if(data->set.max_send_speed) {
if(curlx_timediff_ms(now, data->progress.ul.limit.start) >=
MIN_RATE_LIMIT_PERIOD) {
data->progress.ul.limit.start = now;
data->progress.ul.limit.start_size = data->progress.ul.cur_size;
}
}
}
/* /*
* Set the number of uploaded bytes so far. * Set the number of uploaded bytes so far.
*/ */
@ -378,75 +362,82 @@ static curl_off_t trspeed(curl_off_t size, /* number of bytes */
} }
/* returns TRUE if it is time to show the progress meter */ /* returns TRUE if it is time to show the progress meter */
static bool progress_calc(struct Curl_easy *data, struct curltime now) static bool progress_calc(struct Curl_easy *data, struct curltime *pnow)
{ {
bool timetoshow = FALSE;
struct Progress * const p = &data->progress; struct Progress * const p = &data->progress;
int i_next, i_oldest, i_latest;
timediff_t duration_ms;
curl_off_t amount;
/* The time spent so far (from the start) in microseconds */ /* The time spent so far (from the start) in microseconds */
p->timespent = curlx_timediff_us(now, p->start); p->timespent = curlx_timediff_us(*pnow, p->start);
p->dl.speed = trspeed(p->dl.cur_size, p->timespent); p->dl.speed = trspeed(p->dl.cur_size, p->timespent);
p->ul.speed = trspeed(p->ul.cur_size, p->timespent); p->ul.speed = trspeed(p->ul.cur_size, p->timespent);
/* Calculations done at most once a second, unless end is reached */ if(!p->speeder_c) { /* no previous record exists */
if(p->lastshow != now.tv_sec) { p->speed_amount[0] = p->dl.cur_size + p->ul.cur_size;
int countindex; /* amount of seconds stored in the speeder array */ p->speed_time[0] = *pnow;
int nowindex = p->speeder_c% CURR_TIME;
p->lastshow = now.tv_sec;
timetoshow = TRUE;
/* Let's do the "current speed" thing, with the dl + ul speeds
combined. Store the speed at entry 'nowindex'. */
p->speeder[ nowindex ] = p->dl.cur_size + p->ul.cur_size;
/* remember the exact time for this moment */
p->speeder_time [ nowindex ] = now;
/* advance our speeder_c counter, which is increased every time we get
here and we expect it to never wrap as 2^32 is a lot of seconds! */
p->speeder_c++; p->speeder_c++;
/* use the overall average at the start */
p->current_speed = p->ul.speed + p->dl.speed;
p->lastshow = pnow->tv_sec;
return TRUE;
}
/* We have at least one record now. Where to put the next and
* where is the latest one? */
i_next = p->speeder_c % CURL_SPEED_RECORDS;
i_latest = (i_next > 0) ? (i_next - 1) : (CURL_SPEED_RECORDS - 1);
/* figure out how many index entries of data we have stored in our speeder /* Make a new record only when some time has passed.
array. With N_ENTRIES filled in, we have about N_ENTRIES-1 seconds of * Too frequent calls otherwise ruin the history. */
transfer. Imagine, after one second we have filled in two entries, if(curlx_timediff_ms(*pnow, p->speed_time[i_latest]) >= 1000) {
after two seconds we have filled in three entries etc. */ p->speeder_c++;
countindex = ((p->speeder_c >= CURR_TIME) ? CURR_TIME : p->speeder_c) - 1; i_latest = i_next;
p->speed_amount[i_latest] = p->dl.cur_size + p->ul.cur_size;
/* first of all, we do not do this if there is no counted seconds yet */ p->speed_time[i_latest] = *pnow;
if(countindex) { }
int checkindex; else if(data->req.done) {
timediff_t span_ms; /* When a transfer is done, and we did not have a current speed
curl_off_t amount; * already, update the last record. Otherwise, stay at the speed
* we have. The last chunk of data, when rate limiting, would increase
/* Get the index position to compare with the 'nowindex' position. * reported speed since it no longer measures a full second. */
Get the oldest entry possible. While we have less than CURR_TIME if(!p->current_speed) {
entries, the first entry will remain the oldest. */ p->speed_amount[i_latest] = p->dl.cur_size + p->ul.cur_size;
checkindex = (p->speeder_c >= CURR_TIME) ? p->speeder_c%CURR_TIME : 0; p->speed_time[i_latest] = *pnow;
/* Figure out the exact time for the time span */
span_ms = curlx_timediff_ms(now, p->speeder_time[checkindex]);
if(span_ms == 0)
span_ms = 1; /* at least one millisecond MUST have passed */
/* Calculate the average speed the last 'span_ms' milliseconds */
amount = p->speeder[nowindex]- p->speeder[checkindex];
if(amount > (0xffffffff/1000))
/* the 'amount' value is bigger than would fit in 32 bits if
multiplied with 1000, so we use the double math for this */
p->current_speed = (curl_off_t)
((double)amount/((double)span_ms/1000.0));
else
/* the 'amount' value is small enough to fit within 32 bits even
when multiplied with 1000 */
p->current_speed = amount * 1000/span_ms;
} }
else }
/* the first second we use the average */ else {
p->current_speed = p->ul.speed + p->dl.speed; /* transfer ongoing, wait for more time to pass. */
return FALSE;
}
} /* Calculations end */ i_oldest = (p->speeder_c < CURL_SPEED_RECORDS) ? 0 :
return timetoshow; ((i_latest + 1) % CURL_SPEED_RECORDS);
/* How much we transferred between oldest and current records */
amount = p->speed_amount[i_latest]- p->speed_amount[i_oldest];
/* How long this took */
duration_ms = curlx_timediff_ms(p->speed_time[i_latest],
p->speed_time[i_oldest]);
if(duration_ms <= 0)
duration_ms = 1;
if(amount > (CURL_OFF_T_MAX/1000)) {
/* the 'amount' value is bigger than would fit in 64 bits if
multiplied with 1000, so we use the double math for this */
p->current_speed = (curl_off_t)
(((double)amount * 1000.0)/(double)duration_ms);
}
else {
/* the 'amount' value is small enough to fit within 32 bits even
when multiplied with 1000 */
p->current_speed = amount * 1000 / duration_ms;
}
if((p->lastshow == pnow->tv_sec) && !data->req.done)
return FALSE;
p->lastshow = pnow->tv_sec;
return TRUE;
} }
#ifndef CURL_DISABLE_PROGRESS_METER #ifndef CURL_DISABLE_PROGRESS_METER
@ -568,7 +559,7 @@ static void progress_meter(struct Curl_easy *data)
* Curl_pgrsUpdate() returns 0 for success or the value returned by the * Curl_pgrsUpdate() returns 0 for success or the value returned by the
* progress callback! * progress callback!
*/ */
static int pgrsupdate(struct Curl_easy *data, bool showprogress) static CURLcode pgrsupdate(struct Curl_easy *data, bool showprogress)
{ {
if(!data->progress.hide) { if(!data->progress.hide) {
if(data->set.fxferinfo) { if(data->set.fxferinfo) {
@ -582,9 +573,11 @@ static int pgrsupdate(struct Curl_easy *data, bool showprogress)
data->progress.ul.cur_size); data->progress.ul.cur_size);
Curl_set_in_callback(data, FALSE); Curl_set_in_callback(data, FALSE);
if(result != CURL_PROGRESSFUNC_CONTINUE) { if(result != CURL_PROGRESSFUNC_CONTINUE) {
if(result) if(result) {
failf(data, "Callback aborted"); failf(data, "Callback aborted");
return result; return CURLE_ABORTED_BY_CALLBACK;
}
return CURLE_OK;
} }
} }
else if(data->set.fprogress) { else if(data->set.fprogress) {
@ -598,9 +591,11 @@ static int pgrsupdate(struct Curl_easy *data, bool showprogress)
(double)data->progress.ul.cur_size); (double)data->progress.ul.cur_size);
Curl_set_in_callback(data, FALSE); Curl_set_in_callback(data, FALSE);
if(result != CURL_PROGRESSFUNC_CONTINUE) { if(result != CURL_PROGRESSFUNC_CONTINUE) {
if(result) if(result) {
failf(data, "Callback aborted"); failf(data, "Callback aborted");
return result; return CURLE_ABORTED_BY_CALLBACK;
}
return CURLE_OK;
} }
} }
@ -608,14 +603,30 @@ static int pgrsupdate(struct Curl_easy *data, bool showprogress)
progress_meter(data); progress_meter(data);
} }
return 0; return CURLE_OK;
} }
int Curl_pgrsUpdate(struct Curl_easy *data) static CURLcode pgrs_update(struct Curl_easy *data, struct curltime *pnow)
{
bool showprogress = progress_calc(data, pnow);
return pgrsupdate(data, showprogress);
}
CURLcode Curl_pgrsUpdate(struct Curl_easy *data)
{ {
struct curltime now = curlx_now(); /* what time is it */ struct curltime now = curlx_now(); /* what time is it */
bool showprogress = progress_calc(data, now); return pgrs_update(data, &now);
return pgrsupdate(data, showprogress); }
CURLcode Curl_pgrsCheck(struct Curl_easy *data)
{
struct curltime now = curlx_now();
CURLcode result;
result = pgrs_update(data, &now);
if(!result && !data->req.done)
result = pgrs_speedcheck(data, &now);
return result;
} }
/* /*
@ -624,5 +635,5 @@ int Curl_pgrsUpdate(struct Curl_easy *data)
void Curl_pgrsUpdate_nometer(struct Curl_easy *data) void Curl_pgrsUpdate_nometer(struct Curl_easy *data)
{ {
struct curltime now = curlx_now(); /* what time is it */ struct curltime now = curlx_now(); /* what time is it */
(void)progress_calc(data, now); (void)progress_calc(data, &now);
} }

View file

@ -26,6 +26,7 @@
#include "curlx/timeval.h" #include "curlx/timeval.h"
struct Curl_easy;
typedef enum { typedef enum {
TIMER_NONE, TIMER_NONE,
@ -50,15 +51,23 @@ void Curl_pgrsSetUploadSize(struct Curl_easy *data, curl_off_t size);
void Curl_pgrsSetDownloadCounter(struct Curl_easy *data, curl_off_t size); void Curl_pgrsSetDownloadCounter(struct Curl_easy *data, curl_off_t size);
void Curl_pgrsSetUploadCounter(struct Curl_easy *data, curl_off_t size); void Curl_pgrsSetUploadCounter(struct Curl_easy *data, curl_off_t size);
void Curl_ratelimit(struct Curl_easy *data, struct curltime now);
int Curl_pgrsUpdate(struct Curl_easy *data);
void Curl_pgrsUpdate_nometer(struct Curl_easy *data);
/* perform progress update, invoking callbacks at intervals */
CURLcode Curl_pgrsUpdate(struct Curl_easy *data);
/* perform progress update, no callbacks invoked */
void Curl_pgrsUpdate_nometer(struct Curl_easy *data);
/* perform progress update with callbacks and speed checks */
CURLcode Curl_pgrsCheck(struct Curl_easy *data);
/* Inform progress/speedcheck about receive pausing */
void Curl_pgrsRecvPause(struct Curl_easy *data, bool enable);
/* Reset sizes and couners for up- and download. */
void Curl_pgrsReset(struct Curl_easy *data);
/* Reset sizes for up- and download. */
void Curl_pgrsResetTransferSizes(struct Curl_easy *data); void Curl_pgrsResetTransferSizes(struct Curl_easy *data);
struct curltime Curl_pgrsTime(struct Curl_easy *data, timerid timer); struct curltime Curl_pgrsTime(struct Curl_easy *data, timerid timer);
timediff_t Curl_pgrsLimitWaitTime(struct pgrs_dir *d,
curl_off_t speed_limit,
struct curltime now);
/** /**
* Update progress timer with the elapsed time from its start to `timestamp`. * Update progress timer with the elapsed time from its start to `timestamp`.
* This allows updating timers later and is used by happy eyeballing, where * This allows updating timers later and is used by happy eyeballing, where
@ -69,4 +78,9 @@ void Curl_pgrsTimeWas(struct Curl_easy *data, timerid timer,
void Curl_pgrsEarlyData(struct Curl_easy *data, curl_off_t sent); void Curl_pgrsEarlyData(struct Curl_easy *data, curl_off_t sent);
#ifdef UNITTESTS
UNITTEST CURLcode pgrs_speedcheck(struct Curl_easy *data,
struct curltime *pnow);
#endif
#endif /* HEADER_CURL_PROGRESS_H */ #endif /* HEADER_CURL_PROGRESS_H */

200
lib/ratelimit.c Normal file
View file

@ -0,0 +1,200 @@
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at https://curl.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
* SPDX-License-Identifier: curl
*
***************************************************************************/
#include "curl_setup.h"
#include "curlx/timeval.h"
#include "ratelimit.h"
#define CURL_US_PER_SEC 1000000
#define CURL_RLIMIT_MIN_CHUNK (16 * 1024)
#define CURL_RLIMIT_MAX_STEPS 2 /* 500ms interval */
void Curl_rlimit_init(struct Curl_rlimit *r,
curl_off_t rate_per_s,
curl_off_t burst_per_s,
struct curltime ts)
{
curl_off_t rate_steps;
DEBUGASSERT(rate_per_s >= 0);
DEBUGASSERT(burst_per_s >= rate_per_s || !burst_per_s);
r->step_us = CURL_US_PER_SEC;
r->rate_per_step = rate_per_s;
r->burst_per_step = burst_per_s;
/* On rates that are multiples of CURL_RLIMIT_MIN_CHUNK, we reduce
* the interval `step_us` from 1 second to smaller steps with at
* most CURL_RLIMIT_MAX_STEPS.
* Smaller means more CPU, but also more precision. */
rate_steps = rate_per_s / CURL_RLIMIT_MIN_CHUNK;
rate_steps = CURLMIN(rate_steps, CURL_RLIMIT_MAX_STEPS);
if(rate_steps >= 2) {
r->step_us /= rate_steps;
r->rate_per_step /= rate_steps;
r->burst_per_step /= rate_steps;
}
r->tokens = r->rate_per_step;
r->spare_us = 0;
r->ts = ts;
r->blocked = FALSE;
}
void Curl_rlimit_start(struct Curl_rlimit *r, struct curltime ts)
{
r->tokens = r->rate_per_step;
r->spare_us = 0;
r->ts = ts;
}
bool Curl_rlimit_active(struct Curl_rlimit *r)
{
return (r->rate_per_step > 0) || r->blocked;
}
bool Curl_rlimit_is_blocked(struct Curl_rlimit *r)
{
return r->blocked;
}
static void ratelimit_update(struct Curl_rlimit *r,
struct curltime ts)
{
timediff_t elapsed_us, elapsed_steps;
curl_off_t token_gain;
DEBUGASSERT(r->rate_per_step);
if((r->ts.tv_sec == ts.tv_sec) && (r->ts.tv_usec == ts.tv_usec))
return;
elapsed_us = curlx_timediff_us(ts, r->ts);
if(elapsed_us < 0) { /* not going back in time */
curl_mfprintf(stderr, "rlimit: neg elapsed time %" FMT_TIMEDIFF_T "us\n",
elapsed_us);
DEBUGASSERT(0);
return;
}
elapsed_us += r->spare_us;
if(elapsed_us < r->step_us)
return;
/* we do the update */
r->ts = ts;
elapsed_steps = elapsed_us / r->step_us;
r->spare_us = elapsed_us % r->step_us;
/* How many tokens did we gain since the last update? */
if(r->rate_per_step > (CURL_OFF_T_MAX / elapsed_steps))
token_gain = CURL_OFF_T_MAX;
else {
token_gain = r->rate_per_step * elapsed_steps;
}
/* Limit the token again by the burst rate per second (if set), so we
* do not suddenly have a huge number of tokens after inactivity. */
r->tokens += token_gain;
if(r->burst_per_step && (r->tokens > r->burst_per_step)) {
r->tokens = r->burst_per_step;
}
}
curl_off_t Curl_rlimit_avail(struct Curl_rlimit *r,
struct curltime ts)
{
if(r->blocked)
return 0;
else if(r->rate_per_step) {
ratelimit_update(r, ts);
return r->tokens;
}
else
return CURL_OFF_T_MAX;
}
void Curl_rlimit_drain(struct Curl_rlimit *r,
size_t tokens,
struct curltime ts)
{
if(r->blocked || !r->rate_per_step)
return;
ratelimit_update(r, ts);
#if SIZEOF_CURL_OFF_T <= SIZEOF_SIZE_T
if(tokens > CURL_OFF_T_MAX) {
r->tokens = CURL_OFF_T_MIN;
return;
}
else
#endif
{
curl_off_t val = (curl_off_t)tokens;
if((CURL_OFF_T_MIN + val) < r->tokens)
r->tokens -= val;
else
r->tokens = CURL_OFF_T_MIN;
}
}
timediff_t Curl_rlimit_wait_ms(struct Curl_rlimit *r,
struct curltime ts)
{
timediff_t wait_us, elapsed_us;
if(r->blocked || !r->rate_per_step)
return 0;
ratelimit_update(r, ts);
if(r->tokens > 0)
return 0;
/* How much time will it take tokens to become positive again?
* Deduct `spare_us` and check against already elapsed time */
wait_us = (1 + (-r->tokens / r->rate_per_step)) * r->step_us;
wait_us -= r->spare_us;
elapsed_us = curlx_timediff_us(ts, r->ts);
if(elapsed_us >= wait_us)
return 0;
wait_us -= elapsed_us;
return (wait_us + 999) / 1000; /* in milliseconds */
}
void Curl_rlimit_block(struct Curl_rlimit *r,
bool activate,
struct curltime ts)
{
if(!activate == !r->blocked)
return;
r->ts = ts;
r->blocked = activate;
if(!r->blocked) {
/* Start rate limiting fresh. The amount of time this was blocked
* does not generate extra tokens. */
Curl_rlimit_start(r, ts);
}
else {
r->tokens = 0;
}
}

92
lib/ratelimit.h Normal file
View file

@ -0,0 +1,92 @@
#ifndef HEADER_Curl_rlimit_H
#define HEADER_Curl_rlimit_H
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at https://curl.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
* SPDX-License-Identifier: curl
*
***************************************************************************/
#include "curlx/timeval.h"
/* This is a rate limiter that provides "tokens" to be consumed
* per second with a "burst" rate limitation. Example:
* A rate limit of 1 megabyte per second with a burst rate of 1.5MB.
* - initially 1 million tokens are available.
* - these are drained in the first second.
* - checking available tokens before the 2nd second will return 0.
* - at/after the 2nd second, 1 million tokens are available again.
* - nothing happens for a second, the 1 million tokens would grow
* to 2 million, however the burst limit caps those at 1.5 million.
* Thus:
* - setting "burst" to CURL_OFF_T_MAX would average tokens over the
* complete lifetime. E.g. for a download, at the *end* of it, the
* average rate from start to finish would be the rate limit.
* - setting "burst" to the same value as "rate" would make a
* download always try to stay *at/below* the rate and slow times will
* not generate extra tokens.
* A rate limit can be blocked, causing the available tokens to become
* always 0 until unblocked. After unblocking, the rate limiting starts
* again with no history of the past.
* Finally, a rate limiter with rate 0 will always have CURL_OFF_T_MAX
* tokens available, unless blocked.
*/
struct Curl_rlimit {
curl_off_t rate_per_step; /* rate tokens are generated per step us */
curl_off_t burst_per_step; /* burst rate of tokens per step us */
timediff_t step_us; /* microseconds between token increases */
curl_off_t tokens; /* tokens available in the next second */
timediff_t spare_us; /* microseconds unaffecting tokens */
struct curltime ts; /* time of the last update */
BIT(blocked); /* blocking sets available tokens to 0 */
};
void Curl_rlimit_init(struct Curl_rlimit *r,
curl_off_t rate_per_s,
curl_off_t burst_per_s,
struct curltime ts);
/* Start ratelimiting with the given timestamp. Resets available tokens. */
void Curl_rlimit_start(struct Curl_rlimit *r, struct curltime ts);
/* How many milliseconds to wait until token are available again. */
timediff_t Curl_rlimit_wait_ms(struct Curl_rlimit *r,
struct curltime ts);
/* Return if rate limiting of tokens is active */
bool Curl_rlimit_active(struct Curl_rlimit *r);
bool Curl_rlimit_is_blocked(struct Curl_rlimit *r);
/* Return how many tokens are available to spend, may be negative */
curl_off_t Curl_rlimit_avail(struct Curl_rlimit *r,
struct curltime ts);
/* Drain tokens from the ratelimit, return how many are now available. */
void Curl_rlimit_drain(struct Curl_rlimit *r,
size_t tokens,
struct curltime ts);
/* Block/unblock ratelimiting. A blocked ratelimit has 0 tokens available. */
void Curl_rlimit_block(struct Curl_rlimit *r,
bool activate,
struct curltime ts);
#endif /* HEADER_Curl_rlimit_H */

View file

@ -258,7 +258,7 @@ static CURLcode req_set_upload_done(struct Curl_easy *data)
{ {
DEBUGASSERT(!data->req.upload_done); DEBUGASSERT(!data->req.upload_done);
data->req.upload_done = TRUE; data->req.upload_done = TRUE;
data->req.keepon &= ~(KEEP_SEND|KEEP_SEND_TIMED); /* we are done sending */ data->req.keepon &= ~KEEP_SEND; /* we are done sending */
Curl_pgrsTime(data, TIMER_POSTRANSFER); Curl_pgrsTime(data, TIMER_POSTRANSFER);
Curl_creader_done(data, data->req.upload_aborted); Curl_creader_done(data, data->req.upload_aborted);
@ -420,9 +420,9 @@ bool Curl_req_want_send(struct Curl_easy *data)
* - or request has buffered data to send * - or request has buffered data to send
* - or transfer connection has pending data to send */ * - or transfer connection has pending data to send */
return !data->req.done && return !data->req.done &&
(((data->req.keepon & KEEP_SENDBITS) == KEEP_SEND) || ((data->req.keepon & KEEP_SEND) ||
!Curl_req_sendbuf_empty(data) || !Curl_req_sendbuf_empty(data) ||
Curl_xfer_needs_flush(data)); Curl_xfer_needs_flush(data));
} }
bool Curl_req_done_sending(struct Curl_easy *data) bool Curl_req_done_sending(struct Curl_easy *data)
@ -458,8 +458,7 @@ CURLcode Curl_req_abort_sending(struct Curl_easy *data)
if(!data->req.upload_done) { if(!data->req.upload_done) {
Curl_bufq_reset(&data->req.sendbuf); Curl_bufq_reset(&data->req.sendbuf);
data->req.upload_aborted = TRUE; data->req.upload_aborted = TRUE;
/* no longer KEEP_SEND and KEEP_SEND_PAUSE */ data->req.keepon &= ~KEEP_SEND;
data->req.keepon &= ~KEEP_SENDBITS;
return req_set_upload_done(data); return req_set_upload_done(data);
} }
return CURLE_OK; return CURLE_OK;
@ -470,6 +469,6 @@ CURLcode Curl_req_stop_send_recv(struct Curl_easy *data)
/* stop receiving and ALL sending as well, including PAUSE and HOLD. /* stop receiving and ALL sending as well, including PAUSE and HOLD.
* We might still be paused on receive client writes though, so * We might still be paused on receive client writes though, so
* keep those bits around. */ * keep those bits around. */
data->req.keepon &= ~(KEEP_RECV|KEEP_SENDBITS); data->req.keepon &= ~(KEEP_RECV|KEEP_SEND);
return Curl_req_abort_sending(data); return Curl_req_abort_sending(data);
} }

View file

@ -130,6 +130,7 @@ struct SingleRequest {
BIT(sendbuf_init); /* sendbuf is initialized */ BIT(sendbuf_init); /* sendbuf is initialized */
BIT(shutdown); /* request end will shutdown connection */ BIT(shutdown); /* request end will shutdown connection */
BIT(shutdown_err_ignore); /* errors in shutdown will not fail request */ BIT(shutdown_err_ignore); /* errors in shutdown will not fail request */
BIT(reader_started); /* client reads have started */
}; };
/** /**

View file

@ -142,7 +142,7 @@ const struct Curl_handler Curl_handler_rtsp = {
ZERO_NULL, /* proto_pollset */ ZERO_NULL, /* proto_pollset */
rtsp_do_pollset, /* doing_pollset */ rtsp_do_pollset, /* doing_pollset */
ZERO_NULL, /* domore_pollset */ ZERO_NULL, /* domore_pollset */
ZERO_NULL, /* perform_pollset */ Curl_http_perform_pollset, /* perform_pollset */
ZERO_NULL, /* disconnect */ ZERO_NULL, /* disconnect */
rtsp_rtp_write_resp, /* write_resp */ rtsp_rtp_write_resp, /* write_resp */
rtsp_rtp_write_resp_hd, /* write_resp_hd */ rtsp_rtp_write_resp_hd, /* write_resp_hd */
@ -668,8 +668,7 @@ static CURLcode rtsp_do(struct Curl_easy *data, bool *done)
/* if a request-body has been sent off, we make sure this progress is /* if a request-body has been sent off, we make sure this progress is
noted properly */ noted properly */
Curl_pgrsSetUploadCounter(data, data->req.writebytecount); Curl_pgrsSetUploadCounter(data, data->req.writebytecount);
if(Curl_pgrsUpdate(data)) result = Curl_pgrsUpdate(data);
result = CURLE_ABORTED_BY_CALLBACK;
} }
out: out:
curlx_dyn_free(&req_buffer); curlx_dyn_free(&req_buffer);

View file

@ -711,7 +711,7 @@ void Curl_pollset_check(struct Curl_easy *data,
*pwant_read = *pwant_write = FALSE; *pwant_read = *pwant_write = FALSE;
} }
bool Curl_pollset_want_read(struct Curl_easy *data, bool Curl_pollset_want_recv(struct Curl_easy *data,
struct easy_pollset *ps, struct easy_pollset *ps,
curl_socket_t sock) curl_socket_t sock)
{ {
@ -723,3 +723,16 @@ bool Curl_pollset_want_read(struct Curl_easy *data,
} }
return FALSE; return FALSE;
} }
bool Curl_pollset_want_send(struct Curl_easy *data,
struct easy_pollset *ps,
curl_socket_t sock)
{
unsigned int i;
(void)data;
for(i = 0; i < ps->n; ++i) {
if((ps->sockets[i] == sock) && (ps->actions[i] & CURL_POLL_OUT))
return TRUE;
}
return FALSE;
}

View file

@ -163,8 +163,12 @@ CURLcode Curl_pollset_set(struct Curl_easy *data,
#define Curl_pollset_add_in(data, ps, sock) \ #define Curl_pollset_add_in(data, ps, sock) \
Curl_pollset_change((data), (ps), (sock), CURL_POLL_IN, 0) Curl_pollset_change((data), (ps), (sock), CURL_POLL_IN, 0)
#define Curl_pollset_remove_in(data, ps, sock) \
Curl_pollset_change((data), (ps), (sock), 0, CURL_POLL_IN)
#define Curl_pollset_add_out(data, ps, sock) \ #define Curl_pollset_add_out(data, ps, sock) \
Curl_pollset_change((data), (ps), (sock), CURL_POLL_OUT, 0) Curl_pollset_change((data), (ps), (sock), CURL_POLL_OUT, 0)
#define Curl_pollset_remove_out(data, ps, sock) \
Curl_pollset_change((data), (ps), (sock), 0, CURL_POLL_OUT)
#define Curl_pollset_add_inout(data, ps, sock) \ #define Curl_pollset_add_inout(data, ps, sock) \
Curl_pollset_change((data), (ps), (sock), \ Curl_pollset_change((data), (ps), (sock), \
CURL_POLL_IN|CURL_POLL_OUT, 0) CURL_POLL_IN|CURL_POLL_OUT, 0)
@ -188,10 +192,12 @@ void Curl_pollset_check(struct Curl_easy *data,
struct easy_pollset *ps, curl_socket_t sock, struct easy_pollset *ps, curl_socket_t sock,
bool *pwant_read, bool *pwant_write); bool *pwant_read, bool *pwant_write);
/** /* TRUE if the pollset contains socket with CURL_POLL_IN. */
* Return TRUE if the pollset contains socket with CURL_POLL_IN. bool Curl_pollset_want_recv(struct Curl_easy *data,
*/ struct easy_pollset *ps,
bool Curl_pollset_want_read(struct Curl_easy *data, curl_socket_t sock);
/* TRUE if the pollset contains socket with CURL_POLL_OUT. */
bool Curl_pollset_want_send(struct Curl_easy *data,
struct easy_pollset *ps, struct easy_pollset *ps,
curl_socket_t sock); curl_socket_t sock);

View file

@ -108,6 +108,7 @@ static void cl_reset_writer(struct Curl_easy *data)
static void cl_reset_reader(struct Curl_easy *data) static void cl_reset_reader(struct Curl_easy *data)
{ {
struct Curl_creader *reader = data->req.reader_stack; struct Curl_creader *reader = data->req.reader_stack;
data->req.reader_started = FALSE;
while(reader) { while(reader) {
data->req.reader_stack = reader->next; data->req.reader_stack = reader->next;
reader->crt->do_close(data, reader); reader->crt->do_close(data, reader);
@ -231,6 +232,7 @@ static CURLcode cw_download_write(struct Curl_easy *data,
if(!is_connect && !ctx->started_response) { if(!is_connect && !ctx->started_response) {
Curl_pgrsTime(data, TIMER_STARTTRANSFER); Curl_pgrsTime(data, TIMER_STARTTRANSFER);
Curl_rlimit_start(&data->progress.dl.rlimit, curlx_now());
ctx->started_response = TRUE; ctx->started_response = TRUE;
} }
@ -301,7 +303,9 @@ static CURLcode cw_download_write(struct Curl_easy *data,
if(result) if(result)
return result; return result;
} }
/* Update stats, write and report progress */ /* Update stats, write and report progress */
Curl_rlimit_drain(&data->progress.dl.rlimit, nwrite, curlx_now());
data->req.bytecount += nwrite; data->req.bytecount += nwrite;
Curl_pgrsSetDownloadCounter(data, data->req.bytecount); Curl_pgrsSetDownloadCounter(data, data->req.bytecount);
@ -1198,9 +1202,28 @@ CURLcode Curl_client_read(struct Curl_easy *data, char *buf, size_t blen,
return result; return result;
DEBUGASSERT(data->req.reader_stack); DEBUGASSERT(data->req.reader_stack);
} }
if(!data->req.reader_started) {
Curl_rlimit_start(&data->progress.ul.rlimit, curlx_now());
data->req.reader_started = TRUE;
}
if(Curl_rlimit_active(&data->progress.ul.rlimit)) {
curl_off_t ul_avail =
Curl_rlimit_avail(&data->progress.ul.rlimit, curlx_now());
if(ul_avail <= 0) {
result = CURLE_OK;
*eos = FALSE;
goto out;
}
if(ul_avail < (curl_off_t)blen)
blen = (size_t)ul_avail;
}
result = Curl_creader_read(data, data->req.reader_stack, buf, blen, result = Curl_creader_read(data, data->req.reader_stack, buf, blen,
nread, eos); nread, eos);
if(!result)
Curl_rlimit_drain(&data->progress.ul.rlimit, *nread, curlx_now());
out:
CURL_TRC_READ(data, "client_read(len=%zu) -> %d, nread=%zu, eos=%d", CURL_TRC_READ(data, "client_read(len=%zu) -> %d, nread=%zu, eos=%d",
blen, result, *nread, *eos); blen, result, *nread, *eos);
return result; return result;

View file

@ -2842,6 +2842,7 @@ static CURLcode setopt_offt(struct Curl_easy *data, CURLoption option,
if(offt < 0) if(offt < 0)
return CURLE_BAD_FUNCTION_ARGUMENT; return CURLE_BAD_FUNCTION_ARGUMENT;
s->max_send_speed = offt; s->max_send_speed = offt;
Curl_rlimit_init(&data->progress.ul.rlimit, offt, offt, curlx_now());
break; break;
case CURLOPT_MAX_RECV_SPEED_LARGE: case CURLOPT_MAX_RECV_SPEED_LARGE:
/* /*
@ -2851,6 +2852,7 @@ static CURLcode setopt_offt(struct Curl_easy *data, CURLoption option,
if(offt < 0) if(offt < 0)
return CURLE_BAD_FUNCTION_ARGUMENT; return CURLE_BAD_FUNCTION_ARGUMENT;
s->max_recv_speed = offt; s->max_recv_speed = offt;
Curl_rlimit_init(&data->progress.dl.rlimit, offt, offt, curlx_now());
break; break;
case CURLOPT_RESUME_FROM_LARGE: case CURLOPT_RESUME_FROM_LARGE:
/* /*

View file

@ -1697,10 +1697,7 @@ static CURLcode smtp_regular_transfer(struct Curl_easy *data,
data->req.size = -1; data->req.size = -1;
/* Set the progress data */ /* Set the progress data */
Curl_pgrsSetUploadCounter(data, 0); Curl_pgrsReset(data);
Curl_pgrsSetDownloadCounter(data, 0);
Curl_pgrsSetUploadSize(data, -1);
Curl_pgrsSetDownloadSize(data, -1);
/* Carry out the perform */ /* Carry out the perform */
result = smtp_perform(data, smtpc, smtp, &connected, dophase_done); result = smtp_perform(data, smtpc, smtp, &connected, dophase_done);

View file

@ -1,80 +0,0 @@
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at https://curl.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
* SPDX-License-Identifier: curl
*
***************************************************************************/
#include "curl_setup.h"
#include <curl/curl.h>
#include "urldata.h"
#include "sendf.h"
#include "transfer.h"
#include "multiif.h"
#include "speedcheck.h"
void Curl_speedinit(struct Curl_easy *data)
{
memset(&data->state.keeps_speed, 0, sizeof(struct curltime));
}
/*
* @unittest: 1606
*/
CURLcode Curl_speedcheck(struct Curl_easy *data,
struct curltime now)
{
if(Curl_xfer_recv_is_paused(data) || Curl_xfer_send_is_paused(data))
/* A paused transfer is not qualified for speed checks */
return CURLE_OK;
if((data->progress.current_speed >= 0) && data->set.low_speed_time) {
if(data->progress.current_speed < data->set.low_speed_limit) {
if(!data->state.keeps_speed.tv_sec)
/* under the limit at this moment */
data->state.keeps_speed = now;
else {
/* how long has it been under the limit */
timediff_t howlong = curlx_timediff_ms(now, data->state.keeps_speed);
if(howlong >= data->set.low_speed_time * 1000) {
/* too long */
failf(data,
"Operation too slow. "
"Less than %ld bytes/sec transferred the last %ld seconds",
data->set.low_speed_limit,
data->set.low_speed_time);
return CURLE_OPERATION_TIMEDOUT;
}
}
}
else
/* faster right now */
data->state.keeps_speed.tv_sec = 0;
}
if(data->set.low_speed_limit)
/* if low speed limit is enabled, set the expire timer to make this
connection's speed get checked again in a second */
Curl_expire(data, 1000, EXPIRE_SPEEDCHECK);
return CURLE_OK;
}

View file

@ -1,35 +0,0 @@
#ifndef HEADER_CURL_SPEEDCHECK_H
#define HEADER_CURL_SPEEDCHECK_H
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at https://curl.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
* SPDX-License-Identifier: curl
*
***************************************************************************/
#include "curl_setup.h"
#include "curlx/timeval.h"
struct Curl_easy;
void Curl_speedinit(struct Curl_easy *data);
CURLcode Curl_speedcheck(struct Curl_easy *data,
struct curltime now);
#endif /* HEADER_CURL_SPEEDCHECK_H */

View file

@ -1659,9 +1659,10 @@ static CURLcode telnet_do(struct Curl_easy *data, bool *done)
} }
} }
if(Curl_pgrsUpdate(data)) { if(!result) {
result = CURLE_ABORTED_BY_CALLBACK; result = Curl_pgrsUpdate(data);
break; if(result)
keepon = FALSE;
} }
} }
#endif #endif

View file

@ -59,7 +59,6 @@
#include "multiif.h" #include "multiif.h"
#include "url.h" #include "url.h"
#include "strcase.h" #include "strcase.h"
#include "speedcheck.h"
#include "select.h" #include "select.h"
#include "escape.h" #include "escape.h"
#include "curlx/strerr.h" #include "curlx/strerr.h"
@ -1175,9 +1174,10 @@ static CURLcode tftp_receive_packet(struct Curl_easy *data,
} }
/* Update the progress meter */ /* Update the progress meter */
if(Curl_pgrsUpdate(data)) { result = Curl_pgrsUpdate(data);
if(result) {
tftp_state_machine(state, TFTP_EVENT_ERROR); tftp_state_machine(state, TFTP_EVENT_ERROR);
return CURLE_ABORTED_BY_CALLBACK; return result;
} }
} }
return result; return result;
@ -1297,10 +1297,7 @@ static CURLcode tftp_doing(struct Curl_easy *data, bool *dophase_done)
/* The multi code does not have this logic for the DOING state so we /* The multi code does not have this logic for the DOING state so we
provide it for TFTP since it may do the entire transfer in this provide it for TFTP since it may do the entire transfer in this
state. */ state. */
if(Curl_pgrsUpdate(data)) result = Curl_pgrsCheck(data);
result = CURLE_ABORTED_BY_CALLBACK;
else
result = Curl_speedcheck(data, curlx_now());
} }
return result; return result;
} }

View file

@ -65,7 +65,6 @@
#include "cw-out.h" #include "cw-out.h"
#include "transfer.h" #include "transfer.h"
#include "sendf.h" #include "sendf.h"
#include "speedcheck.h"
#include "progress.h" #include "progress.h"
#include "http.h" #include "http.h"
#include "url.h" #include "url.h"
@ -241,10 +240,9 @@ static CURLcode sendrecv_dl(struct Curl_easy *data,
char *buf, *xfer_buf; char *buf, *xfer_buf;
size_t blen, xfer_blen; size_t blen, xfer_blen;
int maxloops = 10; int maxloops = 10;
curl_off_t total_received = 0;
bool is_multiplex = FALSE; bool is_multiplex = FALSE;
bool rcvd_eagain = FALSE; bool rcvd_eagain = FALSE;
bool is_eos = FALSE; bool is_eos = FALSE, rate_limited = FALSE;
result = Curl_multi_xfer_buf_borrow(data, &xfer_buf, &xfer_blen); result = Curl_multi_xfer_buf_borrow(data, &xfer_buf, &xfer_blen);
if(result) if(result)
@ -265,15 +263,21 @@ static CURLcode sendrecv_dl(struct Curl_easy *data,
buf = xfer_buf; buf = xfer_buf;
bytestoread = xfer_blen; bytestoread = xfer_blen;
if(bytestoread && data->set.max_recv_speed > 0) { if(bytestoread && Curl_rlimit_active(&data->progress.dl.rlimit)) {
/* In case of speed limit on receiving: if this loop already got curl_off_t dl_avail = Curl_rlimit_avail(&data->progress.dl.rlimit,
* a quarter of the quota, break out. We want to stutter a bit curlx_now());
* to keep in the limit, but too small receives will just cost /* DEBUGF(infof(data, "dl_rlimit, available=%" FMT_OFF_T, dl_avail));
* cpu unnecessarily. */ */
if(total_received && (total_received >= (data->set.max_recv_speed / 4))) /* In case of rate limited downloads: if this loop already got
* data and less than 16k is left in the limit, break out.
* We want to stutter a bit to keep in the limit, but too small
* receives will just cost cpu unnecessarily. */
if(dl_avail <= 0) {
rate_limited = TRUE;
break; break;
if(data->set.max_recv_speed < (curl_off_t)bytestoread) }
bytestoread = (size_t)data->set.max_recv_speed; if(dl_avail < (curl_off_t)bytestoread)
bytestoread = (size_t)dl_avail;
} }
rcvd_eagain = FALSE; rcvd_eagain = FALSE;
@ -315,7 +319,6 @@ static CURLcode sendrecv_dl(struct Curl_easy *data,
if(k->eos_written) /* already did write this to client, leave */ if(k->eos_written) /* already did write this to client, leave */
break; break;
} }
total_received += blen;
result = Curl_xfer_write_resp(data, buf, blen, is_eos); result = Curl_xfer_write_resp(data, buf, blen, is_eos);
if(result || data->req.done) if(result || data->req.done)
@ -327,13 +330,13 @@ static CURLcode sendrecv_dl(struct Curl_easy *data,
if((!is_multiplex && data->req.download_done) || is_eos) { if((!is_multiplex && data->req.download_done) || is_eos) {
data->req.keepon &= ~KEEP_RECV; data->req.keepon &= ~KEEP_RECV;
} }
/* if we are PAUSEd or stopped receiving, leave the loop */ /* if we stopped receiving, leave the loop */
if((k->keepon & KEEP_RECV_PAUSE) || !(k->keepon & KEEP_RECV)) if(!(k->keepon & KEEP_RECV))
break; break;
} while(maxloops--); } while(maxloops--);
if(!is_eos && !Curl_xfer_is_blocked(data) && if(!is_eos && !rate_limited && CURL_WANT_RECV(data) &&
(!rcvd_eagain || data_pending(data, rcvd_eagain))) { (!rcvd_eagain || data_pending(data, rcvd_eagain))) {
/* Did not read until EAGAIN/EOS or there is still data pending /* Did not read until EAGAIN/EOS or there is still data pending
* in buffers. Mark as read-again via simulated SELECT results. */ * in buffers. Mark as read-again via simulated SELECT results. */
@ -396,16 +399,13 @@ CURLcode Curl_sendrecv(struct Curl_easy *data, struct curltime *nowp)
} }
/* If we still have writing to do, we check if we have a writable socket. */ /* If we still have writing to do, we check if we have a writable socket. */
if(Curl_req_want_send(data) || (data->req.keepon & KEEP_SEND_TIMED)) { if(Curl_req_want_send(data)) {
result = sendrecv_ul(data); result = sendrecv_ul(data);
if(result) if(result)
goto out; goto out;
} }
if(Curl_pgrsUpdate(data)) result = Curl_pgrsCheck(data);
result = CURLE_ABORTED_BY_CALLBACK;
else
result = Curl_speedcheck(data, *nowp);
if(result) if(result)
goto out; goto out;
@ -440,16 +440,14 @@ CURLcode Curl_sendrecv(struct Curl_easy *data, struct curltime *nowp)
result = CURLE_PARTIAL_FILE; result = CURLE_PARTIAL_FILE;
goto out; goto out;
} }
if(Curl_pgrsUpdate(data)) {
result = CURLE_ABORTED_BY_CALLBACK;
goto out;
}
} }
/* If there is nothing more to send/recv, the request is done */ /* If there is nothing more to send/recv, the request is done */
if((k->keepon & (KEEP_RECVBITS|KEEP_SENDBITS)) == 0) if((k->keepon & (KEEP_RECV|KEEP_SEND)) == 0)
data->req.done = TRUE; data->req.done = TRUE;
result = Curl_pgrsUpdate(data);
out: out:
if(result) if(result)
DEBUGF(infof(data, "Curl_sendrecv() -> %d", result)); DEBUGF(infof(data, "Curl_sendrecv() -> %d", result));
@ -913,51 +911,30 @@ bool Curl_xfer_is_blocked(struct Curl_easy *data)
bool Curl_xfer_send_is_paused(struct Curl_easy *data) bool Curl_xfer_send_is_paused(struct Curl_easy *data)
{ {
return (data->req.keepon & KEEP_SEND_PAUSE); return Curl_rlimit_is_blocked(&data->progress.ul.rlimit);
} }
bool Curl_xfer_recv_is_paused(struct Curl_easy *data) bool Curl_xfer_recv_is_paused(struct Curl_easy *data)
{ {
return (data->req.keepon & KEEP_RECV_PAUSE); return Curl_rlimit_is_blocked(&data->progress.dl.rlimit);
} }
CURLcode Curl_xfer_pause_send(struct Curl_easy *data, bool enable) CURLcode Curl_xfer_pause_send(struct Curl_easy *data, bool enable)
{ {
CURLcode result = CURLE_OK; CURLcode result = CURLE_OK;
if(enable) { Curl_rlimit_block(&data->progress.ul.rlimit, enable, curlx_now());
data->req.keepon |= KEEP_SEND_PAUSE; if(!enable && Curl_creader_is_paused(data))
} result = Curl_creader_unpause(data);
else {
data->req.keepon &= ~KEEP_SEND_PAUSE;
if(Curl_creader_is_paused(data))
result = Curl_creader_unpause(data);
}
return result; return result;
} }
CURLcode Curl_xfer_pause_recv(struct Curl_easy *data, bool enable) CURLcode Curl_xfer_pause_recv(struct Curl_easy *data, bool enable)
{ {
CURLcode result = CURLE_OK; CURLcode result = CURLE_OK;
if(enable) { Curl_rlimit_block(&data->progress.dl.rlimit, enable, curlx_now());
data->req.keepon |= KEEP_RECV_PAUSE; if(!enable && Curl_cwriter_is_paused(data))
} result = Curl_cwriter_unpause(data);
else {
data->req.keepon &= ~KEEP_RECV_PAUSE;
if(Curl_cwriter_is_paused(data))
result = Curl_cwriter_unpause(data);
}
Curl_conn_ev_data_pause(data, enable); Curl_conn_ev_data_pause(data, enable);
Curl_pgrsRecvPause(data, enable);
return result; return result;
} }
bool Curl_xfer_is_too_fast(struct Curl_easy *data)
{
struct Curl_llist_node *e = Curl_llist_head(&data->state.timeoutlist);
while(e) {
struct time_node *n = Curl_node_elem(e);
e = Curl_node_next(e);
if(n->eid == EXPIRE_TOOFAST)
return TRUE;
}
return FALSE;
}

View file

@ -143,7 +143,4 @@ bool Curl_xfer_recv_is_paused(struct Curl_easy *data);
CURLcode Curl_xfer_pause_send(struct Curl_easy *data, bool enable); CURLcode Curl_xfer_pause_send(struct Curl_easy *data, bool enable);
CURLcode Curl_xfer_pause_recv(struct Curl_easy *data, bool enable); CURLcode Curl_xfer_pause_recv(struct Curl_easy *data, bool enable);
/* Query if transfer has expire timeout TOOFAST set. */
bool Curl_xfer_is_too_fast(struct Curl_easy *data);
#endif /* HEADER_CURL_TRANSFER_H */ #endif /* HEADER_CURL_TRANSFER_H */

View file

@ -88,7 +88,6 @@
#include "select.h" #include "select.h"
#include "multiif.h" #include "multiif.h"
#include "easyif.h" #include "easyif.h"
#include "speedcheck.h"
#include "curlx/warnless.h" #include "curlx/warnless.h"
#include "getinfo.h" #include "getinfo.h"
#include "pop3.h" #include "pop3.h"
@ -3884,7 +3883,6 @@ CURLcode Curl_connect(struct Curl_easy *data,
CURLcode Curl_init_do(struct Curl_easy *data, struct connectdata *conn) CURLcode Curl_init_do(struct Curl_easy *data, struct connectdata *conn)
{ {
/* if this is a pushed stream, we need this: */
CURLcode result; CURLcode result;
if(conn) { if(conn) {
@ -3904,9 +3902,7 @@ CURLcode Curl_init_do(struct Curl_easy *data, struct connectdata *conn)
result = Curl_req_start(&data->req, data); result = Curl_req_start(&data->req, data);
if(!result) { if(!result) {
Curl_speedinit(data); Curl_pgrsReset(data);
Curl_pgrsSetUploadCounter(data, 0);
Curl_pgrsSetDownloadCounter(data, 0);
} }
return result; return result;
} }

View file

@ -156,6 +156,7 @@ typedef unsigned int curl_prot_t;
#include "curlx/dynbuf.h" #include "curlx/dynbuf.h"
#include "dynhds.h" #include "dynhds.h"
#include "request.h" #include "request.h"
#include "ratelimit.h"
#include "netrc.h" #include "netrc.h"
/* On error return, the value of `pnwritten` has no meaning */ /* On error return, the value of `pnwritten` has no meaning */
@ -426,30 +427,11 @@ struct hostname {
#define KEEP_NONE 0 #define KEEP_NONE 0
#define KEEP_RECV (1<<0) /* there is or may be data to read */ #define KEEP_RECV (1<<0) /* there is or may be data to read */
#define KEEP_SEND (1<<1) /* there is or may be data to write */ #define KEEP_SEND (1<<1) /* there is or may be data to write */
#define KEEP_RECV_HOLD (1<<2) /* when set, no reading should be done but there
might still be data to read */
#define KEEP_SEND_HOLD (1<<3) /* when set, no writing should be done but there
might still be data to write */
#define KEEP_RECV_PAUSE (1<<4) /* reading is paused */
#define KEEP_SEND_PAUSE (1<<5) /* writing is paused */
/* KEEP_SEND_TIMED is set when the transfer should attempt sending /* transfer wants to send */
* at timer (or other) events. A transfer waiting on a timer will #define CURL_WANT_SEND(data) ((data)->req.keepon & KEEP_SEND)
* remove KEEP_SEND to suppress POLLOUTs of the connection. /* transfer wants to receive */
* Adding KEEP_SEND_TIMED will then attempt to send whenever the transfer #define CURL_WANT_RECV(data) ((data)->req.keepon & KEEP_RECV)
* enters the "readwrite" loop, e.g. when a timer fires.
* This is used in HTTP for 'Expect: 100-continue' waiting. */
#define KEEP_SEND_TIMED (1<<6)
#define KEEP_RECVBITS (KEEP_RECV | KEEP_RECV_HOLD | KEEP_RECV_PAUSE)
#define KEEP_SENDBITS (KEEP_SEND | KEEP_SEND_HOLD | KEEP_SEND_PAUSE)
/* transfer wants to send is not PAUSE or HOLD */
#define CURL_WANT_SEND(data) \
(((data)->req.keepon & KEEP_SENDBITS) == KEEP_SEND)
/* transfer receive is not on PAUSE or HOLD */
#define CURL_WANT_RECV(data) \
(((data)->req.keepon & KEEP_RECVBITS) == KEEP_RECV)
#define FIRSTSOCKET 0 #define FIRSTSOCKET 0
#define SECONDARYSOCKET 1 #define SECONDARYSOCKET 1
@ -805,16 +787,11 @@ struct PureInfo {
BIT(used_proxy); /* the transfer used a proxy */ BIT(used_proxy); /* the transfer used a proxy */
}; };
struct pgrs_measure {
struct curltime start; /* when measure started */
curl_off_t start_size; /* the 'cur_size' the measure started at */
};
struct pgrs_dir { struct pgrs_dir {
curl_off_t total_size; /* total expected bytes */ curl_off_t total_size; /* total expected bytes */
curl_off_t cur_size; /* transferred bytes so far */ curl_off_t cur_size; /* transferred bytes so far */
curl_off_t speed; /* bytes per second transferred */ curl_off_t speed; /* bytes per second transferred */
struct pgrs_measure limit; struct Curl_rlimit rlimit; /* speed limiting / pausing */
}; };
struct Progress { struct Progress {
@ -843,10 +820,10 @@ struct Progress {
struct curltime t_startqueue; struct curltime t_startqueue;
struct curltime t_acceptdata; struct curltime t_acceptdata;
#define CURR_TIME (5 + 1) /* 6 entries for 5 seconds */ #define CURL_SPEED_RECORDS (5 + 1) /* 6 entries for 5 seconds */
curl_off_t speeder[ CURR_TIME ]; curl_off_t speed_amount[ CURL_SPEED_RECORDS ];
struct curltime speeder_time[ CURR_TIME ]; struct curltime speed_time[ CURL_SPEED_RECORDS ];
unsigned char speeder_c; unsigned char speeder_c;
BIT(hide); BIT(hide);
BIT(ul_size_known); BIT(ul_size_known);

View file

@ -80,10 +80,9 @@
#define QUIC_HANDSHAKE_TIMEOUT (10*NGTCP2_SECONDS) #define QUIC_HANDSHAKE_TIMEOUT (10*NGTCP2_SECONDS)
/* A stream window is the maximum amount we need to buffer for /* A stream window is the maximum amount we need to buffer for
* each active transfer. We use HTTP/3 flow control and only ACK * each active transfer.
* when we take things out of the buffer.
* Chunk size is large enough to take a full DATA frame */ * Chunk size is large enough to take a full DATA frame */
#define H3_STREAM_WINDOW_SIZE (128 * 1024) #define H3_STREAM_WINDOW_SIZE (64 * 1024)
#define H3_STREAM_CHUNK_SIZE (16 * 1024) #define H3_STREAM_CHUNK_SIZE (16 * 1024)
#if H3_STREAM_CHUNK_SIZE < NGTCP2_MAX_UDP_PAYLOAD_SIZE #if H3_STREAM_CHUNK_SIZE < NGTCP2_MAX_UDP_PAYLOAD_SIZE
#error H3_STREAM_CHUNK_SIZE smaller than NGTCP2_MAX_UDP_PAYLOAD_SIZE #error H3_STREAM_CHUNK_SIZE smaller than NGTCP2_MAX_UDP_PAYLOAD_SIZE
@ -242,6 +241,7 @@ struct h3_stream_ctx {
size_t sendbuf_len_in_flight; /* sendbuf amount "in flight" */ size_t sendbuf_len_in_flight; /* sendbuf amount "in flight" */
curl_uint64_t error3; /* HTTP/3 stream error code */ curl_uint64_t error3; /* HTTP/3 stream error code */
curl_off_t upload_left; /* number of request bytes left to upload */ curl_off_t upload_left; /* number of request bytes left to upload */
uint64_t download_unacked; /* bytes not acknowledged yet */
int status_code; /* HTTP status code */ int status_code; /* HTTP status code */
CURLcode xfer_result; /* result from xfer_resp_write(_hd) */ CURLcode xfer_result; /* result from xfer_resp_write(_hd) */
BIT(resp_hds_complete); /* we have a complete, final response */ BIT(resp_hds_complete); /* we have a complete, final response */
@ -472,7 +472,7 @@ static void quic_settings(struct cf_ngtcp2_ctx *ctx,
s->handshake_timeout = (data->set.connecttimeout > 0) ? s->handshake_timeout = (data->set.connecttimeout > 0) ?
data->set.connecttimeout * NGTCP2_MILLISECONDS : QUIC_HANDSHAKE_TIMEOUT; data->set.connecttimeout * NGTCP2_MILLISECONDS : QUIC_HANDSHAKE_TIMEOUT;
s->max_window = 100 * ctx->max_stream_window; s->max_window = 100 * ctx->max_stream_window;
s->max_stream_window = 10 * ctx->max_stream_window; s->max_stream_window = ctx->max_stream_window;
s->no_pmtud = FALSE; s->no_pmtud = FALSE;
#ifdef NGTCP2_SETTINGS_V3 #ifdef NGTCP2_SETTINGS_V3
/* try ten times the ngtcp2 defaults here for problems with Caddy */ /* try ten times the ngtcp2 defaults here for problems with Caddy */
@ -1057,6 +1057,35 @@ static void h3_xfer_write_resp(struct Curl_cfilter *cf,
} }
} }
static void cf_ngtcp2_ack_stream(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct h3_stream_ctx *stream)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct curltime now = curlx_now();
curl_off_t avail;
uint64_t ack_len = 0;
/* How many byte to ack on the stream? */
/* how much does rate limiting allow us to acknowledge? */
avail = Curl_rlimit_avail(&data->progress.dl.rlimit, now);
if(avail == CURL_OFF_T_MAX) { /* no rate limit, ack all */
ack_len = stream->download_unacked;
}
else if(avail > 0) {
ack_len = CURLMIN(stream->download_unacked, (uint64_t)avail);
}
if(ack_len) {
CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] ACK %" PRIu64
"/%" PRIu64 " bytes of DATA", stream->id,
ack_len, stream->download_unacked);
ngtcp2_conn_extend_max_stream_offset(ctx->qconn, stream->id, ack_len);
stream->download_unacked -= ack_len;
}
}
static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id, static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id,
const uint8_t *buf, size_t blen, const uint8_t *buf, size_t blen,
void *user_data, void *stream_user_data) void *user_data, void *stream_user_data)
@ -1073,13 +1102,15 @@ static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id,
return NGHTTP3_ERR_CALLBACK_FAILURE; return NGHTTP3_ERR_CALLBACK_FAILURE;
h3_xfer_write_resp(cf, data, stream, (const char *)buf, blen, FALSE); h3_xfer_write_resp(cf, data, stream, (const char *)buf, blen, FALSE);
if(blen) {
CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] ACK %zu bytes of DATA",
stream->id, blen);
ngtcp2_conn_extend_max_stream_offset(ctx->qconn, stream->id, blen);
ngtcp2_conn_extend_max_offset(ctx->qconn, blen);
}
CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] DATA len=%zu", stream->id, blen); CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] DATA len=%zu", stream->id, blen);
ngtcp2_conn_extend_max_offset(ctx->qconn, blen);
if(UINT64_MAX - blen < stream->download_unacked)
stream->download_unacked = UINT64_MAX; /* unlikely */
else
stream->download_unacked += blen;
cf_ngtcp2_ack_stream(cf, data, stream);
return 0; return 0;
} }
@ -1374,6 +1405,8 @@ static CURLcode cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
goto out; goto out;
} }
cf_ngtcp2_ack_stream(cf, data, stream);
if(cf_progress_ingress(cf, data, &pktx)) { if(cf_progress_ingress(cf, data, &pktx)) {
result = CURLE_RECV_ERROR; result = CURLE_RECV_ERROR;
goto out; goto out;

View file

@ -54,7 +54,6 @@
#include "../http.h" /* for HTTP proxy tunnel stuff */ #include "../http.h" /* for HTTP proxy tunnel stuff */
#include "ssh.h" #include "ssh.h"
#include "../url.h" #include "../url.h"
#include "../speedcheck.h"
#include "../vtls/vtls.h" #include "../vtls/vtls.h"
#include "../cfilters.h" #include "../cfilters.h"
#include "../connect.h" #include "../connect.h"
@ -2481,17 +2480,13 @@ static CURLcode myssh_block_statemach(struct Curl_easy *data,
while((sshc->state != SSH_STOP) && !result) { while((sshc->state != SSH_STOP) && !result) {
bool block; bool block;
timediff_t left_ms = 1000; timediff_t left_ms = 1000;
struct curltime now = curlx_now();
result = myssh_statemach_act(data, sshc, sshp, &block); result = myssh_statemach_act(data, sshc, sshp, &block);
if(result) if(result)
break; break;
if(!disconnect) { if(!disconnect) {
if(Curl_pgrsUpdate(data)) result = Curl_pgrsCheck(data);
return CURLE_ABORTED_BY_CALLBACK;
result = Curl_speedcheck(data, now);
if(result) if(result)
break; break;
@ -2746,10 +2741,7 @@ static CURLcode myssh_do_it(struct Curl_easy *data, bool *done)
sshc->secondCreateDirs = 0; /* reset the create directory attempt state sshc->secondCreateDirs = 0; /* reset the create directory attempt state
variable */ variable */
Curl_pgrsSetUploadCounter(data, 0); Curl_pgrsReset(data);
Curl_pgrsSetDownloadCounter(data, 0);
Curl_pgrsSetUploadSize(data, -1);
Curl_pgrsSetDownloadSize(data, -1);
if(conn->handler->protocol & CURLPROTO_SCP) if(conn->handler->protocol & CURLPROTO_SCP)
result = scp_perform(data, &connected, done); result = scp_perform(data, &connected, done);

View file

@ -53,7 +53,6 @@
#include "../http.h" /* for HTTP proxy tunnel stuff */ #include "../http.h" /* for HTTP proxy tunnel stuff */
#include "ssh.h" #include "ssh.h"
#include "../url.h" #include "../url.h"
#include "../speedcheck.h"
#include "../vtls/vtls.h" #include "../vtls/vtls.h"
#include "../cfilters.h" #include "../cfilters.h"
#include "../connect.h" #include "../connect.h"
@ -3135,10 +3134,7 @@ static CURLcode ssh_block_statemach(struct Curl_easy *data,
break; break;
if(!disconnect) { if(!disconnect) {
if(Curl_pgrsUpdate(data)) result = Curl_pgrsCheck(data);
return CURLE_ABORTED_BY_CALLBACK;
result = Curl_speedcheck(data, now);
if(result) if(result)
break; break;
@ -3534,10 +3530,7 @@ static CURLcode ssh_do(struct Curl_easy *data, bool *done)
sshc->secondCreateDirs = 0; /* reset the create directory attempt state sshc->secondCreateDirs = 0; /* reset the create directory attempt state
variable */ variable */
Curl_pgrsSetUploadCounter(data, 0); Curl_pgrsReset(data);
Curl_pgrsSetDownloadCounter(data, 0);
Curl_pgrsSetUploadSize(data, -1);
Curl_pgrsSetDownloadSize(data, -1);
if(conn->handler->protocol & CURLPROTO_SCP) if(conn->handler->protocol & CURLPROTO_SCP)
result = scp_perform(data, &connected, done); result = scp_perform(data, &connected, done);

View file

@ -1811,10 +1811,9 @@ schannel_recv_renegotiate(struct Curl_cfilter *cf, struct Curl_easy *data,
int what; int what;
timediff_t timeout_ms, remaining; timediff_t timeout_ms, remaining;
if(Curl_pgrsUpdate(data)) { result = Curl_pgrsUpdate(data);
result = CURLE_ABORTED_BY_CALLBACK; if(result)
break; break;
}
elapsed = curlx_timediff_ms(curlx_now(), rs->start_time); elapsed = curlx_timediff_ms(curlx_now(), rs->start_time);
if(elapsed >= MAX_RENEG_BLOCK_TIME) { if(elapsed >= MAX_RENEG_BLOCK_TIME) {

View file

@ -1927,9 +1927,9 @@ const struct Curl_handler Curl_handler_ws = {
ZERO_NULL, /* connecting */ ZERO_NULL, /* connecting */
ZERO_NULL, /* doing */ ZERO_NULL, /* doing */
ZERO_NULL, /* proto_pollset */ ZERO_NULL, /* proto_pollset */
Curl_http_do_pollset, /* doing_pollset */ Curl_http_doing_pollset, /* doing_pollset */
ZERO_NULL, /* domore_pollset */ ZERO_NULL, /* domore_pollset */
ZERO_NULL, /* perform_pollset */ Curl_http_perform_pollset, /* perform_pollset */
ZERO_NULL, /* disconnect */ ZERO_NULL, /* disconnect */
Curl_http_write_resp, /* write_resp */ Curl_http_write_resp, /* write_resp */
Curl_http_write_resp_hd, /* write_resp_hd */ Curl_http_write_resp_hd, /* write_resp_hd */
@ -1954,9 +1954,9 @@ const struct Curl_handler Curl_handler_wss = {
NULL, /* connecting */ NULL, /* connecting */
ZERO_NULL, /* doing */ ZERO_NULL, /* doing */
NULL, /* proto_pollset */ NULL, /* proto_pollset */
Curl_http_do_pollset, /* doing_pollset */ Curl_http_doing_pollset, /* doing_pollset */
ZERO_NULL, /* domore_pollset */ ZERO_NULL, /* domore_pollset */
ZERO_NULL, /* perform_pollset */ Curl_http_perform_pollset, /* perform_pollset */
ZERO_NULL, /* disconnect */ ZERO_NULL, /* disconnect */
Curl_http_write_resp, /* write_resp */ Curl_http_write_resp, /* write_resp */
Curl_http_write_resp_hd, /* write_resp_hd */ Curl_http_write_resp_hd, /* write_resp_hd */

View file

@ -280,7 +280,7 @@ test3032 test3033 test3034 test3035 \
test3100 test3101 test3102 test3103 test3104 test3105 \ test3100 test3101 test3102 test3103 test3104 test3105 \
\ \
test3200 test3201 test3202 test3203 test3204 test3205 test3206 test3207 test3208 \ test3200 test3201 test3202 test3203 test3204 test3205 test3206 test3207 test3208 \
test3209 test3210 test3211 test3212 test3213 test3214 test3215 \ test3209 test3210 test3211 test3212 test3213 test3214 test3215 test3216 \
test4000 test4001 test4000 test4001
EXTRA_DIST = $(TESTCASES) DISABLED data-xml1 EXTRA_DIST = $(TESTCASES) DISABLED data-xml1

19
tests/data/test3216 Normal file
View file

@ -0,0 +1,19 @@
<testcase>
<info>
<keywords>
unittest
ratelimit
</keywords>
</info>
#
# Client-side
<client>
<features>
unittest
</features>
<name>
ratelimit unit tests
</name>
</client>
</testcase>

View file

@ -27,11 +27,9 @@
import difflib import difflib
import filecmp import filecmp
import logging import logging
import math
import os import os
import re import re
import sys import sys
from datetime import timedelta
import pytest import pytest
from testenv import Env, CurlClient, LocalClient from testenv import Env, CurlClient, LocalClient
@ -424,15 +422,17 @@ class TestDownload:
count = 1 count = 1
url = f'https://{env.authority_for(env.domain1, proto)}/data-1m' url = f'https://{env.authority_for(env.domain1, proto)}/data-1m'
curl = CurlClient(env=env) curl = CurlClient(env=env)
speed_limit = 384 * 1024 speed_limit = 256 * 1024
min_duration = math.floor((1024 * 1024)/speed_limit)
r = curl.http_download(urls=[url], alpn_proto=proto, extra_args=[ r = curl.http_download(urls=[url], alpn_proto=proto, extra_args=[
'--limit-rate', f'{speed_limit}' '--limit-rate', f'{speed_limit}'
]) ])
r.check_response(count=count, http_status=200) r.check_response(count=count, http_status=200)
assert r.duration > timedelta(seconds=min_duration), \ dl_speed = r.stats[0]['speed_download']
f'rate limited transfer should take more than {min_duration}s, '\ # speed limit is only exact on long durations. Ideally this transfer
f'not {r.duration}' # would take 4 seconds, but it may end just after 3 because then
# we have downloaded the rest and will not wait for the rate
# limit to increase again.
assert dl_speed <= ((1024*1024)/3), f'{r.stats[0]}'
# make extreme parallel h2 upgrades, check invalid conn reuse # make extreme parallel h2 upgrades, check invalid conn reuse
# before protocol switch has happened # before protocol switch has happened

View file

@ -557,7 +557,7 @@ class TestUpload:
r.check_response(count=count, http_status=200) r.check_response(count=count, http_status=200)
assert r.responses[0]['header']['received-length'] == f'{up_len}', f'{r.responses[0]}' assert r.responses[0]['header']['received-length'] == f'{up_len}', f'{r.responses[0]}'
up_speed = r.stats[0]['speed_upload'] up_speed = r.stats[0]['speed_upload']
assert (speed_limit * 0.5) <= up_speed <= (speed_limit * 1.5), f'{r.stats[0]}' assert up_speed <= (speed_limit * 1.1), f'{r.stats[0]}'
# speed limited on echo handler # speed limited on echo handler
@pytest.mark.parametrize("proto", Env.http_protos()) @pytest.mark.parametrize("proto", Env.http_protos())
@ -573,7 +573,7 @@ class TestUpload:
]) ])
r.check_response(count=count, http_status=200) r.check_response(count=count, http_status=200)
up_speed = r.stats[0]['speed_upload'] up_speed = r.stats[0]['speed_upload']
assert (speed_limit * 0.5) <= up_speed <= (speed_limit * 1.5), f'{r.stats[0]}' assert up_speed <= (speed_limit * 1.1), f'{r.stats[0]}'
# upload larger data, triggering "Expect: 100-continue" code paths # upload larger data, triggering "Expect: 100-continue" code paths
@pytest.mark.parametrize("proto", ['http/1.1']) @pytest.mark.parametrize("proto", ['http/1.1'])

View file

@ -42,4 +42,4 @@ TESTS_C = \
unit1979.c unit1980.c \ unit1979.c unit1980.c \
unit2600.c unit2601.c unit2602.c unit2603.c unit2604.c unit2605.c \ unit2600.c unit2601.c unit2602.c unit2603.c unit2604.c unit2605.c \
unit3200.c unit3205.c \ unit3200.c unit3205.c \
unit3211.c unit3212.c unit3213.c unit3214.c unit3211.c unit3212.c unit3213.c unit3214.c unit3216.c

View file

@ -23,7 +23,7 @@
***************************************************************************/ ***************************************************************************/
#include "unitcheck.h" #include "unitcheck.h"
#include "speedcheck.h" #include "progress.h"
#include "urldata.h" #include "urldata.h"
static CURLcode t1606_setup(struct Curl_easy **easy) static CURLcode t1606_setup(struct Curl_easy **easy)
@ -58,12 +58,12 @@ static int runawhile(struct Curl_easy *easy,
curl_easy_setopt(easy, CURLOPT_LOW_SPEED_LIMIT, speed_limit); curl_easy_setopt(easy, CURLOPT_LOW_SPEED_LIMIT, speed_limit);
curl_easy_setopt(easy, CURLOPT_LOW_SPEED_TIME, time_limit); curl_easy_setopt(easy, CURLOPT_LOW_SPEED_TIME, time_limit);
Curl_speedinit(easy); Curl_pgrsReset(easy);
do { do {
/* fake the current transfer speed */ /* fake the current transfer speed */
easy->progress.current_speed = speed; easy->progress.current_speed = speed;
res = Curl_speedcheck(easy, now); res = pgrs_speedcheck(easy, &now);
if(res) if(res)
break; break;
/* step the time */ /* step the time */

103
tests/unit/unit3216.c Normal file
View file

@ -0,0 +1,103 @@
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at https://curl.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
* SPDX-License-Identifier: curl
*
***************************************************************************/
#include "unitcheck.h"
#include "ratelimit.h"
static CURLcode test_unit3216(const char *arg)
{
UNITTEST_BEGIN_SIMPLE
struct Curl_rlimit r;
struct curltime ts;
/* A ratelimit that is unlimited */
ts = curlx_now();
Curl_rlimit_init(&r, 0, 0, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == CURL_OFF_T_MAX, "inf");
Curl_rlimit_drain(&r, 1000000, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == CURL_OFF_T_MAX, "drain keep inf");
fail_unless(Curl_rlimit_wait_ms(&r, ts) == 0, "inf never waits");
Curl_rlimit_block(&r, TRUE, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == 0, "inf blocked to 0");
Curl_rlimit_drain(&r, 1000000, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == 0, "blocked inf");
Curl_rlimit_block(&r, FALSE, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == CURL_OFF_T_MAX,
"unblocked unlimited");
/* A ratelimit that give 10 tokens per second */
ts = curlx_now();
Curl_rlimit_init(&r, 10, 0, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == 10, "initial 10");
Curl_rlimit_drain(&r, 5, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == 5, "drain to 5");
Curl_rlimit_drain(&r, 3, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == 2, "drain to 2");
ts.tv_usec += 1000; /* 1ms */
Curl_rlimit_drain(&r, 3, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == -1, "drain to -1");
fail_unless(Curl_rlimit_wait_ms(&r, ts) == 999, "wait 999ms");
ts.tv_usec += 1000; /* 1ms */
fail_unless(Curl_rlimit_wait_ms(&r, ts) == 998, "wait 998ms");
ts.tv_sec += 1;
fail_unless(Curl_rlimit_avail(&r, ts) == 9, "10 inc per sec");
ts.tv_sec += 1;
fail_unless(Curl_rlimit_avail(&r, ts) == 19, "10 inc per sec(2)");
Curl_rlimit_block(&r, TRUE, curlx_now());
fail_unless(Curl_rlimit_avail(&r, curlx_now()) == 0, "10 blocked to 0");
Curl_rlimit_block(&r, FALSE, curlx_now());
fail_unless(Curl_rlimit_avail(&r, curlx_now()) == 10, "unblocked 10");
/* A ratelimit that give 10 tokens per second, max burst 15/s */
ts = curlx_now();
Curl_rlimit_init(&r, 10, 15, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == 10, "initial 10");
Curl_rlimit_drain(&r, 5, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == 5, "drain to 5");
Curl_rlimit_drain(&r, 3, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == 2, "drain to 2");
Curl_rlimit_drain(&r, 3, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == -1, "drain to -1");
ts.tv_sec += 1;
fail_unless(Curl_rlimit_avail(&r, ts) == 9, "10 inc per sec");
ts.tv_sec += 1;
fail_unless(Curl_rlimit_avail(&r, ts) == 15, "10/15 burst limit");
ts.tv_sec += 1;
fail_unless(Curl_rlimit_avail(&r, ts) == 15, "10/15 burst limit(2)");
Curl_rlimit_drain(&r, 15, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == 0, "drain to 0");
fail_unless(Curl_rlimit_wait_ms(&r, ts) == 1000, "wait 1 sec");
ts.tv_usec += 500000; /* half a sec, cheating on second carry */
fail_unless(Curl_rlimit_avail(&r, ts) == 0, "0 after 0.5 sec");
fail_unless(Curl_rlimit_wait_ms(&r, ts) == 500, "wait 0.5 sec");
ts.tv_sec += 1;
fail_unless(Curl_rlimit_avail(&r, ts) == 10, "10 after 1.5 sec");
fail_unless(Curl_rlimit_wait_ms(&r, ts) == 0, "wait 0");
ts.tv_usec += 500000; /* half a sec, cheating on second carry */
fail_unless(Curl_rlimit_avail(&r, ts) == 15, "10 after 2 sec");
UNITTEST_END_SIMPLE
}