diff --git a/lib/multi.c b/lib/multi.c index 685bb01f0c..e72d3e3d4a 100644 --- a/lib/multi.c +++ b/lib/multi.c @@ -294,7 +294,8 @@ struct Curl_multi *Curl_multi_handle(uint32_t xfer_table_size, multi->wsa_event = WSACreateEvent(); if(multi->wsa_event == WSA_INVALID_EVENT) goto error; -#elif defined(ENABLE_WAKEUP) +#endif +#ifdef ENABLE_WAKEUP if(Curl_wakeup_init(multi->wakeup_pair, TRUE) < 0) { multi->wakeup_pair[0] = CURL_SOCKET_BAD; multi->wakeup_pair[1] = CURL_SOCKET_BAD; @@ -319,6 +320,7 @@ error: Curl_ssl_scache_destroy(multi->ssl_scache); #endif if(multi->admin) { + Curl_multi_ev_xfer_done(multi, multi->admin); multi->admin->multi = NULL; Curl_close(&multi->admin); } @@ -362,6 +364,17 @@ bool Curl_is_connecting(struct Curl_easy *data) return data->mstate < MSTATE_DO; } +static CURLMcode multi_assess_wakeup(struct Curl_multi *multi) +{ +#ifdef ENABLE_WAKEUP + if(multi->socket_cb) + return Curl_multi_ev_assess_xfer(multi, multi->admin); +#else + (void)multi; +#endif + return CURLM_OK; +} + static CURLMcode multi_xfers_add(struct Curl_multi *multi, struct Curl_easy *data) { @@ -528,6 +541,12 @@ CURLMcode curl_multi_add_handle(CURLM *m, CURL *d) data->set.server_response_timeout; multi->admin->set.no_signal = data->set.no_signal; + mresult = multi_assess_wakeup(multi); + if(mresult) { + failf(data, "error enabling wakeup listening: %d", mresult); + return mresult; + } + CURL_TRC_M(data, "added to multi, mid=%u, running=%u, total=%u", data->mid, Curl_multi_xfers_running(multi), Curl_uint32_tbl_count(&multi->xfers)); @@ -656,9 +675,6 @@ static CURLcode multi_done(struct Curl_easy *data, { CURLcode result; struct connectdata *conn = data->conn; - struct multi_done_ctx mdctx; - - memset(&mdctx, 0, sizeof(mdctx)); CURL_TRC_M(data, "multi_done: status: %d prem: %d done: %d", (int)status, (int)premature, data->state.done); @@ -689,7 +705,7 @@ static CURLcode multi_done(struct Curl_easy *data, } /* this calls the protocol-specific function pointer previously set */ - if(conn->scheme->run->done && (data->mstate >= MSTATE_PROTOCONNECT)) + if(conn && conn->scheme->run->done && (data->mstate >= MSTATE_PROTOCONNECT)) result = conn->scheme->run->done(data, status, premature); else result = status; @@ -706,17 +722,23 @@ static CURLcode multi_done(struct Curl_easy *data, result = Curl_1st_fatal(result, Curl_xfer_write_done(data, premature)); /* Inform connection filters that this transfer is done */ - Curl_conn_ev_data_done(data, premature); + if(conn) + Curl_conn_ev_data_done(data, premature); process_pending_handles(data->multi); /* connection / multiplex */ if(!result) result = Curl_req_done(&data->req, data, premature); - /* Under the potential connection pool's share lock, decide what to - * do with the transfer's connection. */ - mdctx.premature = premature; - Curl_cpool_do_locked(data, data->conn, multi_done_locked, &mdctx); + if(conn) { + /* Under the potential connection pool's share lock, decide what to + * do with the transfer's connection. */ + struct multi_done_ctx mdctx; + + memset(&mdctx, 0, sizeof(mdctx)); + mdctx.premature = premature; + Curl_cpool_do_locked(data, data->conn, multi_done_locked, &mdctx); + } /* flush the netrc cache */ Curl_netrc_cleanup(&data->state.netrc); @@ -871,6 +893,12 @@ CURLMcode curl_multi_remove_handle(CURLM *m, CURL *d) if(mresult) return mresult; + mresult = multi_assess_wakeup(multi); + if(mresult) { + failf(data, "error enabling wakeup listening: %d", mresult); + return mresult; + } + CURL_TRC_M(data, "removed from multi, mid=%u, running=%u, total=%u", mid, Curl_multi_xfers_running(multi), Curl_uint32_tbl_count(&multi->xfers)); @@ -1086,75 +1114,79 @@ static CURLcode mstate_perform_pollset(struct Curl_easy *data, CURLMcode Curl_multi_pollset(struct Curl_easy *data, struct easy_pollset *ps) { - CURLMcode mresult = CURLM_OK; CURLcode result = CURLE_OK; + Curl_pollset_reset(ps); +#ifdef ENABLE_WAKEUP + /* The admin handle always listens on the wakeup socket when there + * are transfers alive. */ + if(data->multi && (data == data->multi->admin) && + data->multi->xfers_alive && + (data->multi->wakeup_pair[0] != CURL_SOCKET_BAD)) { + result = Curl_pollset_add_in(data, ps, data->multi->wakeup_pair[0]); + } +#endif /* If the transfer has no connection, this is fine. Happens when called via curl_multi_remove_handle() => Curl_multi_ev_assess() => Curl_multi_pollset(). */ - Curl_pollset_reset(ps); - if(!data->conn) - return CURLM_OK; + if(!result && data->conn) { + switch(data->mstate) { + case MSTATE_INIT: + case MSTATE_PENDING: + case MSTATE_SETUP: + case MSTATE_CONNECT: + /* nothing to poll for yet */ + break; - switch(data->mstate) { - case MSTATE_INIT: - case MSTATE_PENDING: - case MSTATE_SETUP: - case MSTATE_CONNECT: - /* nothing to poll for yet */ - break; + case MSTATE_RESOLVING: + result = Curl_resolv_pollset(data, ps); + break; - case MSTATE_RESOLVING: - result = Curl_resolv_pollset(data, ps); - break; + case MSTATE_CONNECTING: + result = mstate_connecting_pollset(data, ps); + break; - case MSTATE_CONNECTING: - result = mstate_connecting_pollset(data, ps); - break; + case MSTATE_PROTOCONNECT: + case MSTATE_PROTOCONNECTING: + result = mstate_protocol_pollset(data, ps); + break; - case MSTATE_PROTOCONNECT: - case MSTATE_PROTOCONNECTING: - result = mstate_protocol_pollset(data, ps); - break; + case MSTATE_DO: + case MSTATE_DOING: + result = mstate_do_pollset(data, ps); + break; - case MSTATE_DO: - case MSTATE_DOING: - result = mstate_do_pollset(data, ps); - break; + case MSTATE_DOING_MORE: + result = mstate_domore_pollset(data, ps); + break; - case MSTATE_DOING_MORE: - result = mstate_domore_pollset(data, ps); - break; + case MSTATE_DID: /* same as PERFORMING in regard to polling */ + case MSTATE_PERFORMING: + result = mstate_perform_pollset(data, ps); + break; - case MSTATE_DID: /* same as PERFORMING in regard to polling */ - case MSTATE_PERFORMING: - result = mstate_perform_pollset(data, ps); - break; + case MSTATE_RATELIMITING: + /* we need to let time pass, ignore socket(s) */ + break; - case MSTATE_RATELIMITING: - /* we need to let time pass, ignore socket(s) */ - break; + case MSTATE_DONE: + case MSTATE_COMPLETED: + case MSTATE_MSGSENT: + /* nothing more to poll for */ + break; - case MSTATE_DONE: - case MSTATE_COMPLETED: - case MSTATE_MSGSENT: - /* nothing more to poll for */ - break; - - default: - failf(data, "multi_getsock: unexpected multi state %d", data->mstate); - DEBUGASSERT(0); - break; + default: + failf(data, "multi_getsock: unexpected multi state %d", data->mstate); + DEBUGASSERT(0); + break; + } } if(result) { if(result == CURLE_OUT_OF_MEMORY) - mresult = CURLM_OUT_OF_MEMORY; - else { - failf(data, "error determining pollset: %d", result); - mresult = CURLM_INTERNAL_ERROR; - } - goto out; + return CURLM_OUT_OF_MEMORY; + failf(data, "error determining pollset: %d", result); + return CURLM_INTERNAL_ERROR; } #ifdef CURLVERBOSE @@ -1193,8 +1225,7 @@ CURLMcode Curl_multi_pollset(struct Curl_easy *data, } #endif -out: - return mresult; + return CURLM_OK; } CURLMcode curl_multi_fdset(CURLM *m, @@ -1313,7 +1344,176 @@ static void reset_socket_fdwrite(curl_socket_t s) if(!getsockopt(s, SOL_SOCKET, SO_TYPE, (char *)&t, &l) && t == SOCK_STREAM) swrite(s, NULL, 0); } -#endif + +static CURLMcode multi_winsock_select(struct Curl_multi *multi, + struct curl_pollfds *cpfds, + unsigned int curl_nfds, + struct curl_waitfd extra_fds[], + unsigned int extra_nfds, + int timeout_ms, + bool wait_on_nop, + int *pnevents) +{ + CURLMcode mresult = CURLM_OK; + WSANETWORKEVENTS wsa_events; + int nevents = 0; + size_t i; + + DEBUGASSERT(multi->wsa_event != WSA_INVALID_EVENT); + + /* Set the WSA events based on the collected pollds */ + for(i = 0; i < cpfds->n; i++) { + long mask = 0; + if(cpfds->pfds[i].events & POLLIN) + mask |= FD_READ | FD_ACCEPT | FD_CLOSE; + if(cpfds->pfds[i].events & POLLPRI) + mask |= FD_OOB; + if(cpfds->pfds[i].events & POLLOUT) { + mask |= FD_WRITE | FD_CONNECT | FD_CLOSE; + reset_socket_fdwrite(cpfds->pfds[i].fd); + } + if(mask) { + if(WSAEventSelect(cpfds->pfds[i].fd, multi->wsa_event, mask) != 0) { + mresult = CURLM_OUT_OF_MEMORY; + goto out; + } + } + } + + if(cpfds->n || wait_on_nop) { + int pollrc = 0; + + if(cpfds->n) { /* pre-check with Winsock */ + pollrc = Curl_poll(cpfds->pfds, cpfds->n, 0); + if(pollrc < 0) { + mresult = CURLM_UNRECOVERABLE_POLL; + goto out; + } + nevents = pollrc; + } + + if(!nevents) { + /* now wait... if not ready during the pre-check (pollrc == 0) */ + WSAWaitForMultipleEvents(1, &multi->wsa_event, FALSE, (DWORD)timeout_ms, + FALSE); + } + + /* With Winsock, we have to run the following section unconditionally + to call WSAEventSelect(fd, event, 0) on all the sockets */ + /* copy revents results from the poll to the curl_multi_wait poll + struct, the bit values of the actual underlying poll() implementation + may not be the same as the ones in the public libcurl API! */ + for(i = 0; i < extra_nfds; i++) { + unsigned short mask = 0; + curl_socket_t s = extra_fds[i].fd; + + wsa_events.lNetworkEvents = 0; + if(WSAEnumNetworkEvents(s, NULL, &wsa_events) == 0) { + if(wsa_events.lNetworkEvents & (FD_READ | FD_ACCEPT | FD_CLOSE)) + mask |= CURL_WAIT_POLLIN; + if(wsa_events.lNetworkEvents & (FD_WRITE | FD_CONNECT | FD_CLOSE)) + mask |= CURL_WAIT_POLLOUT; + if(wsa_events.lNetworkEvents & FD_OOB) + mask |= CURL_WAIT_POLLPRI; + if(!pollrc && wsa_events.lNetworkEvents) + nevents++; + } + WSAEventSelect(s, multi->wsa_event, 0); + if(!pollrc) { + extra_fds[i].revents = (short)mask; + continue; + } + else { + unsigned r = (unsigned)cpfds->pfds[curl_nfds + i].revents; + if(r & POLLIN) + mask |= CURL_WAIT_POLLIN; + if(r & POLLOUT) + mask |= CURL_WAIT_POLLOUT; + if(r & POLLPRI) + mask |= CURL_WAIT_POLLPRI; + extra_fds[i].revents = (short)mask; + } + } + + /* Count up all our own sockets that had activity, + and remove them from the event. */ + for(i = 0; i < curl_nfds; ++i) { + wsa_events.lNetworkEvents = 0; + if(WSAEnumNetworkEvents(cpfds->pfds[i].fd, NULL, &wsa_events) == 0) { + if(!pollrc && wsa_events.lNetworkEvents) + nevents++; + } + WSAEventSelect(cpfds->pfds[i].fd, multi->wsa_event, 0); + } + WSAResetEvent(multi->wsa_event); + } + +out: + *pnevents = nevents; + return mresult; +} + +#else /* USE_WINSOCK */ + +static CURLMcode multi_posix_poll(struct Curl_multi *multi, + struct curl_pollfds *cpfds, + unsigned int curl_nfds, + struct curl_waitfd extra_fds[], + unsigned int extra_nfds, + int timeout_ms, + bool wait_on_nop, + int *pnevents) +{ + CURLMcode mresult = CURLM_OK; + int nevents = 0; + size_t i; + + if(cpfds->n) { + int pollrc = Curl_poll(cpfds->pfds, cpfds->n, timeout_ms); /* wait... */ + if(pollrc < 0) { + mresult = CURLM_UNRECOVERABLE_POLL; + goto out; + } + nevents = pollrc; + + /* copy revents results from the poll to the curl_multi_wait poll + struct, the bit values of the actual underlying poll() implementation + may not be the same as the ones in the public libcurl API! */ + for(i = 0; i < extra_nfds; i++) { + unsigned r = (unsigned)cpfds->pfds[curl_nfds + i].revents; + unsigned short mask = 0; + if(r & POLLIN) + mask |= CURL_WAIT_POLLIN; + if(r & POLLOUT) + mask |= CURL_WAIT_POLLOUT; + if(r & POLLPRI) + mask |= CURL_WAIT_POLLPRI; + extra_fds[i].revents = (short)mask; + } + } + else if(wait_on_nop) { + struct curltime expire_time; + long sleep_ms = 0; + + /* Avoid busy-looping when there is nothing particular to wait for */ + multi_timeout(multi, &expire_time, &sleep_ms); + if(sleep_ms) { + if(sleep_ms > timeout_ms) + sleep_ms = timeout_ms; + /* when there are no easy handles in the multi, this holds a -1 + timeout */ + else if(sleep_ms < 0) + sleep_ms = timeout_ms; + curlx_wait_ms(sleep_ms); + } + } + +out: + *pnevents = nevents; + return mresult; +} + +#endif /* !USE_WINSOCK */ #define NUM_POLLS_ON_STACK 10 @@ -1322,13 +1522,13 @@ static CURLMcode multi_wait(struct Curl_multi *multi, unsigned int extra_nfds, int timeout_ms, int *ret, - bool extrawait, /* when no socket, wait */ - bool use_wakeup) + bool wait_on_nop) /* spend time, even if there + * is nothing to monitor */ { size_t i; struct curltime expire_time; long timeout_internal; - int retcode = 0; + int nevents = 0; struct easy_pollset ps; struct pollfd a_few_on_stack[NUM_POLLS_ON_STACK]; struct curl_pollfds cpfds; @@ -1336,13 +1536,8 @@ static CURLMcode multi_wait(struct Curl_multi *multi, struct Curl_easy *data = NULL; CURLMcode mresult = CURLM_OK; uint32_t mid; - -#ifdef USE_WINSOCK - WSANETWORKEVENTS wsa_events; - DEBUGASSERT(multi->wsa_event != WSA_INVALID_EVENT); -#endif -#ifndef ENABLE_WAKEUP - (void)use_wakeup; +#ifdef ENABLE_WAKEUP + int wakeup_idx = -1; #endif if(!GOOD_MULTI_HANDLE(multi)) @@ -1380,6 +1575,16 @@ static CURLMcode multi_wait(struct Curl_multi *multi, goto out; } +#ifdef ENABLE_WAKEUP + if(wait_on_nop && multi->wakeup_pair[0] != CURL_SOCKET_BAD) { + wakeup_idx = cpfds.n; + if(Curl_pollfds_add_sock(&cpfds, multi->wakeup_pair[0], POLLIN)) { + mresult = CURLM_OUT_OF_MEMORY; + goto out; + } + } +#endif + curl_nfds = cpfds.n; /* what curl internally uses in cpfds */ /* Add external file descriptions from poll-like struct curl_waitfd */ for(i = 0; i < extra_nfds; i++) { @@ -1396,38 +1601,6 @@ static CURLMcode multi_wait(struct Curl_multi *multi, } } -#ifdef USE_WINSOCK - /* Set the WSA events based on the collected pollds */ - for(i = 0; i < cpfds.n; i++) { - long mask = 0; - if(cpfds.pfds[i].events & POLLIN) - mask |= FD_READ | FD_ACCEPT | FD_CLOSE; - if(cpfds.pfds[i].events & POLLPRI) - mask |= FD_OOB; - if(cpfds.pfds[i].events & POLLOUT) { - mask |= FD_WRITE | FD_CONNECT | FD_CLOSE; - reset_socket_fdwrite(cpfds.pfds[i].fd); - } - if(mask) { - if(WSAEventSelect(cpfds.pfds[i].fd, multi->wsa_event, mask) != 0) { - mresult = CURLM_OUT_OF_MEMORY; - goto out; - } - } - } -#endif - -#ifdef ENABLE_WAKEUP -#ifndef USE_WINSOCK - if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) { - if(Curl_pollfds_add_sock(&cpfds, multi->wakeup_pair[0], POLLIN)) { - mresult = CURLM_OUT_OF_MEMORY; - goto out; - } - } -#endif -#endif - /* We check the internal timeout *AFTER* we collected all sockets to * poll. Collecting the sockets may install new timers by protocols * and connection filters. @@ -1439,122 +1612,32 @@ static CURLMcode multi_wait(struct Curl_multi *multi, if(data) CURL_TRC_M(data, "multi_wait(fds=%d, timeout=%d) tinternal=%ld", cpfds.n, timeout_ms, timeout_internal); -#if defined(ENABLE_WAKEUP) && defined(USE_WINSOCK) - if(cpfds.n || use_wakeup) { -#else - if(cpfds.n) { -#endif - int pollrc; -#ifdef USE_WINSOCK - if(cpfds.n) /* pre-check with Winsock */ - pollrc = Curl_poll(cpfds.pfds, cpfds.n, 0); - else - pollrc = 0; -#else - pollrc = Curl_poll(cpfds.pfds, cpfds.n, timeout_ms); /* wait... */ -#endif - if(pollrc < 0) { - mresult = CURLM_UNRECOVERABLE_POLL; - goto out; - } - - if(pollrc > 0) { - retcode = pollrc; -#ifdef USE_WINSOCK - } - else { /* now wait... if not ready during the pre-check (pollrc == 0) */ - WSAWaitForMultipleEvents(1, &multi->wsa_event, FALSE, (DWORD)timeout_ms, - FALSE); - } - /* With Winsock, we have to run the following section unconditionally - to call WSAEventSelect(fd, event, 0) on all the sockets */ - { -#endif - /* copy revents results from the poll to the curl_multi_wait poll - struct, the bit values of the actual underlying poll() implementation - may not be the same as the ones in the public libcurl API! */ - for(i = 0; i < extra_nfds; i++) { - unsigned r = (unsigned)cpfds.pfds[curl_nfds + i].revents; - unsigned short mask = 0; -#ifdef USE_WINSOCK - curl_socket_t s = extra_fds[i].fd; - wsa_events.lNetworkEvents = 0; - if(WSAEnumNetworkEvents(s, NULL, &wsa_events) == 0) { - if(wsa_events.lNetworkEvents & (FD_READ | FD_ACCEPT | FD_CLOSE)) - mask |= CURL_WAIT_POLLIN; - if(wsa_events.lNetworkEvents & (FD_WRITE | FD_CONNECT | FD_CLOSE)) - mask |= CURL_WAIT_POLLOUT; - if(wsa_events.lNetworkEvents & FD_OOB) - mask |= CURL_WAIT_POLLPRI; - if(ret && !pollrc && wsa_events.lNetworkEvents) - retcode++; - } - WSAEventSelect(s, multi->wsa_event, 0); - if(!pollrc) { - extra_fds[i].revents = (short)mask; - continue; - } -#endif - if(r & POLLIN) - mask |= CURL_WAIT_POLLIN; - if(r & POLLOUT) - mask |= CURL_WAIT_POLLOUT; - if(r & POLLPRI) - mask |= CURL_WAIT_POLLPRI; - extra_fds[i].revents = (short)mask; - } #ifdef USE_WINSOCK - /* Count up all our own sockets that had activity, - and remove them from the event. */ - for(i = 0; i < curl_nfds; ++i) { - wsa_events.lNetworkEvents = 0; - if(WSAEnumNetworkEvents(cpfds.pfds[i].fd, NULL, &wsa_events) == 0) { - if(ret && !pollrc && wsa_events.lNetworkEvents) - retcode++; - } - WSAEventSelect(cpfds.pfds[i].fd, multi->wsa_event, 0); - } - WSAResetEvent(multi->wsa_event); + mresult = multi_winsock_select(multi, &cpfds, curl_nfds, + extra_fds, extra_nfds, + timeout_ms, wait_on_nop, &nevents); #else + mresult = multi_posix_poll(multi, &cpfds, curl_nfds, + extra_fds, extra_nfds, + timeout_ms, wait_on_nop, &nevents); +#endif + #ifdef ENABLE_WAKEUP - if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) { - if(cpfds.pfds[curl_nfds + extra_nfds].revents & POLLIN) { - (void)Curl_wakeup_consume(multi->wakeup_pair, TRUE); - /* do not count the wakeup socket into the returned value */ - retcode--; - } + if(nevents && (wakeup_idx >= 0)) { + if(cpfds.pfds[wakeup_idx].revents & POLLIN) { + (void)Curl_wakeup_consume(multi->wakeup_pair, TRUE); + /* do not count the wakeup socket into the returned value */ + nevents--; } -#endif -#endif } - } - - if(ret) - *ret = retcode; -#if defined(ENABLE_WAKEUP) && defined(USE_WINSOCK) - if(extrawait && !cpfds.n && !use_wakeup) { -#else - if(extrawait && !cpfds.n) { #endif - long sleep_ms = 0; - - /* Avoid busy-looping when there is nothing particular to wait for */ - multi_timeout(multi, &expire_time, &sleep_ms); - if(sleep_ms) { - if(sleep_ms > timeout_ms) - sleep_ms = timeout_ms; - /* when there are no easy handles in the multi, this holds a -1 - timeout */ - else if(sleep_ms < 0) - sleep_ms = timeout_ms; - curlx_wait_ms(sleep_ms); - } - } out: Curl_pollset_cleanup(&ps); Curl_pollfds_cleanup(&cpfds); + if(ret) + *ret = nevents; return mresult; } @@ -1564,8 +1647,7 @@ CURLMcode curl_multi_wait(CURLM *multi, int timeout_ms, int *ret) { - return multi_wait(multi, extra_fds, extra_nfds, timeout_ms, ret, FALSE, - FALSE); + return multi_wait(multi, extra_fds, extra_nfds, timeout_ms, ret, FALSE); } CURLMcode curl_multi_poll(CURLM *multi, @@ -1574,7 +1656,7 @@ CURLMcode curl_multi_poll(CURLM *multi, int timeout_ms, int *ret) { - return multi_wait(multi, extra_fds, extra_nfds, timeout_ms, ret, TRUE, TRUE); + return multi_wait(multi, extra_fds, extra_nfds, timeout_ms, ret, TRUE); } CURLMcode curl_multi_wakeup(CURLM *m) @@ -1588,11 +1670,11 @@ CURLMcode curl_multi_wakeup(CURLM *m) if(!GOOD_MULTI_HANDLE(multi)) return CURLM_BAD_HANDLE; -#ifdef ENABLE_WAKEUP #ifdef USE_WINSOCK if(WSASetEvent(multi->wsa_event)) return CURLM_OK; -#else +#endif +#ifdef ENABLE_WAKEUP /* the wakeup_pair variable is only written during init and cleanup, making it safe to access from another thread after the init part and before cleanup */ @@ -1601,7 +1683,6 @@ CURLMcode curl_multi_wakeup(CURLM *m) return CURLM_WAKEUP_FAILURE; return CURLM_OK; } -#endif #endif return CURLM_WAKEUP_FAILURE; } @@ -2408,6 +2489,8 @@ static void handle_completed(struct Curl_multi *multi, Curl_uint32_bset_remove(&multi->pending, data->mid); Curl_uint32_bset_add(&multi->msgsent, data->mid); --multi->xfers_alive; + if(!multi->xfers_alive) + multi_assess_wakeup(multi); } static CURLMcode multi_runsingle(struct Curl_multi *multi, @@ -2890,10 +2973,9 @@ CURLMcode curl_multi_cleanup(CURLM *m) #ifdef USE_WINSOCK WSACloseEvent(multi->wsa_event); -#else +#endif #ifdef ENABLE_WAKEUP Curl_wakeup_destroy(multi->wakeup_pair); -#endif #endif multi_xfer_bufs_free(multi); @@ -3132,6 +3214,14 @@ static CURLMcode multi_socket(struct Curl_multi *multi, handles the case when the application asks libcurl to run the timeout prematurely. */ memset(&multi->last_expire_ts, 0, sizeof(multi->last_expire_ts)); + + /* Applications may set `socket_cb` *after* having added transfers + * first. *Then* kick off processing with a + * curl_multi_socket_action(TIMEOUT) afterwards. Make sure our + * admin handle registers its pollset with the callbacks present. */ + mresult = multi_assess_wakeup(multi); + if(mresult) + goto out; } multi_mark_expired_as_dirty(multi, multi_now(multi)); diff --git a/lib/multi_ev.c b/lib/multi_ev.c index 696a012ebf..b98cda0c68 100644 --- a/lib/multi_ev.c +++ b/lib/multi_ev.c @@ -603,10 +603,8 @@ void Curl_multi_ev_xfer_done(struct Curl_multi *multi, struct Curl_easy *data) { DEBUGASSERT(!data->conn); /* transfer should have been detached */ - if(data != multi->admin) { - (void)mev_assess(multi, data, NULL); - Curl_meta_remove(data, CURL_META_MEV_POLLSET); - } + (void)mev_assess(multi, data, NULL); + Curl_meta_remove(data, CURL_META_MEV_POLLSET); } void Curl_multi_ev_conn_done(struct Curl_multi *multi, diff --git a/lib/multihandle.h b/lib/multihandle.h index 7a561124ae..3b18930e00 100644 --- a/lib/multihandle.h +++ b/lib/multihandle.h @@ -71,7 +71,7 @@ typedef enum { #define CURLPIPE_ANY (CURLPIPE_MULTIPLEX) -#ifndef CURL_DISABLE_SOCKETPAIR +#if !defined(CURL_DISABLE_SOCKETPAIR) && !defined(USE_WINSOCK) #define ENABLE_WAKEUP #endif @@ -160,12 +160,11 @@ struct Curl_multi { #ifdef USE_WINSOCK WSAEVENT wsa_event; /* Winsock event used for waits */ -#else +#endif #ifdef ENABLE_WAKEUP curl_socket_t wakeup_pair[2]; /* eventfd()/pipe()/socketpair() used for wakeup 0 is used for read, 1 is used for write */ -#endif #endif unsigned int max_concurrent_streams; unsigned int maxconnects; /* if >0, a fixed limit of the maximum number of diff --git a/tests/http/test_19_shutdown.py b/tests/http/test_19_shutdown.py index 7cb8ce6415..9e5effd20c 100644 --- a/tests/http/test_19_shutdown.py +++ b/tests/http/test_19_shutdown.py @@ -160,7 +160,7 @@ class TestShutdown: # check that all connection sockets were removed from event removes = [line for line in r.trace_lines if re.match(r'.*socket cb: socket \d+ REMOVED', line)] - assert len(removes) == count, f'{removes}' + assert len(removes) >= count, f'{removes}' # check graceful shutdown on multiplexed http @pytest.mark.parametrize("proto", Env.http_mplx_protos()) diff --git a/tests/libtest/lib2405.c b/tests/libtest/lib2405.c index f19a7ee4e1..d65196e236 100644 --- a/tests/libtest/lib2405.c +++ b/tests/libtest/lib2405.c @@ -331,8 +331,8 @@ static CURLcode empty_multi_test(void) goto test_cleanup; } else if(fd_count > 0) { - curl_mfprintf(stderr, "curl_multi_waitfds() returned non-zero count of " - "waitfds: %d.\n", fd_count); + curl_mfprintf(stderr, "curl_multi_waitfds(), empty, returned non-zero " + "count of waitfds: %d.\n", fd_count); result = TEST_ERR_FAILURE; goto test_cleanup; } @@ -352,8 +352,8 @@ static CURLcode empty_multi_test(void) result = TEST_ERR_FAILURE; goto test_cleanup; } - else if(fd_count > 0) { - curl_mfprintf(stderr, "curl_multi_waitfds() returned non-zero count of " + else if(fd_count > 1) { + curl_mfprintf(stderr, "curl_multi_waitfds() returned > 1 count of " "waitfds: %d.\n", fd_count); result = TEST_ERR_FAILURE; goto test_cleanup; @@ -380,16 +380,16 @@ static CURLcode test_lib2405(const char *URL) goto test_cleanup; if(testnum == 2405) { - /* HTTP1, expected 2 waitfds - one for each transfer */ - test_run_check(TEST_USE_HTTP1, 2); + /* HTTP1, expected 3 waitfds - one for each transfer + wakeup */ + test_run_check(TEST_USE_HTTP1, 3); } #ifdef USE_HTTP2 else { /* 2407 */ - /* HTTP2, expected 2 waitfds - one for each transfer */ - test_run_check(TEST_USE_HTTP2, 2); + /* HTTP2, expected 3 waitfds - one for each transfer + wakeup */ + test_run_check(TEST_USE_HTTP2, 3); - /* HTTP2 with multiplexing, expected 1 waitfds - one for all transfers */ - test_run_check(TEST_USE_HTTP2_MPLEX, 1); + /* HTTP2 with multiplexing, expected 2 waitfds - transfers + wakeup */ + test_run_check(TEST_USE_HTTP2_MPLEX, 2); } #endif diff --git a/tests/libtest/lib530.c b/tests/libtest/lib530.c index ad2ad536f7..31cef1b702 100644 --- a/tests/libtest/lib530.c +++ b/tests/libtest/lib530.c @@ -404,11 +404,11 @@ static CURLcode test_lib530(const char *URL) if(!result) curl_mfprintf(stderr, "%s FAILED: %d\n", t530_tag(), result); - result = testone(URL, 0, 1); /* fail 1st call to socket callback */ + result = testone(URL, 0, 2); /* fail 2nd call to socket callback */ if(!result) curl_mfprintf(stderr, "%s FAILED: %d\n", t530_tag(), result); - result = testone(URL, 0, 2); /* fail 2nd call to socket callback */ + result = testone(URL, 0, 3); /* fail 3rd call to socket callback */ if(!result) curl_mfprintf(stderr, "%s FAILED: %d\n", t530_tag(), result); diff --git a/tests/libtest/lib758.c b/tests/libtest/lib758.c index e743da7f58..b6642cb130 100644 --- a/tests/libtest/lib758.c +++ b/tests/libtest/lib758.c @@ -386,7 +386,8 @@ static CURLcode t758_one(const char *URL, int timer_fail_at, if(t758_ctx.fake_async_cert_verification_pending && !t758_ctx.fake_async_cert_verification_finished) { - if(sockets.read.count || sockets.write.count) { + /* the wakeup socket will be monitored */ + if((sockets.read.count > 1) || sockets.write.count) { t758_msg("during verification there should be no sockets scheduled"); result = TEST_ERR_MAJOR_BAD; goto test_cleanup;