multi: improve wakeup and wait code

- Split WINSOCK and POSIX code in `multi_wait()` as the ifdef'ery
  was becoming unreadable
- define `ENABLE_WAKEUP` to mean the wakeup socketpair is enabled,
  no additional USE_WINSOCK check needed. Under WINSOCK
  `ENABLE_WAKEUP` is not defined, so it's availability is as before
  under the double defined() checks
- When the multi handle has "alive" transfers, the admin handle's
  pollset include the wakeup receive socket. This results in the
  admin handle running when someone uses `curl_multi_wakeup()`.
- Without any "alive" transfers, the wakeup socket is removed from
  the pollset. Otherwise, event based processing would never finish,
  eg. leave the event loop.
- The wakeup socket was never registered for event processing before,
  e.g. `curl_multi_wakeup()` never worked in that mode.
- Adjust test exepectations on socket callback invocations and
  number of sockets appearing in waitfds sets.

Closes #20832
This commit is contained in:
Stefan Eissing 2026-03-06 10:10:55 +01:00 committed by Daniel Stenberg
parent 447b32f13a
commit 9bc8b078eb
No known key found for this signature in database
GPG key ID: 5CC908FDB71E12C2
7 changed files with 328 additions and 240 deletions

View file

@ -294,7 +294,8 @@ struct Curl_multi *Curl_multi_handle(uint32_t xfer_table_size,
multi->wsa_event = WSACreateEvent(); multi->wsa_event = WSACreateEvent();
if(multi->wsa_event == WSA_INVALID_EVENT) if(multi->wsa_event == WSA_INVALID_EVENT)
goto error; goto error;
#elif defined(ENABLE_WAKEUP) #endif
#ifdef ENABLE_WAKEUP
if(Curl_wakeup_init(multi->wakeup_pair, TRUE) < 0) { if(Curl_wakeup_init(multi->wakeup_pair, TRUE) < 0) {
multi->wakeup_pair[0] = CURL_SOCKET_BAD; multi->wakeup_pair[0] = CURL_SOCKET_BAD;
multi->wakeup_pair[1] = CURL_SOCKET_BAD; multi->wakeup_pair[1] = CURL_SOCKET_BAD;
@ -319,6 +320,7 @@ error:
Curl_ssl_scache_destroy(multi->ssl_scache); Curl_ssl_scache_destroy(multi->ssl_scache);
#endif #endif
if(multi->admin) { if(multi->admin) {
Curl_multi_ev_xfer_done(multi, multi->admin);
multi->admin->multi = NULL; multi->admin->multi = NULL;
Curl_close(&multi->admin); Curl_close(&multi->admin);
} }
@ -362,6 +364,17 @@ bool Curl_is_connecting(struct Curl_easy *data)
return data->mstate < MSTATE_DO; return data->mstate < MSTATE_DO;
} }
static CURLMcode multi_assess_wakeup(struct Curl_multi *multi)
{
#ifdef ENABLE_WAKEUP
if(multi->socket_cb)
return Curl_multi_ev_assess_xfer(multi, multi->admin);
#else
(void)multi;
#endif
return CURLM_OK;
}
static CURLMcode multi_xfers_add(struct Curl_multi *multi, static CURLMcode multi_xfers_add(struct Curl_multi *multi,
struct Curl_easy *data) struct Curl_easy *data)
{ {
@ -528,6 +541,12 @@ CURLMcode curl_multi_add_handle(CURLM *m, CURL *d)
data->set.server_response_timeout; data->set.server_response_timeout;
multi->admin->set.no_signal = data->set.no_signal; multi->admin->set.no_signal = data->set.no_signal;
mresult = multi_assess_wakeup(multi);
if(mresult) {
failf(data, "error enabling wakeup listening: %d", mresult);
return mresult;
}
CURL_TRC_M(data, "added to multi, mid=%u, running=%u, total=%u", CURL_TRC_M(data, "added to multi, mid=%u, running=%u, total=%u",
data->mid, Curl_multi_xfers_running(multi), data->mid, Curl_multi_xfers_running(multi),
Curl_uint32_tbl_count(&multi->xfers)); Curl_uint32_tbl_count(&multi->xfers));
@ -656,9 +675,6 @@ static CURLcode multi_done(struct Curl_easy *data,
{ {
CURLcode result; CURLcode result;
struct connectdata *conn = data->conn; struct connectdata *conn = data->conn;
struct multi_done_ctx mdctx;
memset(&mdctx, 0, sizeof(mdctx));
CURL_TRC_M(data, "multi_done: status: %d prem: %d done: %d", CURL_TRC_M(data, "multi_done: status: %d prem: %d done: %d",
(int)status, (int)premature, data->state.done); (int)status, (int)premature, data->state.done);
@ -689,7 +705,7 @@ static CURLcode multi_done(struct Curl_easy *data,
} }
/* this calls the protocol-specific function pointer previously set */ /* this calls the protocol-specific function pointer previously set */
if(conn->scheme->run->done && (data->mstate >= MSTATE_PROTOCONNECT)) if(conn && conn->scheme->run->done && (data->mstate >= MSTATE_PROTOCONNECT))
result = conn->scheme->run->done(data, status, premature); result = conn->scheme->run->done(data, status, premature);
else else
result = status; result = status;
@ -706,6 +722,7 @@ static CURLcode multi_done(struct Curl_easy *data,
result = Curl_1st_fatal(result, Curl_xfer_write_done(data, premature)); result = Curl_1st_fatal(result, Curl_xfer_write_done(data, premature));
/* Inform connection filters that this transfer is done */ /* Inform connection filters that this transfer is done */
if(conn)
Curl_conn_ev_data_done(data, premature); Curl_conn_ev_data_done(data, premature);
process_pending_handles(data->multi); /* connection / multiplex */ process_pending_handles(data->multi); /* connection / multiplex */
@ -713,10 +730,15 @@ static CURLcode multi_done(struct Curl_easy *data,
if(!result) if(!result)
result = Curl_req_done(&data->req, data, premature); result = Curl_req_done(&data->req, data, premature);
if(conn) {
/* Under the potential connection pool's share lock, decide what to /* Under the potential connection pool's share lock, decide what to
* do with the transfer's connection. */ * do with the transfer's connection. */
struct multi_done_ctx mdctx;
memset(&mdctx, 0, sizeof(mdctx));
mdctx.premature = premature; mdctx.premature = premature;
Curl_cpool_do_locked(data, data->conn, multi_done_locked, &mdctx); Curl_cpool_do_locked(data, data->conn, multi_done_locked, &mdctx);
}
/* flush the netrc cache */ /* flush the netrc cache */
Curl_netrc_cleanup(&data->state.netrc); Curl_netrc_cleanup(&data->state.netrc);
@ -871,6 +893,12 @@ CURLMcode curl_multi_remove_handle(CURLM *m, CURL *d)
if(mresult) if(mresult)
return mresult; return mresult;
mresult = multi_assess_wakeup(multi);
if(mresult) {
failf(data, "error enabling wakeup listening: %d", mresult);
return mresult;
}
CURL_TRC_M(data, "removed from multi, mid=%u, running=%u, total=%u", CURL_TRC_M(data, "removed from multi, mid=%u, running=%u, total=%u",
mid, Curl_multi_xfers_running(multi), mid, Curl_multi_xfers_running(multi),
Curl_uint32_tbl_count(&multi->xfers)); Curl_uint32_tbl_count(&multi->xfers));
@ -1086,16 +1114,22 @@ static CURLcode mstate_perform_pollset(struct Curl_easy *data,
CURLMcode Curl_multi_pollset(struct Curl_easy *data, CURLMcode Curl_multi_pollset(struct Curl_easy *data,
struct easy_pollset *ps) struct easy_pollset *ps)
{ {
CURLMcode mresult = CURLM_OK;
CURLcode result = CURLE_OK; CURLcode result = CURLE_OK;
Curl_pollset_reset(ps);
#ifdef ENABLE_WAKEUP
/* The admin handle always listens on the wakeup socket when there
* are transfers alive. */
if(data->multi && (data == data->multi->admin) &&
data->multi->xfers_alive &&
(data->multi->wakeup_pair[0] != CURL_SOCKET_BAD)) {
result = Curl_pollset_add_in(data, ps, data->multi->wakeup_pair[0]);
}
#endif
/* If the transfer has no connection, this is fine. Happens when /* If the transfer has no connection, this is fine. Happens when
called via curl_multi_remove_handle() => Curl_multi_ev_assess() => called via curl_multi_remove_handle() => Curl_multi_ev_assess() =>
Curl_multi_pollset(). */ Curl_multi_pollset(). */
Curl_pollset_reset(ps); if(!result && data->conn) {
if(!data->conn)
return CURLM_OK;
switch(data->mstate) { switch(data->mstate) {
case MSTATE_INIT: case MSTATE_INIT:
case MSTATE_PENDING: case MSTATE_PENDING:
@ -1146,15 +1180,13 @@ CURLMcode Curl_multi_pollset(struct Curl_easy *data,
DEBUGASSERT(0); DEBUGASSERT(0);
break; break;
} }
}
if(result) { if(result) {
if(result == CURLE_OUT_OF_MEMORY) if(result == CURLE_OUT_OF_MEMORY)
mresult = CURLM_OUT_OF_MEMORY; return CURLM_OUT_OF_MEMORY;
else {
failf(data, "error determining pollset: %d", result); failf(data, "error determining pollset: %d", result);
mresult = CURLM_INTERNAL_ERROR; return CURLM_INTERNAL_ERROR;
}
goto out;
} }
#ifdef CURLVERBOSE #ifdef CURLVERBOSE
@ -1193,8 +1225,7 @@ CURLMcode Curl_multi_pollset(struct Curl_easy *data,
} }
#endif #endif
out: return CURLM_OK;
return mresult;
} }
CURLMcode curl_multi_fdset(CURLM *m, CURLMcode curl_multi_fdset(CURLM *m,
@ -1313,7 +1344,176 @@ static void reset_socket_fdwrite(curl_socket_t s)
if(!getsockopt(s, SOL_SOCKET, SO_TYPE, (char *)&t, &l) && t == SOCK_STREAM) if(!getsockopt(s, SOL_SOCKET, SO_TYPE, (char *)&t, &l) && t == SOCK_STREAM)
swrite(s, NULL, 0); swrite(s, NULL, 0);
} }
#endif
static CURLMcode multi_winsock_select(struct Curl_multi *multi,
struct curl_pollfds *cpfds,
unsigned int curl_nfds,
struct curl_waitfd extra_fds[],
unsigned int extra_nfds,
int timeout_ms,
bool wait_on_nop,
int *pnevents)
{
CURLMcode mresult = CURLM_OK;
WSANETWORKEVENTS wsa_events;
int nevents = 0;
size_t i;
DEBUGASSERT(multi->wsa_event != WSA_INVALID_EVENT);
/* Set the WSA events based on the collected pollds */
for(i = 0; i < cpfds->n; i++) {
long mask = 0;
if(cpfds->pfds[i].events & POLLIN)
mask |= FD_READ | FD_ACCEPT | FD_CLOSE;
if(cpfds->pfds[i].events & POLLPRI)
mask |= FD_OOB;
if(cpfds->pfds[i].events & POLLOUT) {
mask |= FD_WRITE | FD_CONNECT | FD_CLOSE;
reset_socket_fdwrite(cpfds->pfds[i].fd);
}
if(mask) {
if(WSAEventSelect(cpfds->pfds[i].fd, multi->wsa_event, mask) != 0) {
mresult = CURLM_OUT_OF_MEMORY;
goto out;
}
}
}
if(cpfds->n || wait_on_nop) {
int pollrc = 0;
if(cpfds->n) { /* pre-check with Winsock */
pollrc = Curl_poll(cpfds->pfds, cpfds->n, 0);
if(pollrc < 0) {
mresult = CURLM_UNRECOVERABLE_POLL;
goto out;
}
nevents = pollrc;
}
if(!nevents) {
/* now wait... if not ready during the pre-check (pollrc == 0) */
WSAWaitForMultipleEvents(1, &multi->wsa_event, FALSE, (DWORD)timeout_ms,
FALSE);
}
/* With Winsock, we have to run the following section unconditionally
to call WSAEventSelect(fd, event, 0) on all the sockets */
/* copy revents results from the poll to the curl_multi_wait poll
struct, the bit values of the actual underlying poll() implementation
may not be the same as the ones in the public libcurl API! */
for(i = 0; i < extra_nfds; i++) {
unsigned short mask = 0;
curl_socket_t s = extra_fds[i].fd;
wsa_events.lNetworkEvents = 0;
if(WSAEnumNetworkEvents(s, NULL, &wsa_events) == 0) {
if(wsa_events.lNetworkEvents & (FD_READ | FD_ACCEPT | FD_CLOSE))
mask |= CURL_WAIT_POLLIN;
if(wsa_events.lNetworkEvents & (FD_WRITE | FD_CONNECT | FD_CLOSE))
mask |= CURL_WAIT_POLLOUT;
if(wsa_events.lNetworkEvents & FD_OOB)
mask |= CURL_WAIT_POLLPRI;
if(!pollrc && wsa_events.lNetworkEvents)
nevents++;
}
WSAEventSelect(s, multi->wsa_event, 0);
if(!pollrc) {
extra_fds[i].revents = (short)mask;
continue;
}
else {
unsigned r = (unsigned)cpfds->pfds[curl_nfds + i].revents;
if(r & POLLIN)
mask |= CURL_WAIT_POLLIN;
if(r & POLLOUT)
mask |= CURL_WAIT_POLLOUT;
if(r & POLLPRI)
mask |= CURL_WAIT_POLLPRI;
extra_fds[i].revents = (short)mask;
}
}
/* Count up all our own sockets that had activity,
and remove them from the event. */
for(i = 0; i < curl_nfds; ++i) {
wsa_events.lNetworkEvents = 0;
if(WSAEnumNetworkEvents(cpfds->pfds[i].fd, NULL, &wsa_events) == 0) {
if(!pollrc && wsa_events.lNetworkEvents)
nevents++;
}
WSAEventSelect(cpfds->pfds[i].fd, multi->wsa_event, 0);
}
WSAResetEvent(multi->wsa_event);
}
out:
*pnevents = nevents;
return mresult;
}
#else /* USE_WINSOCK */
static CURLMcode multi_posix_poll(struct Curl_multi *multi,
struct curl_pollfds *cpfds,
unsigned int curl_nfds,
struct curl_waitfd extra_fds[],
unsigned int extra_nfds,
int timeout_ms,
bool wait_on_nop,
int *pnevents)
{
CURLMcode mresult = CURLM_OK;
int nevents = 0;
size_t i;
if(cpfds->n) {
int pollrc = Curl_poll(cpfds->pfds, cpfds->n, timeout_ms); /* wait... */
if(pollrc < 0) {
mresult = CURLM_UNRECOVERABLE_POLL;
goto out;
}
nevents = pollrc;
/* copy revents results from the poll to the curl_multi_wait poll
struct, the bit values of the actual underlying poll() implementation
may not be the same as the ones in the public libcurl API! */
for(i = 0; i < extra_nfds; i++) {
unsigned r = (unsigned)cpfds->pfds[curl_nfds + i].revents;
unsigned short mask = 0;
if(r & POLLIN)
mask |= CURL_WAIT_POLLIN;
if(r & POLLOUT)
mask |= CURL_WAIT_POLLOUT;
if(r & POLLPRI)
mask |= CURL_WAIT_POLLPRI;
extra_fds[i].revents = (short)mask;
}
}
else if(wait_on_nop) {
struct curltime expire_time;
long sleep_ms = 0;
/* Avoid busy-looping when there is nothing particular to wait for */
multi_timeout(multi, &expire_time, &sleep_ms);
if(sleep_ms) {
if(sleep_ms > timeout_ms)
sleep_ms = timeout_ms;
/* when there are no easy handles in the multi, this holds a -1
timeout */
else if(sleep_ms < 0)
sleep_ms = timeout_ms;
curlx_wait_ms(sleep_ms);
}
}
out:
*pnevents = nevents;
return mresult;
}
#endif /* !USE_WINSOCK */
#define NUM_POLLS_ON_STACK 10 #define NUM_POLLS_ON_STACK 10
@ -1322,13 +1522,13 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
unsigned int extra_nfds, unsigned int extra_nfds,
int timeout_ms, int timeout_ms,
int *ret, int *ret,
bool extrawait, /* when no socket, wait */ bool wait_on_nop) /* spend time, even if there
bool use_wakeup) * is nothing to monitor */
{ {
size_t i; size_t i;
struct curltime expire_time; struct curltime expire_time;
long timeout_internal; long timeout_internal;
int retcode = 0; int nevents = 0;
struct easy_pollset ps; struct easy_pollset ps;
struct pollfd a_few_on_stack[NUM_POLLS_ON_STACK]; struct pollfd a_few_on_stack[NUM_POLLS_ON_STACK];
struct curl_pollfds cpfds; struct curl_pollfds cpfds;
@ -1336,13 +1536,8 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
struct Curl_easy *data = NULL; struct Curl_easy *data = NULL;
CURLMcode mresult = CURLM_OK; CURLMcode mresult = CURLM_OK;
uint32_t mid; uint32_t mid;
#ifdef ENABLE_WAKEUP
#ifdef USE_WINSOCK int wakeup_idx = -1;
WSANETWORKEVENTS wsa_events;
DEBUGASSERT(multi->wsa_event != WSA_INVALID_EVENT);
#endif
#ifndef ENABLE_WAKEUP
(void)use_wakeup;
#endif #endif
if(!GOOD_MULTI_HANDLE(multi)) if(!GOOD_MULTI_HANDLE(multi))
@ -1380,6 +1575,16 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
goto out; goto out;
} }
#ifdef ENABLE_WAKEUP
if(wait_on_nop && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
wakeup_idx = cpfds.n;
if(Curl_pollfds_add_sock(&cpfds, multi->wakeup_pair[0], POLLIN)) {
mresult = CURLM_OUT_OF_MEMORY;
goto out;
}
}
#endif
curl_nfds = cpfds.n; /* what curl internally uses in cpfds */ curl_nfds = cpfds.n; /* what curl internally uses in cpfds */
/* Add external file descriptions from poll-like struct curl_waitfd */ /* Add external file descriptions from poll-like struct curl_waitfd */
for(i = 0; i < extra_nfds; i++) { for(i = 0; i < extra_nfds; i++) {
@ -1396,38 +1601,6 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
} }
} }
#ifdef USE_WINSOCK
/* Set the WSA events based on the collected pollds */
for(i = 0; i < cpfds.n; i++) {
long mask = 0;
if(cpfds.pfds[i].events & POLLIN)
mask |= FD_READ | FD_ACCEPT | FD_CLOSE;
if(cpfds.pfds[i].events & POLLPRI)
mask |= FD_OOB;
if(cpfds.pfds[i].events & POLLOUT) {
mask |= FD_WRITE | FD_CONNECT | FD_CLOSE;
reset_socket_fdwrite(cpfds.pfds[i].fd);
}
if(mask) {
if(WSAEventSelect(cpfds.pfds[i].fd, multi->wsa_event, mask) != 0) {
mresult = CURLM_OUT_OF_MEMORY;
goto out;
}
}
}
#endif
#ifdef ENABLE_WAKEUP
#ifndef USE_WINSOCK
if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
if(Curl_pollfds_add_sock(&cpfds, multi->wakeup_pair[0], POLLIN)) {
mresult = CURLM_OUT_OF_MEMORY;
goto out;
}
}
#endif
#endif
/* We check the internal timeout *AFTER* we collected all sockets to /* We check the internal timeout *AFTER* we collected all sockets to
* poll. Collecting the sockets may install new timers by protocols * poll. Collecting the sockets may install new timers by protocols
* and connection filters. * and connection filters.
@ -1439,122 +1612,32 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
if(data) if(data)
CURL_TRC_M(data, "multi_wait(fds=%d, timeout=%d) tinternal=%ld", CURL_TRC_M(data, "multi_wait(fds=%d, timeout=%d) tinternal=%ld",
cpfds.n, timeout_ms, timeout_internal); cpfds.n, timeout_ms, timeout_internal);
#if defined(ENABLE_WAKEUP) && defined(USE_WINSOCK)
if(cpfds.n || use_wakeup) {
#else
if(cpfds.n) {
#endif
int pollrc;
#ifdef USE_WINSOCK
if(cpfds.n) /* pre-check with Winsock */
pollrc = Curl_poll(cpfds.pfds, cpfds.n, 0);
else
pollrc = 0;
#else
pollrc = Curl_poll(cpfds.pfds, cpfds.n, timeout_ms); /* wait... */
#endif
if(pollrc < 0) {
mresult = CURLM_UNRECOVERABLE_POLL;
goto out;
}
if(pollrc > 0) {
retcode = pollrc;
#ifdef USE_WINSOCK
}
else { /* now wait... if not ready during the pre-check (pollrc == 0) */
WSAWaitForMultipleEvents(1, &multi->wsa_event, FALSE, (DWORD)timeout_ms,
FALSE);
}
/* With Winsock, we have to run the following section unconditionally
to call WSAEventSelect(fd, event, 0) on all the sockets */
{
#endif
/* copy revents results from the poll to the curl_multi_wait poll
struct, the bit values of the actual underlying poll() implementation
may not be the same as the ones in the public libcurl API! */
for(i = 0; i < extra_nfds; i++) {
unsigned r = (unsigned)cpfds.pfds[curl_nfds + i].revents;
unsigned short mask = 0;
#ifdef USE_WINSOCK
curl_socket_t s = extra_fds[i].fd;
wsa_events.lNetworkEvents = 0;
if(WSAEnumNetworkEvents(s, NULL, &wsa_events) == 0) {
if(wsa_events.lNetworkEvents & (FD_READ | FD_ACCEPT | FD_CLOSE))
mask |= CURL_WAIT_POLLIN;
if(wsa_events.lNetworkEvents & (FD_WRITE | FD_CONNECT | FD_CLOSE))
mask |= CURL_WAIT_POLLOUT;
if(wsa_events.lNetworkEvents & FD_OOB)
mask |= CURL_WAIT_POLLPRI;
if(ret && !pollrc && wsa_events.lNetworkEvents)
retcode++;
}
WSAEventSelect(s, multi->wsa_event, 0);
if(!pollrc) {
extra_fds[i].revents = (short)mask;
continue;
}
#endif
if(r & POLLIN)
mask |= CURL_WAIT_POLLIN;
if(r & POLLOUT)
mask |= CURL_WAIT_POLLOUT;
if(r & POLLPRI)
mask |= CURL_WAIT_POLLPRI;
extra_fds[i].revents = (short)mask;
}
#ifdef USE_WINSOCK #ifdef USE_WINSOCK
/* Count up all our own sockets that had activity, mresult = multi_winsock_select(multi, &cpfds, curl_nfds,
and remove them from the event. */ extra_fds, extra_nfds,
for(i = 0; i < curl_nfds; ++i) { timeout_ms, wait_on_nop, &nevents);
wsa_events.lNetworkEvents = 0;
if(WSAEnumNetworkEvents(cpfds.pfds[i].fd, NULL, &wsa_events) == 0) {
if(ret && !pollrc && wsa_events.lNetworkEvents)
retcode++;
}
WSAEventSelect(cpfds.pfds[i].fd, multi->wsa_event, 0);
}
WSAResetEvent(multi->wsa_event);
#else #else
mresult = multi_posix_poll(multi, &cpfds, curl_nfds,
extra_fds, extra_nfds,
timeout_ms, wait_on_nop, &nevents);
#endif
#ifdef ENABLE_WAKEUP #ifdef ENABLE_WAKEUP
if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) { if(nevents && (wakeup_idx >= 0)) {
if(cpfds.pfds[curl_nfds + extra_nfds].revents & POLLIN) { if(cpfds.pfds[wakeup_idx].revents & POLLIN) {
(void)Curl_wakeup_consume(multi->wakeup_pair, TRUE); (void)Curl_wakeup_consume(multi->wakeup_pair, TRUE);
/* do not count the wakeup socket into the returned value */ /* do not count the wakeup socket into the returned value */
retcode--; nevents--;
} }
} }
#endif #endif
#endif
}
}
if(ret)
*ret = retcode;
#if defined(ENABLE_WAKEUP) && defined(USE_WINSOCK)
if(extrawait && !cpfds.n && !use_wakeup) {
#else
if(extrawait && !cpfds.n) {
#endif
long sleep_ms = 0;
/* Avoid busy-looping when there is nothing particular to wait for */
multi_timeout(multi, &expire_time, &sleep_ms);
if(sleep_ms) {
if(sleep_ms > timeout_ms)
sleep_ms = timeout_ms;
/* when there are no easy handles in the multi, this holds a -1
timeout */
else if(sleep_ms < 0)
sleep_ms = timeout_ms;
curlx_wait_ms(sleep_ms);
}
}
out: out:
Curl_pollset_cleanup(&ps); Curl_pollset_cleanup(&ps);
Curl_pollfds_cleanup(&cpfds); Curl_pollfds_cleanup(&cpfds);
if(ret)
*ret = nevents;
return mresult; return mresult;
} }
@ -1564,8 +1647,7 @@ CURLMcode curl_multi_wait(CURLM *multi,
int timeout_ms, int timeout_ms,
int *ret) int *ret)
{ {
return multi_wait(multi, extra_fds, extra_nfds, timeout_ms, ret, FALSE, return multi_wait(multi, extra_fds, extra_nfds, timeout_ms, ret, FALSE);
FALSE);
} }
CURLMcode curl_multi_poll(CURLM *multi, CURLMcode curl_multi_poll(CURLM *multi,
@ -1574,7 +1656,7 @@ CURLMcode curl_multi_poll(CURLM *multi,
int timeout_ms, int timeout_ms,
int *ret) int *ret)
{ {
return multi_wait(multi, extra_fds, extra_nfds, timeout_ms, ret, TRUE, TRUE); return multi_wait(multi, extra_fds, extra_nfds, timeout_ms, ret, TRUE);
} }
CURLMcode curl_multi_wakeup(CURLM *m) CURLMcode curl_multi_wakeup(CURLM *m)
@ -1588,11 +1670,11 @@ CURLMcode curl_multi_wakeup(CURLM *m)
if(!GOOD_MULTI_HANDLE(multi)) if(!GOOD_MULTI_HANDLE(multi))
return CURLM_BAD_HANDLE; return CURLM_BAD_HANDLE;
#ifdef ENABLE_WAKEUP
#ifdef USE_WINSOCK #ifdef USE_WINSOCK
if(WSASetEvent(multi->wsa_event)) if(WSASetEvent(multi->wsa_event))
return CURLM_OK; return CURLM_OK;
#else #endif
#ifdef ENABLE_WAKEUP
/* the wakeup_pair variable is only written during init and cleanup, /* the wakeup_pair variable is only written during init and cleanup,
making it safe to access from another thread after the init part making it safe to access from another thread after the init part
and before cleanup */ and before cleanup */
@ -1601,7 +1683,6 @@ CURLMcode curl_multi_wakeup(CURLM *m)
return CURLM_WAKEUP_FAILURE; return CURLM_WAKEUP_FAILURE;
return CURLM_OK; return CURLM_OK;
} }
#endif
#endif #endif
return CURLM_WAKEUP_FAILURE; return CURLM_WAKEUP_FAILURE;
} }
@ -2408,6 +2489,8 @@ static void handle_completed(struct Curl_multi *multi,
Curl_uint32_bset_remove(&multi->pending, data->mid); Curl_uint32_bset_remove(&multi->pending, data->mid);
Curl_uint32_bset_add(&multi->msgsent, data->mid); Curl_uint32_bset_add(&multi->msgsent, data->mid);
--multi->xfers_alive; --multi->xfers_alive;
if(!multi->xfers_alive)
multi_assess_wakeup(multi);
} }
static CURLMcode multi_runsingle(struct Curl_multi *multi, static CURLMcode multi_runsingle(struct Curl_multi *multi,
@ -2890,10 +2973,9 @@ CURLMcode curl_multi_cleanup(CURLM *m)
#ifdef USE_WINSOCK #ifdef USE_WINSOCK
WSACloseEvent(multi->wsa_event); WSACloseEvent(multi->wsa_event);
#else #endif
#ifdef ENABLE_WAKEUP #ifdef ENABLE_WAKEUP
Curl_wakeup_destroy(multi->wakeup_pair); Curl_wakeup_destroy(multi->wakeup_pair);
#endif
#endif #endif
multi_xfer_bufs_free(multi); multi_xfer_bufs_free(multi);
@ -3132,6 +3214,14 @@ static CURLMcode multi_socket(struct Curl_multi *multi,
handles the case when the application asks libcurl to run the timeout handles the case when the application asks libcurl to run the timeout
prematurely. */ prematurely. */
memset(&multi->last_expire_ts, 0, sizeof(multi->last_expire_ts)); memset(&multi->last_expire_ts, 0, sizeof(multi->last_expire_ts));
/* Applications may set `socket_cb` *after* having added transfers
* first. *Then* kick off processing with a
* curl_multi_socket_action(TIMEOUT) afterwards. Make sure our
* admin handle registers its pollset with the callbacks present. */
mresult = multi_assess_wakeup(multi);
if(mresult)
goto out;
} }
multi_mark_expired_as_dirty(multi, multi_now(multi)); multi_mark_expired_as_dirty(multi, multi_now(multi));

View file

@ -603,10 +603,8 @@ void Curl_multi_ev_xfer_done(struct Curl_multi *multi,
struct Curl_easy *data) struct Curl_easy *data)
{ {
DEBUGASSERT(!data->conn); /* transfer should have been detached */ DEBUGASSERT(!data->conn); /* transfer should have been detached */
if(data != multi->admin) {
(void)mev_assess(multi, data, NULL); (void)mev_assess(multi, data, NULL);
Curl_meta_remove(data, CURL_META_MEV_POLLSET); Curl_meta_remove(data, CURL_META_MEV_POLLSET);
}
} }
void Curl_multi_ev_conn_done(struct Curl_multi *multi, void Curl_multi_ev_conn_done(struct Curl_multi *multi,

View file

@ -71,7 +71,7 @@ typedef enum {
#define CURLPIPE_ANY (CURLPIPE_MULTIPLEX) #define CURLPIPE_ANY (CURLPIPE_MULTIPLEX)
#ifndef CURL_DISABLE_SOCKETPAIR #if !defined(CURL_DISABLE_SOCKETPAIR) && !defined(USE_WINSOCK)
#define ENABLE_WAKEUP #define ENABLE_WAKEUP
#endif #endif
@ -160,12 +160,11 @@ struct Curl_multi {
#ifdef USE_WINSOCK #ifdef USE_WINSOCK
WSAEVENT wsa_event; /* Winsock event used for waits */ WSAEVENT wsa_event; /* Winsock event used for waits */
#else #endif
#ifdef ENABLE_WAKEUP #ifdef ENABLE_WAKEUP
curl_socket_t wakeup_pair[2]; /* eventfd()/pipe()/socketpair() used for curl_socket_t wakeup_pair[2]; /* eventfd()/pipe()/socketpair() used for
wakeup 0 is used for read, 1 is used wakeup 0 is used for read, 1 is used
for write */ for write */
#endif
#endif #endif
unsigned int max_concurrent_streams; unsigned int max_concurrent_streams;
unsigned int maxconnects; /* if >0, a fixed limit of the maximum number of unsigned int maxconnects; /* if >0, a fixed limit of the maximum number of

View file

@ -160,7 +160,7 @@ class TestShutdown:
# check that all connection sockets were removed from event # check that all connection sockets were removed from event
removes = [line for line in r.trace_lines removes = [line for line in r.trace_lines
if re.match(r'.*socket cb: socket \d+ REMOVED', line)] if re.match(r'.*socket cb: socket \d+ REMOVED', line)]
assert len(removes) == count, f'{removes}' assert len(removes) >= count, f'{removes}'
# check graceful shutdown on multiplexed http # check graceful shutdown on multiplexed http
@pytest.mark.parametrize("proto", Env.http_mplx_protos()) @pytest.mark.parametrize("proto", Env.http_mplx_protos())

View file

@ -331,8 +331,8 @@ static CURLcode empty_multi_test(void)
goto test_cleanup; goto test_cleanup;
} }
else if(fd_count > 0) { else if(fd_count > 0) {
curl_mfprintf(stderr, "curl_multi_waitfds() returned non-zero count of " curl_mfprintf(stderr, "curl_multi_waitfds(), empty, returned non-zero "
"waitfds: %d.\n", fd_count); "count of waitfds: %d.\n", fd_count);
result = TEST_ERR_FAILURE; result = TEST_ERR_FAILURE;
goto test_cleanup; goto test_cleanup;
} }
@ -352,8 +352,8 @@ static CURLcode empty_multi_test(void)
result = TEST_ERR_FAILURE; result = TEST_ERR_FAILURE;
goto test_cleanup; goto test_cleanup;
} }
else if(fd_count > 0) { else if(fd_count > 1) {
curl_mfprintf(stderr, "curl_multi_waitfds() returned non-zero count of " curl_mfprintf(stderr, "curl_multi_waitfds() returned > 1 count of "
"waitfds: %d.\n", fd_count); "waitfds: %d.\n", fd_count);
result = TEST_ERR_FAILURE; result = TEST_ERR_FAILURE;
goto test_cleanup; goto test_cleanup;
@ -380,16 +380,16 @@ static CURLcode test_lib2405(const char *URL)
goto test_cleanup; goto test_cleanup;
if(testnum == 2405) { if(testnum == 2405) {
/* HTTP1, expected 2 waitfds - one for each transfer */ /* HTTP1, expected 3 waitfds - one for each transfer + wakeup */
test_run_check(TEST_USE_HTTP1, 2); test_run_check(TEST_USE_HTTP1, 3);
} }
#ifdef USE_HTTP2 #ifdef USE_HTTP2
else { /* 2407 */ else { /* 2407 */
/* HTTP2, expected 2 waitfds - one for each transfer */ /* HTTP2, expected 3 waitfds - one for each transfer + wakeup */
test_run_check(TEST_USE_HTTP2, 2); test_run_check(TEST_USE_HTTP2, 3);
/* HTTP2 with multiplexing, expected 1 waitfds - one for all transfers */ /* HTTP2 with multiplexing, expected 2 waitfds - transfers + wakeup */
test_run_check(TEST_USE_HTTP2_MPLEX, 1); test_run_check(TEST_USE_HTTP2_MPLEX, 2);
} }
#endif #endif

View file

@ -404,11 +404,11 @@ static CURLcode test_lib530(const char *URL)
if(!result) if(!result)
curl_mfprintf(stderr, "%s FAILED: %d\n", t530_tag(), result); curl_mfprintf(stderr, "%s FAILED: %d\n", t530_tag(), result);
result = testone(URL, 0, 1); /* fail 1st call to socket callback */ result = testone(URL, 0, 2); /* fail 2nd call to socket callback */
if(!result) if(!result)
curl_mfprintf(stderr, "%s FAILED: %d\n", t530_tag(), result); curl_mfprintf(stderr, "%s FAILED: %d\n", t530_tag(), result);
result = testone(URL, 0, 2); /* fail 2nd call to socket callback */ result = testone(URL, 0, 3); /* fail 3rd call to socket callback */
if(!result) if(!result)
curl_mfprintf(stderr, "%s FAILED: %d\n", t530_tag(), result); curl_mfprintf(stderr, "%s FAILED: %d\n", t530_tag(), result);

View file

@ -386,7 +386,8 @@ static CURLcode t758_one(const char *URL, int timer_fail_at,
if(t758_ctx.fake_async_cert_verification_pending && if(t758_ctx.fake_async_cert_verification_pending &&
!t758_ctx.fake_async_cert_verification_finished) { !t758_ctx.fake_async_cert_verification_finished) {
if(sockets.read.count || sockets.write.count) { /* the wakeup socket will be monitored */
if((sockets.read.count > 1) || sockets.write.count) {
t758_msg("during verification there should be no sockets scheduled"); t758_msg("during verification there should be no sockets scheduled");
result = TEST_ERR_MAJOR_BAD; result = TEST_ERR_MAJOR_BAD;
goto test_cleanup; goto test_cleanup;