diff --git a/docs/Makefile.am b/docs/Makefile.am index fa7ab3d331..750d65481e 100644 --- a/docs/Makefile.am +++ b/docs/Makefile.am @@ -68,6 +68,7 @@ INTERNALDOCS = \ internals/SCORECARD.md \ internals/SPLAY.md \ internals/STRPARSE.md \ + internals/THRDPOOL+QUEUE.md \ internals/TIME-KEEPING.md \ internals/TLS-SESSIONS.md \ internals/UINT_SETS.md \ diff --git a/docs/internals/THRDPOOL+QUEUE.md b/docs/internals/THRDPOOL+QUEUE.md new file mode 100644 index 0000000000..e55f8a2be3 --- /dev/null +++ b/docs/internals/THRDPOOL+QUEUE.md @@ -0,0 +1,112 @@ + + +# Thread Pool and Queue + +The thread pool and queue manage asynchronous processing of "work items" +to a user. The "work items" are opaque to the pool and queue, represented +by a `void *`, handled via callback functions. + +Thread pool and queue are available with `pthreads` or native Win32 +builds. + +## `Curl_thrdpool` + +This data structure manages a pool of threads for asynchronous operations. + +### Properties + +A pool's properties are: + +- minimum number of threads running, default 0 +- maximum number of threads running +- timeout for idle threads before they shut down + +The minimum number of threads is started at creation of the pool and +kept always running. On demand, when more work is available but all +existing threads are busy, it starts new threads, up to maximum. + +When work ceases, the threads "above" the minimum number exit again +after the given idle time. + +### Operation + +The pool is created with providing three callback functions: + +- `take`: the pool calls this to take a new "work item" for processing. From + the pool's point of view, a work item is a `void *`. "take" is called from + the pool's threads. When getting anything besides `NULL`, the thread is + "busy". On getting `NULL`, the thread becomes idle. +- `process`: called by a pool thread to process a work item. This can not + return any error. Any error handling must be done via properties in + the work item itself, opaque to the pool. +- `return`: after processing, the work item is returned and the pool has + no longer have any memory of it. + +The pool only tries to "take" new work items when told to. Calling +`Curl_thrdpool_signal(pool, n)` awakens up to `n`threads which then +take new work items. This may cause new threads being started. The other +time a pool thread "take"s work it when it has finished +processing and returned another item. + +A thread pool can be destroyed via `Curl_thrdpool_destroy(pool, join)` where +`join` determines if active threads shall be joined or detached. + +### Safety + +The thread pool operates use a mutex and condition variables to manage +concurrency. All interactions and callback invocation are done under +the pool's mutex lock, *except* the "process" callback which is invoked +unlocked. + +To avoid deadlocks, no callback must invoked other pool functions. Also, +any call of pool functions may result in callback invocations. + +The "work items", once "taken" by the pool, should not be referenced +from any other place. Thread pools **always** invoke the "return" +callback on a work item, even after the pool has been destroyed by +detaching the threads. + +There is a `user_data` in the pool's creation that is passed to "take" +and "return" callbacks. Once a pool is destroyed, this `user_data` is +cleared and "return" callbacks always see a `NULL`. This way, +the "return" callback may act on that fact. + +## `Curl_thrdq` + +A `thrdq` is a two-way queue with a thread pool. Users of a thread queue may +"send" work items into the queue and "receive" processed items back. + +### Properties + +A queue's properties are: + +- The properties of the thread pool to create +- the maximum length of the "send" queue, 0 for unlimited + +### Operation + +The queue is created with providing three callback functions: + +- `free`: called to free a work item that is in the queue but is + no longer returned (or processed). This happens when the queue is + destroyed or when work items are removed for other reasons. +- `process`: process the item. Can not fail. +- `event`: called when work items have been added to the "receive" list. + +Users of a thread queue call `Curl_thrdq_send()` to add a work item to +the queue. Calling `Curl_thrdq_recv()` delivers processed items back. + +### Safety + +The thread queue operates use a mutex and condition variables to manage +concurrency. All interactions and callback invocation are done under +the queue's mutex lock, *except* the "process" callback which is invoked +unlocked. + +Users of a thread queue should not hold any reference to work items sent +into the queue. The provided "free" callback has to take care of any +resources allocated by work items. diff --git a/lib/Makefile.inc b/lib/Makefile.inc index 50a2b3282c..b332b2cf41 100644 --- a/lib/Makefile.inc +++ b/lib/Makefile.inc @@ -259,6 +259,8 @@ LIB_CFILES = \ system_win32.c \ telnet.c \ tftp.c \ + thrdpool.c \ + thrdqueue.c \ transfer.c \ uint-bset.c \ uint-hash.c \ @@ -388,6 +390,8 @@ LIB_HFILES = \ system_win32.h \ telnet.h \ tftp.h \ + thrdpool.h \ + thrdqueue.h \ transfer.h \ uint-bset.h \ uint-hash.h \ diff --git a/lib/curl_threads.c b/lib/curl_threads.c index 52ab385a44..5f1d47f3d7 100644 --- a/lib/curl_threads.c +++ b/lib/curl_threads.c @@ -23,6 +23,7 @@ ***************************************************************************/ #include "curl_setup.h" #include "curl_threads.h" +#include "curlx/timeval.h" #ifdef USE_THREADS @@ -132,3 +133,82 @@ int Curl_thread_join(curl_thread_t *hnd) #error neither HAVE_THREADS_POSIX nor _WIN32 defined #endif #endif /* USE_THREADS */ + +#ifdef USE_MUTEX + +#ifdef HAVE_THREADS_POSIX + +void Curl_cond_signal(pthread_cond_t *c) +{ + /* return code defined as always 0 */ + (void)pthread_cond_signal(c); +} + +void Curl_cond_wait(pthread_cond_t *c, pthread_mutex_t *m) +{ + /* return code defined as always 0 */ + (void)pthread_cond_wait(c, m); +} + +CURLcode Curl_cond_timedwait(pthread_cond_t *c, pthread_mutex_t *m, + uint32_t timeout_ms) +{ + struct curltime now; + struct timespec ts; + timediff_t usec; + int rc; + + /* POSIX expects an "absolute" time until the condition wait ends. + * We cannot use `curlx_now()` here that may run on some monotonic clock + * that will be most likely in the past, as far as POSIX abstime is + * concerned. */ +#ifdef HAVE_GETTIMEOFDAY + struct timeval tv; + (void)gettimeofday(&tv, NULL); + now.tv_sec = tv.tv_sec; + now.tv_usec = (int)tv.tv_usec; +#else + now.tv_sec = time(NULL); + now.tv_usec = 0; +#endif + + ts.tv_sec = now.tv_sec + (timeout_ms / 1000); + usec = now.tv_usec + ((timeout_ms % 1000) * 1000); + if(usec >= 1000000) { + ++ts.tv_sec; + usec %= 1000000; + } + ts.tv_nsec = (long)usec * 1000; + + rc = pthread_cond_timedwait(c, m, &ts); + if(rc == SOCKETIMEDOUT) + return CURLE_OPERATION_TIMEDOUT; + return rc ? CURLE_UNRECOVERABLE_POLL : CURLE_OK; +} + +#elif defined(_WIN32) + +void Curl_cond_signal(CONDITION_VARIABLE *c) +{ + WakeConditionVariable(c); +} + +void Curl_cond_wait(CONDITION_VARIABLE *c, CRITICAL_SECTION *m) +{ + SleepConditionVariableCS(c, m, INFINITE); +} + +CURLcode Curl_cond_timedwait(CONDITION_VARIABLE *c, CRITICAL_SECTION *m, + uint32_t timeout_ms) +{ + if(!SleepConditionVariableCS(c, m, (DWORD)timeout_ms)) { + DWORD err = GetLastError(); + return (err == ERROR_TIMEOUT) ? + CURLE_OPERATION_TIMEDOUT : CURLE_UNRECOVERABLE_POLL; + } + return CURLE_OK; +} +#else +#error neither HAVE_THREADS_POSIX nor _WIN32 defined +#endif +#endif /* USE_MUTEX */ diff --git a/lib/curl_threads.h b/lib/curl_threads.h index ff3af6a793..3be46c7b15 100644 --- a/lib/curl_threads.h +++ b/lib/curl_threads.h @@ -39,6 +39,9 @@ # define Curl_mutex_acquire(m) pthread_mutex_lock(m) # define Curl_mutex_release(m) pthread_mutex_unlock(m) # define Curl_mutex_destroy(m) pthread_mutex_destroy(m) +# define curl_cond_t pthread_cond_t +# define Curl_cond_init(c) pthread_cond_init(c, NULL) +# define Curl_cond_destroy(c) pthread_cond_destroy(c) #elif defined(_WIN32) # define CURL_THREAD_RETURN_T DWORD # define CURL_STDCALL WINAPI @@ -49,9 +52,18 @@ # define Curl_mutex_acquire(m) EnterCriticalSection(m) # define Curl_mutex_release(m) LeaveCriticalSection(m) # define Curl_mutex_destroy(m) DeleteCriticalSection(m) +# define curl_cond_t CONDITION_VARIABLE +# define Curl_cond_init(c) InitializeConditionVariable(c) +# define Curl_cond_destroy(c) (void)(c) #else #error neither HAVE_THREADS_POSIX nor _WIN32 defined #endif + +void Curl_cond_signal(curl_cond_t *c); +void Curl_cond_wait(curl_cond_t *c, curl_mutex_t *m); +/* Returns CURLE_OPERATION_TIMEDOUT on timeout */ +CURLcode Curl_cond_timedwait(curl_cond_t *c, curl_mutex_t *m, + uint32_t timeout_ms); #endif /* USE_MUTEX */ #ifdef USE_THREADS diff --git a/lib/curl_trc.c b/lib/curl_trc.c index 4b8734a7e9..a80fb61eff 100644 --- a/lib/curl_trc.c +++ b/lib/curl_trc.c @@ -278,6 +278,19 @@ void Curl_trc_cf_infof(struct Curl_easy *data, const struct Curl_cfilter *cf, } } +void Curl_trc_feat_infof(struct Curl_easy *data, + struct curl_trc_feat *feat, + const char *fmt, ...) +{ + DEBUGASSERT(feat); + if(Curl_trc_ft_is_verbose(data, feat)) { + va_list ap; + va_start(ap, fmt); + trc_infof(data, feat, NULL, 0, fmt, ap); + va_end(ap); + } +} + static const char * const Curl_trc_timer_names[] = { "100_TIMEOUT", "ASYNC_NAME", diff --git a/lib/curl_trc.h b/lib/curl_trc.h index a82c945e5f..362a20c800 100644 --- a/lib/curl_trc.h +++ b/lib/curl_trc.h @@ -101,6 +101,10 @@ struct curl_trc_feat { int log_level; }; +void Curl_trc_feat_infof(struct Curl_easy *data, + struct curl_trc_feat *feat, + const char *fmt, ...) CURL_PRINTF(3, 4); + #ifndef CURL_DISABLE_FTP extern struct curl_trc_feat Curl_trc_feat_ftp; void Curl_trc_ftp(struct Curl_easy *data, diff --git a/lib/memdebug.c b/lib/memdebug.c index 408930179d..6eda7a8236 100644 --- a/lib/memdebug.c +++ b/lib/memdebug.c @@ -99,6 +99,7 @@ static void curl_dbg_log_locked(const char *format, ...) CURL_PRINTF(1, 2); _exit() comes after the atexit handlers are called. curl/curl#6620 */ static void curl_dbg_cleanup(void) { + bool locked = curl_dbg_lock(); if(curl_dbg_logfile && curl_dbg_logfile != stderr && curl_dbg_logfile != stdout) { @@ -108,6 +109,7 @@ static void curl_dbg_cleanup(void) fclose(curl_dbg_logfile); } curl_dbg_logfile = NULL; + curl_dbg_unlock(locked); #ifdef USE_MUTEX if(dbg_mutex_init) { Curl_mutex_destroy(&dbg_mutex); diff --git a/lib/thrdpool.c b/lib/thrdpool.c new file mode 100644 index 0000000000..be6363fd1a --- /dev/null +++ b/lib/thrdpool.c @@ -0,0 +1,457 @@ +/*************************************************************************** + * _ _ ____ _ + * Project ___| | | | _ \| | + * / __| | | | |_) | | + * | (__| |_| | _ <| |___ + * \___|\___/|_| \_\_____| + * + * Copyright (C) Daniel Stenberg, , et al. + * + * This software is licensed as described in the file COPYING, which + * you should have received as part of this distribution. The terms + * are also available at https://curl.se/docs/copyright.html. + * + * You may opt to use, copy, modify, merge, publish, distribute and/or sell + * copies of the Software, and permit persons to whom the Software is + * furnished to do so, under the terms of the COPYING file. + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY + * KIND, either express or implied. + * + * SPDX-License-Identifier: curl + * + ***************************************************************************/ +#include "curl_setup.h" + +#ifdef USE_THREADS + +#if defined(USE_THREADS_POSIX) && defined(HAVE_PTHREAD_H) +#include +#endif + +#include "llist.h" +#include "curl_threads.h" +#include "curlx/timeval.h" +#include "thrdpool.h" +#ifdef CURLVERBOSE +#include "curl_trc.h" +#include "urldata.h" +#endif + + +struct thrdslot { + struct Curl_llist_node node; + struct curl_thrdpool *tpool; + curl_thread_t thread; + curl_cond_t await; + struct curltime starttime; + const char *work_description; + timediff_t work_timeout_ms; + uint32_t id; + BIT(running); + BIT(idle); +}; + +struct curl_thrdpool { + char *name; + uint64_t refcount; + curl_mutex_t lock; + curl_cond_t await; + struct Curl_llist slots; + struct Curl_llist zombies; + Curl_thrdpool_take_item_cb *fn_take; + Curl_thrdpool_process_item_cb *fn_process; + Curl_thrdpool_return_item_cb *fn_return; + void *fn_user_data; + CURLcode fatal_err; + uint32_t min_threads; + uint32_t max_threads; + uint32_t idle_time_ms; + uint32_t next_id; + BIT(aborted); + BIT(detached); +}; + +static void thrdpool_join_zombies(struct curl_thrdpool *tpool); +static bool thrdpool_unlink(struct curl_thrdpool *tpool, bool locked); + +static void thrdslot_destroy(struct thrdslot *tslot) +{ + DEBUGASSERT(tslot->thread == curl_thread_t_null); + DEBUGASSERT(!tslot->running); + Curl_cond_destroy(&tslot->await); + curlx_free(tslot); +} + +static void thrdslot_done(struct thrdslot *tslot) +{ + struct curl_thrdpool *tpool = tslot->tpool; + + DEBUGASSERT(Curl_node_llist(&tslot->node) == &tpool->slots); + Curl_node_remove(&tslot->node); + tslot->running = FALSE; + Curl_llist_append(&tpool->zombies, tslot, &tslot->node); + Curl_cond_signal(&tpool->await); +} + +static CURL_THREAD_RETURN_T CURL_STDCALL thrdslot_run(void *arg) +{ + struct thrdslot *tslot = arg; + struct curl_thrdpool *tpool = tslot->tpool; + void *item; + + Curl_mutex_acquire(&tpool->lock); + DEBUGASSERT(Curl_node_llist(&tslot->node) == &tpool->slots); + for(;;) { + while(!tpool->aborted) { + tslot->work_description = NULL; + tslot->work_timeout_ms = 0; + item = tpool->fn_take(tpool->fn_user_data, &tslot->work_description, + &tslot->work_timeout_ms); + if(!item) + break; + tslot->starttime = curlx_now(); + tslot->idle = FALSE; + Curl_mutex_release(&tpool->lock); + + tpool->fn_process(item); + + Curl_mutex_acquire(&tpool->lock); + tslot->work_description = NULL; + tpool->fn_return(item, tpool->aborted ? NULL : tpool->fn_user_data); + } + + if(tpool->aborted) + goto out; + + tslot->idle = TRUE; + tslot->starttime = curlx_now(); + thrdpool_join_zombies(tpool); + Curl_cond_signal(&tpool->await); + /* Only wait with idle timeout when we are above the minimum + * number of threads. Otherwise short idle timeouts will keep + * on activating threads that have no means to shut down. */ + if((tpool->idle_time_ms > 0) && + (Curl_llist_count(&tpool->slots) > tpool->min_threads)) { + CURLcode r = Curl_cond_timedwait(&tslot->await, &tpool->lock, + tpool->idle_time_ms); + if((r == CURLE_OPERATION_TIMEDOUT) && + (Curl_llist_count(&tpool->slots) > tpool->min_threads)) { + goto out; + } + } + else { + Curl_cond_wait(&tslot->await, &tpool->lock); + } + } + +out: + thrdslot_done(tslot); + if(!thrdpool_unlink(tslot->tpool, TRUE)) { + /* tpool not destroyed */ + Curl_mutex_release(&tpool->lock); + } + return 0; +} + +static CURLcode thrdslot_start(struct curl_thrdpool *tpool) +{ + struct thrdslot *tslot; + CURLcode result = CURLE_OUT_OF_MEMORY; + + tslot = curlx_calloc(1, sizeof(*tslot)); + if(!tslot) + goto out; + tslot->id = tpool->next_id++; + tslot->tpool = tpool; + tslot->thread = curl_thread_t_null; + Curl_cond_init(&tslot->await); + + tpool->refcount++; + tslot->running = TRUE; + tslot->thread = Curl_thread_create(thrdslot_run, tslot); + if(tslot->thread == curl_thread_t_null) { /* never started */ + tslot->running = FALSE; + thrdpool_unlink(tpool, TRUE); + result = CURLE_FAILED_INIT; + goto out; + } + + Curl_llist_append(&tpool->slots, tslot, &tslot->node); + tslot = NULL; + result = CURLE_OK; + +out: + if(tslot) + thrdslot_destroy(tslot); + return result; +} + +static void thrdpool_wake_all(struct curl_thrdpool *tpool) +{ + struct Curl_llist_node *e; + for(e = Curl_llist_head(&tpool->slots); e; e = Curl_node_next(e)) { + struct thrdslot *tslot = Curl_node_elem(e); + Curl_cond_signal(&tslot->await); + } +} + +static void thrdpool_join_zombies(struct curl_thrdpool *tpool) +{ + struct Curl_llist_node *e; + + for(e = Curl_llist_head(&tpool->zombies); e; + e = Curl_llist_head(&tpool->zombies)) { + struct thrdslot *tslot = Curl_node_elem(e); + + Curl_node_remove(&tslot->node); + if(tslot->thread != curl_thread_t_null) { + Curl_mutex_release(&tpool->lock); + Curl_thread_join(&tslot->thread); + Curl_mutex_acquire(&tpool->lock); + tslot->thread = curl_thread_t_null; + } + thrdslot_destroy(tslot); + } +} + +static bool thrdpool_unlink(struct curl_thrdpool *tpool, bool locked) +{ + DEBUGASSERT(tpool->refcount); + if(tpool->refcount) + tpool->refcount--; + if(tpool->refcount) + return FALSE; + + /* no more references, free */ + DEBUGASSERT(tpool->aborted); + thrdpool_join_zombies(tpool); + if(locked) + Curl_mutex_release(&tpool->lock); + curlx_free(tpool->name); + Curl_cond_destroy(&tpool->await); + Curl_mutex_destroy(&tpool->lock); + curlx_free(tpool); + return TRUE; +} + +CURLcode Curl_thrdpool_create(struct curl_thrdpool **ptpool, + const char *name, + uint32_t min_threads, + uint32_t max_threads, + uint32_t idle_time_ms, + Curl_thrdpool_take_item_cb *fn_take, + Curl_thrdpool_process_item_cb *fn_process, + Curl_thrdpool_return_item_cb *fn_return, + void *user_data) +{ + struct curl_thrdpool *tpool; + CURLcode result = CURLE_OUT_OF_MEMORY; + + tpool = curlx_calloc(1, sizeof(*tpool)); + if(!tpool) + goto out; + tpool->refcount = 1; + + Curl_mutex_init(&tpool->lock); + Curl_cond_init(&tpool->await); + Curl_llist_init(&tpool->slots, NULL); + Curl_llist_init(&tpool->zombies, NULL); + tpool->min_threads = min_threads; + tpool->max_threads = max_threads; + tpool->idle_time_ms = idle_time_ms; + tpool->fn_take = fn_take; + tpool->fn_process = fn_process; + tpool->fn_return = fn_return; + tpool->fn_user_data = user_data; + + tpool->name = curlx_strdup(name); + if(!tpool->name) + goto out; + + if(tpool->min_threads) + result = Curl_thrdpool_signal(tpool, tpool->min_threads); + else + result = CURLE_OK; + +out: + if(result && tpool) { + tpool->aborted = TRUE; + thrdpool_unlink(tpool, FALSE); + tpool = NULL; + } + *ptpool = tpool; + return result; +} + +void Curl_thrdpool_destroy(struct curl_thrdpool *tpool, bool join) +{ + Curl_mutex_acquire(&tpool->lock); + + tpool->aborted = TRUE; + + while(join && Curl_llist_count(&tpool->slots)) { + thrdpool_wake_all(tpool); + Curl_cond_wait(&tpool->await, &tpool->lock); + } + + thrdpool_join_zombies(tpool); + + /* detach all still running threads */ + if(Curl_llist_count(&tpool->slots)) { + struct Curl_llist_node *e; + for(e = Curl_llist_head(&tpool->slots); e; e = Curl_node_next(e)) { + struct thrdslot *tslot = Curl_node_elem(e); + if(tslot->thread != curl_thread_t_null) + Curl_thread_destroy(&tslot->thread); + } + tpool->detached = TRUE; + } + + if(!thrdpool_unlink(tpool, TRUE)) { + /* tpool not destroyed */ + Curl_mutex_release(&tpool->lock); + } +} + +CURLcode Curl_thrdpool_signal(struct curl_thrdpool *tpool, uint32_t nthreads) +{ + struct Curl_llist_node *e, *n; + CURLcode result = CURLE_OK; + + Curl_mutex_acquire(&tpool->lock); + DEBUGASSERT(!tpool->aborted); + + thrdpool_join_zombies(tpool); + + for(e = Curl_llist_head(&tpool->slots); e && nthreads; e = n) { + struct thrdslot *tslot = Curl_node_elem(e); + n = Curl_node_next(e); + if(tslot->idle) { + Curl_cond_signal(&tslot->await); + --nthreads; + } + else if(!tslot->starttime.tv_sec && !tslot->starttime.tv_usec) { + /* starting thread, queries for work soon. */ + --nthreads; + } + } + + while(nthreads && !result && + Curl_llist_count(&tpool->slots) < tpool->max_threads) { + result = thrdslot_start(tpool); + if(result) + break; + --nthreads; + } + + Curl_mutex_release(&tpool->lock); + return result; +} + +static bool thrdpool_all_idle(struct curl_thrdpool *tpool) +{ + struct Curl_llist_node *e; + for(e = Curl_llist_head(&tpool->slots); e; e = Curl_node_next(e)) { + struct thrdslot *tslot = Curl_node_elem(e); + if(!tslot->idle) + return FALSE; + } + return TRUE; +} + +CURLcode Curl_thrdpool_await_idle(struct curl_thrdpool *tpool, + uint32_t timeout_ms) +{ + CURLcode result = CURLE_OK; + struct curltime end = { 0 }; + + Curl_mutex_acquire(&tpool->lock); + DEBUGASSERT(!tpool->aborted); + if(tpool->aborted) { + result = CURLE_FAILED_INIT; + goto out; + } + + while(!thrdpool_all_idle(tpool)) { + if(timeout_ms) { + timediff_t remain_ms; + CURLcode r; + + if(!end.tv_sec && !end.tv_usec) { + end = curlx_now(); + end.tv_sec += (time_t)(timeout_ms / 1000); + end.tv_usec += (int)(timeout_ms % 1000) * 1000; + if(end.tv_usec >= 1000000) { + end.tv_sec++; + end.tv_usec -= 1000000; + } + } + remain_ms = curlx_timediff_ms(curlx_now(), end); + if(remain_ms <= 0) + r = CURLE_OPERATION_TIMEDOUT; + else + r = Curl_cond_timedwait(&tpool->await, &tpool->lock, + (uint32_t)remain_ms); + if(r == CURLE_OPERATION_TIMEDOUT) { + result = r; + break; + } + } + else { + Curl_cond_wait(&tpool->await, &tpool->lock); + } + } + +out: + thrdpool_join_zombies(tpool); + Curl_mutex_release(&tpool->lock); + return result; +} + +#ifdef CURLVERBOSE +void Curl_thrdpool_trace(struct curl_thrdpool *tpool, + struct Curl_easy *data, + struct curl_trc_feat *feat) +{ + if(Curl_trc_ft_is_verbose(data, feat)) { + struct Curl_llist_node *e; + struct curltime now = curlx_now(); + + Curl_mutex_acquire(&tpool->lock); + if(!Curl_llist_count(&tpool->slots)) { + Curl_trc_feat_infof(data, feat, "[%s] [TPOOL] no threads running", + tpool->name); + } + for(e = Curl_llist_head(&tpool->slots); e; e = Curl_node_next(e)) { + struct thrdslot *tslot = Curl_node_elem(e); + timediff_t elapsed_ms = curlx_ptimediff_ms(&now, &tslot->starttime); + if(!tslot->running) { + Curl_trc_feat_infof(data, feat, "[%s] [TPOOL] [%u]: not running", + tpool->name, tslot->id); + } + else if(!tslot->starttime.tv_sec && !tslot->starttime.tv_usec) { + Curl_trc_feat_infof(data, feat, "[%s] [TPOOL] [%u]: starting...", + tpool->name, tslot->id); + } + else if(tslot->idle) { + Curl_trc_feat_infof(data, feat, "[%s] [TPOOL] [%u]: idle for %" + FMT_TIMEDIFF_T "ms", + tpool->name, tslot->id, elapsed_ms); + } + else { + timediff_t remain_ms = tslot->work_timeout_ms ? + (tslot->work_timeout_ms - elapsed_ms) : 0; + Curl_trc_feat_infof(data, feat, "[%s] [TPOOL] [%u]: busy %" + FMT_TIMEDIFF_T "ms, timeout in %" FMT_TIMEDIFF_T + "ms: %s", + tpool->name, tslot->id, elapsed_ms, remain_ms, + tslot->work_description); + } + } + Curl_mutex_release(&tpool->lock); + } +} +#endif + +#endif /* USE_THREADS */ diff --git a/lib/thrdpool.h b/lib/thrdpool.h new file mode 100644 index 0000000000..8fa40538a3 --- /dev/null +++ b/lib/thrdpool.h @@ -0,0 +1,102 @@ +#ifndef HEADER_CURL_THRDPOOL_H +#define HEADER_CURL_THRDPOOL_H +/*************************************************************************** + * _ _ ____ _ + * Project ___| | | | _ \| | + * / __| | | | |_) | | + * | (__| |_| | _ <| |___ + * \___|\___/|_| \_\_____| + * + * Copyright (C) Daniel Stenberg, , et al. + * + * This software is licensed as described in the file COPYING, which + * you should have received as part of this distribution. The terms + * are also available at https://curl.se/docs/copyright.html. + * + * You may opt to use, copy, modify, merge, publish, distribute and/or sell + * copies of the Software, and permit persons to whom the Software is + * furnished to do so, under the terms of the COPYING file. + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY + * KIND, either express or implied. + * + * SPDX-License-Identifier: curl + * + ***************************************************************************/ +#include "curl_setup.h" +#include "curlx/timediff.h" + +#ifdef USE_THREADS + +struct curl_thrdpool; +struct Curl_easy; +struct curl_trc_feat; + +/* Invoked under thread pool lock to get an "item" to work on. Must + * return NULL if there is nothing to do. + * Caller might return a descriptive string about the "item", where + * available. The string needs to have the same lifetime as the + * item itself. */ +typedef void *Curl_thrdpool_take_item_cb(void *user_data, + const char **pdescription, + timediff_t *ptimeout_ms); + +/* Invoked outside thread pool lock to process the item taken. */ +typedef void Curl_thrdpool_process_item_cb(void *item); + +/* Invoked under thread pool lock to return a processed item back + * to the producer. + * If the thread pool has been destroyed, `user_data` will be NULL + * and the callback is responsible to release all `item` resources. */ +typedef void Curl_thrdpool_return_item_cb(void *item, void *user_data); + +/* Create a new thread pool. + * @param name name of pool for tracing purposes + * @param min_threads minimum number of threads to have always running + * @param max_threads maximum number of threads running, ever. + * @param idle_time_ms maximum time a thread should wait for tasks to + * process before shutting down (unless the pool is + * already at minimum thread count), use 0 for + * infinite wait. + * @param fn_take take the next item to process + * @param fn_process process the item taken + * @param fn_return return the processed item + * @param user_data parameter passed to take/return callbacks + */ +CURLcode Curl_thrdpool_create(struct curl_thrdpool **ptpool, + const char *name, + uint32_t min_threads, + uint32_t max_threads, + uint32_t idle_time_ms, + Curl_thrdpool_take_item_cb *fn_take, + Curl_thrdpool_process_item_cb *fn_process, + Curl_thrdpool_return_item_cb *fn_return, + void *user_data); + +/* Destroy the thread pool, release its resources. + * With `join` being TRUE, the call will wait for all threads to finish + * processing before returning. On FALSE, it will detach all threads + * running. Ongoing item processing will continue to run and + * `fn_return` will be invoked with NULL user_data before the thread exits. + */ +void Curl_thrdpool_destroy(struct curl_thrdpool *tpool, bool join); + +/* Signal the pool to wake up `nthreads` idle worker threads, possible + * creating new threads up to the max limit. The number should reflect + * the items that can actually be taken for processing right away, e.g. + * the producers "queue" length of outstanding items. + */ +CURLcode Curl_thrdpool_signal(struct curl_thrdpool *tpool, uint32_t nthreads); + +CURLcode Curl_thrdpool_await_idle(struct curl_thrdpool *tpool, + uint32_t timeout_ms); + +#ifdef CURLVERBOSE +void Curl_thrdpool_trace(struct curl_thrdpool *tpool, + struct Curl_easy *data, + struct curl_trc_feat *feat); +#endif + +#endif /* USE_THREADS */ + +#endif /* HEADER_CURL_THRDPOOL_H */ diff --git a/lib/thrdqueue.c b/lib/thrdqueue.c new file mode 100644 index 0000000000..e36279ef12 --- /dev/null +++ b/lib/thrdqueue.c @@ -0,0 +1,390 @@ +/*************************************************************************** + * _ _ ____ _ + * Project ___| | | | _ \| | + * / __| | | | |_) | | + * | (__| |_| | _ <| |___ + * \___|\___/|_| \_\_____| + * + * Copyright (C) Daniel Stenberg, , et al. + * + * This software is licensed as described in the file COPYING, which + * you should have received as part of this distribution. The terms + * are also available at https://curl.se/docs/copyright.html. + * + * You may opt to use, copy, modify, merge, publish, distribute and/or sell + * copies of the Software, and permit persons to whom the Software is + * furnished to do so, under the terms of the COPYING file. + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY + * KIND, either express or implied. + * + * SPDX-License-Identifier: curl + * + ***************************************************************************/ +#include "curl_setup.h" + +#ifdef USE_THREADS + +#if defined(USE_THREADS_POSIX) && defined(HAVE_PTHREAD_H) +#include +#endif + +#include "llist.h" +#include "curl_threads.h" +#include "thrdpool.h" +#include "thrdqueue.h" +#include "curlx/timeval.h" +#ifdef CURLVERBOSE +#include "curl_trc.h" +#include "urldata.h" +#endif + + +struct curl_thrdq { + char *name; + curl_mutex_t lock; + curl_cond_t await; + struct Curl_llist sendq; + struct Curl_llist recvq; + struct curl_thrdpool *tpool; + Curl_thrdq_item_free_cb *fn_free; + Curl_thrdq_item_process_cb *fn_process; + Curl_thrdq_ev_cb *fn_event; + void *fn_user_data; + uint32_t send_max_len; + BIT(aborted); +}; + +struct thrdq_item { + struct Curl_llist_node node; + Curl_thrdq_item_free_cb *fn_free; + Curl_thrdq_item_process_cb *fn_process; + void *item; + struct curltime start; + timediff_t timeout_ms; + const char *description; +}; + +static struct thrdq_item *thrdq_item_create(struct curl_thrdq *tqueue, + void *item, + const char *description, + timediff_t timeout_ms) +{ + struct thrdq_item *qitem; + + qitem = curlx_calloc(1, sizeof(*qitem)); + if(!qitem) + return NULL; + qitem->item = item; + qitem->description = description; + qitem->fn_free = tqueue->fn_free; + qitem->fn_process = tqueue->fn_process; + if(timeout_ms) { + qitem->start = curlx_now(); + qitem->timeout_ms = timeout_ms; + } + return qitem; +} + +static void thrdq_item_destroy(struct thrdq_item *qitem) +{ + if(qitem->item) + qitem->fn_free(qitem->item); + curlx_free(qitem); +} + +static void thrdq_item_list_dtor(void *user_data, void *elem) +{ + (void)user_data; + thrdq_item_destroy(elem); +} + +static void *thrdq_tpool_take(void *user_data, const char **pdescription, + timediff_t *ptimeout_ms) +{ + struct curl_thrdq *tqueue = user_data; + struct thrdq_item *qitem = NULL; + struct Curl_llist_node *e; + Curl_thrdq_ev_cb *fn_event = NULL; + void *fn_user_data = NULL; + + Curl_mutex_acquire(&tqueue->lock); + *pdescription = NULL; + *ptimeout_ms = 0; + if(!tqueue->aborted) { + e = Curl_llist_head(&tqueue->sendq); + if(e) { + struct curltime now = curlx_now(); + timediff_t timeout_ms; + while(e) { + qitem = Curl_node_take_elem(e); + timeout_ms = (!qitem->timeout_ms) ? 0 : + (qitem->timeout_ms - curlx_ptimediff_ms(&now, &qitem->start)); + if(timeout_ms < 0) { + /* timed out while queued, place on receive queue */ + Curl_llist_append(&tqueue->recvq, qitem, &qitem->node); + fn_event = tqueue->fn_event; + fn_user_data = tqueue->fn_user_data; + qitem = NULL; + e = Curl_llist_head(&tqueue->sendq); + continue; + } + else { + *pdescription = qitem->description; + *ptimeout_ms = timeout_ms; + break; + } + } + } + } + Curl_mutex_release(&tqueue->lock); + /* avoiding deadlocks */ + if(fn_event) + fn_event(tqueue, CURL_THRDQ_EV_ITEM_DONE, fn_user_data); + return qitem; +} + +static void thrdq_tpool_return(void *item, void *user_data) +{ + struct curl_thrdq *tqueue = user_data; + struct thrdq_item *qitem = item; + Curl_thrdq_ev_cb *fn_event = NULL; + void *fn_user_data = NULL; + + if(!tqueue) { + thrdq_item_destroy(item); + return; + } + + Curl_mutex_acquire(&tqueue->lock); + if(tqueue->aborted) { + thrdq_item_destroy(qitem); + } + else { + DEBUGASSERT(!Curl_node_llist(&qitem->node)); + Curl_llist_append(&tqueue->recvq, qitem, &qitem->node); + fn_event = tqueue->fn_event; + fn_user_data = tqueue->fn_user_data; + } + Curl_mutex_release(&tqueue->lock); + /* avoiding deadlocks */ + if(fn_event) + fn_event(tqueue, CURL_THRDQ_EV_ITEM_DONE, fn_user_data); +} + +static void thrdq_tpool_process(void *item) +{ + struct thrdq_item *qitem = item; + qitem->fn_process(qitem->item); +} + +static void thrdq_unlink(struct curl_thrdq *tqueue, bool locked, bool join) +{ + DEBUGASSERT(tqueue->aborted); + if(tqueue->tpool) { + if(locked) + Curl_mutex_release(&tqueue->lock); + Curl_thrdpool_destroy(tqueue->tpool, join); + tqueue->tpool = NULL; + if(locked) + Curl_mutex_acquire(&tqueue->lock); + } + + Curl_llist_destroy(&tqueue->sendq, NULL); + Curl_llist_destroy(&tqueue->recvq, NULL); + curlx_free(tqueue->name); + Curl_cond_destroy(&tqueue->await); + if(locked) + Curl_mutex_release(&tqueue->lock); + Curl_mutex_destroy(&tqueue->lock); + curlx_free(tqueue); +} + +CURLcode Curl_thrdq_create(struct curl_thrdq **ptqueue, + const char *name, + uint32_t max_len, + uint32_t min_threads, + uint32_t max_threads, + uint32_t idle_time_ms, + Curl_thrdq_item_free_cb *fn_free, + Curl_thrdq_item_process_cb *fn_process, + Curl_thrdq_ev_cb *fn_event, + void *user_data) +{ + struct curl_thrdq *tqueue; + CURLcode result = CURLE_OUT_OF_MEMORY; + + tqueue = curlx_calloc(1, sizeof(*tqueue)); + if(!tqueue) + goto out; + + Curl_mutex_init(&tqueue->lock); + Curl_cond_init(&tqueue->await); + Curl_llist_init(&tqueue->sendq, thrdq_item_list_dtor); + Curl_llist_init(&tqueue->recvq, thrdq_item_list_dtor); + tqueue->fn_free = fn_free; + tqueue->fn_process = fn_process; + tqueue->fn_event = fn_event; + tqueue->fn_user_data = user_data; + tqueue->send_max_len = max_len; + + tqueue->name = curlx_strdup(name); + if(!tqueue->name) + goto out; + + result = Curl_thrdpool_create(&tqueue->tpool, name, + min_threads, max_threads, idle_time_ms, + thrdq_tpool_take, + thrdq_tpool_process, + thrdq_tpool_return, + tqueue); + +out: + if(result && tqueue) { + tqueue->aborted = TRUE; + thrdq_unlink(tqueue, FALSE, TRUE); + tqueue = NULL; + } + *ptqueue = tqueue; + return result; +} + +void Curl_thrdq_destroy(struct curl_thrdq *tqueue, bool join) +{ + Curl_mutex_acquire(&tqueue->lock); + DEBUGASSERT(!tqueue->aborted); + tqueue->aborted = TRUE; + thrdq_unlink(tqueue, TRUE, join); +} + +CURLcode Curl_thrdq_send(struct curl_thrdq *tqueue, void *item, + const char *description, timediff_t timeout_ms) +{ + CURLcode result = CURLE_AGAIN; + size_t signals = 0; + + Curl_mutex_acquire(&tqueue->lock); + if(tqueue->aborted) { + DEBUGASSERT(0); + result = CURLE_SEND_ERROR; + goto out; + } + if(timeout_ms < 0) { + result = CURLE_OPERATION_TIMEDOUT; + goto out; + } + + if(!tqueue->send_max_len || + (Curl_llist_count(&tqueue->sendq) < tqueue->send_max_len)) { + struct thrdq_item *qitem = thrdq_item_create(tqueue, item, description, + timeout_ms); + if(!qitem) { + result = CURLE_OUT_OF_MEMORY; + goto out; + } + Curl_llist_append(&tqueue->sendq, qitem, &qitem->node); + result = CURLE_OK; + signals = Curl_llist_count(&tqueue->sendq); + } + +out: + Curl_mutex_release(&tqueue->lock); + /* Signal thread pool unlocked to avoid deadlocks */ + if(!result && signals) + result = Curl_thrdpool_signal(tqueue->tpool, (uint32_t)signals); + return result; +} + +CURLcode Curl_thrdq_recv(struct curl_thrdq *tqueue, void **pitem) +{ + CURLcode result = CURLE_AGAIN; + struct Curl_llist_node *e; + + *pitem = NULL; + Curl_mutex_acquire(&tqueue->lock); + if(tqueue->aborted) { + DEBUGASSERT(0); + result = CURLE_RECV_ERROR; + goto out; + } + + e = Curl_llist_head(&tqueue->recvq); + if(e) { + struct thrdq_item *qitem = Curl_node_take_elem(e); + *pitem = qitem->item; + qitem->item = NULL; + thrdq_item_destroy(qitem); + result = CURLE_OK; + } +out: + Curl_mutex_release(&tqueue->lock); + return result; +} + +static void thrdq_llist_clean_matches(struct Curl_llist *llist, + Curl_thrdq_item_match_cb *fn_match, + void *match_data) +{ + struct Curl_llist_node *e, *n; + struct thrdq_item *qitem; + + for(e = Curl_llist_head(llist); e; e = n) { + n = Curl_node_next(e); + qitem = Curl_node_elem(e); + if(fn_match(qitem->item, match_data)) + Curl_node_remove(e); + } +} + +void Curl_thrdq_clear(struct curl_thrdq *tqueue, + Curl_thrdq_item_match_cb *fn_match, + void *match_data) +{ + Curl_mutex_acquire(&tqueue->lock); + if(tqueue->aborted) { + DEBUGASSERT(0); + goto out; + } + thrdq_llist_clean_matches(&tqueue->sendq, fn_match, match_data); + thrdq_llist_clean_matches(&tqueue->recvq, fn_match, match_data); +out: + Curl_mutex_release(&tqueue->lock); +} + +CURLcode Curl_thrdq_await_done(struct curl_thrdq *tqueue, + uint32_t timeout_ms) +{ + return Curl_thrdpool_await_idle(tqueue->tpool, timeout_ms); +} + +#ifdef CURLVERBOSE +void Curl_thrdq_trace(struct curl_thrdq *tqueue, + struct Curl_easy *data, + struct curl_trc_feat *feat) +{ + if(Curl_trc_ft_is_verbose(data, feat)) { + struct Curl_llist_node *e; + struct thrdq_item *qitem; + + Curl_thrdpool_trace(tqueue->tpool, data, feat); + Curl_mutex_acquire(&tqueue->lock); + if(!Curl_llist_count(&tqueue->sendq) && + !Curl_llist_count(&tqueue->recvq)) { + Curl_trc_feat_infof(data, feat, "[%s] [QUEUE] empty", tqueue->name); + } + for(e = Curl_llist_head(&tqueue->sendq); e; e = Curl_node_next(e)) { + qitem = Curl_node_elem(e); + Curl_trc_feat_infof(data, feat, "[%s] [QUEUE] in: %s", + tqueue->name, qitem->description); + } + for(e = Curl_llist_head(&tqueue->recvq); e; e = Curl_node_next(e)) { + qitem = Curl_node_elem(e); + Curl_trc_feat_infof(data, feat, "[%s] [QUEUE] out: %s", + tqueue->name, qitem->description); + } + Curl_mutex_release(&tqueue->lock); + } +} +#endif + +#endif /* USE_THREADS */ diff --git a/lib/thrdqueue.h b/lib/thrdqueue.h new file mode 100644 index 0000000000..516fcc043f --- /dev/null +++ b/lib/thrdqueue.h @@ -0,0 +1,114 @@ +#ifndef HEADER_CURL_THRDQUEUE_H +#define HEADER_CURL_THRDQUEUE_H +/*************************************************************************** + * _ _ ____ _ + * Project ___| | | | _ \| | + * / __| | | | |_) | | + * | (__| |_| | _ <| |___ + * \___|\___/|_| \_\_____| + * + * Copyright (C) Daniel Stenberg, , et al. + * + * This software is licensed as described in the file COPYING, which + * you should have received as part of this distribution. The terms + * are also available at https://curl.se/docs/copyright.html. + * + * You may opt to use, copy, modify, merge, publish, distribute and/or sell + * copies of the Software, and permit persons to whom the Software is + * furnished to do so, under the terms of the COPYING file. + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY + * KIND, either express or implied. + * + * SPDX-License-Identifier: curl + * + ***************************************************************************/ +#include "curl_setup.h" +#include "curlx/timediff.h" + +#ifdef USE_THREADS + +struct Curl_easy; +struct curl_trc_feat; +struct curl_thrdq; + +typedef enum { + CURL_THRDQ_EV_ITEM_DONE /* an item has been processed and is ready */ +} Curl_thrdq_event; + +/* Notification callback when "events" happen in the queue. May be + * call from any thread, queue is not locked. */ +typedef void Curl_thrdq_ev_cb(const struct curl_thrdq *tqueue, + Curl_thrdq_event ev, + void *user_data); + +/* Process a queued item. Maybe call from any thread. Queue is + * not locked. */ +typedef void Curl_thrdq_item_process_cb(void *item); + +/* Free an item. May be called from any thread at any time for an + * item that is in the queue (either before or after processing). */ +typedef void Curl_thrdq_item_free_cb(void *item); + +/* Create a new queue processing "items" by a thread pool. + */ +CURLcode Curl_thrdq_create(struct curl_thrdq **ptqueue, + const char *name, + uint32_t max_len, /* 0 for unlimited */ + uint32_t min_threads, + uint32_t max_threads, + uint32_t idle_time_ms, + Curl_thrdq_item_free_cb *fn_free, + Curl_thrdq_item_process_cb *fn_process, + Curl_thrdq_ev_cb *fn_event, /* optional */ + void *user_data); + +/* Destroy the queue, free all queued items unprocessed and destroy + * the thread pool used. + * @param join TRUE when thread pool shall be joined. FALSE for + * detaching any running threads. + */ +void Curl_thrdq_destroy(struct curl_thrdq *tqueue, bool join); + +/* Send "item" onto the queue. The caller needs to clear any reference + * to "item" on success, e.g. the queue takes ownership. + * `description` is an optional string describing the item for tracing + * purposes. It needs to have the same lifetime as `item`. + * Returns CURLE_AGAIN when the queue has already been full. + * + * With`timeout_ms` != 0, items that get stuck that long in the send + * queue are removed and added to the receive queue right away. + */ +CURLcode Curl_thrdq_send(struct curl_thrdq *tqueue, void *item, + const char *description, timediff_t timeout_ms); + +/* Receive the oldest, processed item from the queue again, if there is one. + * The caller takes ownership of the item received, e.g. the queue + * relinquishes all references to item. + * Returns CURLE_AGAIN when there is no processed item, setting `pitem` + * to NULL. + */ +CURLcode Curl_thrdq_recv(struct curl_thrdq *tqueue, void **pitem); + +/* Return TRUE if the passed "item" matches. */ +typedef bool Curl_thrdq_item_match_cb(void *item, void *match_data); + +/* Clear all scheduled/processed items that match from the queue. This + * will *not* be able to clear items that are being processed. + */ +void Curl_thrdq_clear(struct curl_thrdq *tqueue, + Curl_thrdq_item_match_cb *fn_match, + void *match_data); + +CURLcode Curl_thrdq_await_done(struct curl_thrdq *tqueue, + uint32_t timeout_ms); + +#ifdef CURLVERBOSE +void Curl_thrdq_trace(struct curl_thrdq *tqueue, + struct Curl_easy *data, + struct curl_trc_feat *feat); +#endif + +#endif /* USE_THREADS */ + +#endif /* HEADER_CURL_THRDQUEUE_H */ diff --git a/tests/data/Makefile.am b/tests/data/Makefile.am index 87f5c857ac..3f410fe763 100644 --- a/tests/data/Makefile.am +++ b/tests/data/Makefile.am @@ -286,6 +286,8 @@ test3200 test3201 test3202 test3203 test3204 test3205 test3206 test3207 \ test3208 test3209 test3210 test3211 test3212 test3213 test3214 test3215 \ test3216 test3217 test3218 test3219 \ \ +test3300 test3301 \ +\ test4000 test4001 EXTRA_DIST = $(TESTCASES) DISABLED data-xml1 data320.html \ diff --git a/tests/data/test3300 b/tests/data/test3300 new file mode 100644 index 0000000000..bc5b41b5e0 --- /dev/null +++ b/tests/data/test3300 @@ -0,0 +1,19 @@ + + + + +unittest +threads + + + +# Client-side + + +unittest + + +thrdpool unit tests + + + diff --git a/tests/data/test3301 b/tests/data/test3301 new file mode 100644 index 0000000000..3c0e6a5c29 --- /dev/null +++ b/tests/data/test3301 @@ -0,0 +1,19 @@ + + + + +unittest +threads + + + +# Client-side + + +unittest + + +thrdqueue unit tests + + + diff --git a/tests/unit/Makefile.inc b/tests/unit/Makefile.inc index 01934d5fc7..459aee5f84 100644 --- a/tests/unit/Makefile.inc +++ b/tests/unit/Makefile.inc @@ -45,4 +45,5 @@ TESTS_C = \ unit1979.c unit1980.c \ unit2600.c unit2601.c unit2602.c unit2603.c unit2604.c unit2605.c \ unit3200.c unit3205.c \ - unit3211.c unit3212.c unit3213.c unit3214.c unit3216.c unit3219.c + unit3211.c unit3212.c unit3213.c unit3214.c unit3216.c unit3219.c \ + unit3300.c unit3301.c diff --git a/tests/unit/unit3300.c b/tests/unit/unit3300.c new file mode 100644 index 0000000000..ebef6d7228 --- /dev/null +++ b/tests/unit/unit3300.c @@ -0,0 +1,164 @@ +/*************************************************************************** + * _ _ ____ _ + * Project ___| | | | _ \| | + * / __| | | | |_) | | + * | (__| |_| | _ <| |___ + * \___|\___/|_| \_\_____| + * + * Copyright (C) Daniel Stenberg, , et al. + * + * This software is licensed as described in the file COPYING, which + * you should have received as part of this distribution. The terms + * are also available at https://curl.se/docs/copyright.html. + * + * You may opt to use, copy, modify, merge, publish, distribute and/or sell + * copies of the Software, and permit persons to whom the Software is + * furnished to do so, under the terms of the COPYING file. + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY + * KIND, either express or implied. + * + * SPDX-License-Identifier: curl + * + ***************************************************************************/ +#include "unitcheck.h" + +#include "curlx/wait.h" +#include "thrdpool.h" + +#ifdef USE_THREADS + +struct unit3300_ctx { + uint32_t total; + uint32_t taken; + uint32_t returned; +}; + +static uint32_t unit3300_item = 23; +static uint32_t unit3300_delay_ms = 0; + +static void unit3300_ctx_init(struct unit3300_ctx *ctx, + uint32_t total, + uint32_t delay_ms) +{ + memset(ctx, 0, sizeof(*ctx)); + ctx->total = total; + unit3300_delay_ms = delay_ms; +} + +static void *unit3300_take(void *user_data, const char **pdescription, + timediff_t *ptimeout_ms) +{ + struct unit3300_ctx *ctx = user_data; + *pdescription = NULL; + *ptimeout_ms = 0; + if(ctx->taken < ctx->total) { + ctx->taken++; + return &unit3300_item; + } + return NULL; +} + +static void unit3300_process(void *item) +{ + fail_unless(item == &unit3300_item, "process unexpected item"); + if(unit3300_delay_ms) { + curlx_wait_ms(unit3300_delay_ms); + } +} + +static void unit3300_return(void *item, void *user_data) +{ + struct unit3300_ctx *ctx = user_data; + (void)item; + if(ctx) { + ctx->returned++; + fail_unless(ctx->returned <= ctx->total, "returned too many"); + } +} + +static CURLcode test_unit3300(const char *arg) +{ + UNITTEST_BEGIN_SIMPLE + struct curl_thrdpool *tpool; + struct unit3300_ctx ctx; + CURLcode r; + + /* pool without minimum, will not start anything */ + unit3300_ctx_init(&ctx, 10, 0); + r = Curl_thrdpool_create(&tpool, "unit3300a", 0, 2, 0, + unit3300_take, unit3300_process, unit3300_return, + &ctx); + fail_unless(!r, "pool-a create"); + Curl_thrdpool_destroy(tpool, TRUE); + fail_unless(!ctx.returned, "pool-a unexpected items returned"); + fail_unless(!ctx.taken, "pool-a unexpected items taken"); + + /* pool without minimum, signal start, consumes everything */ + unit3300_ctx_init(&ctx, 10, 0); + r = Curl_thrdpool_create(&tpool, "unit3300b", 0, 2, 0, + unit3300_take, unit3300_process, unit3300_return, + &ctx); + fail_unless(!r, "pool-b create"); + r = Curl_thrdpool_signal(tpool, 2); + fail_unless(!r, "pool-b signal"); + Curl_thrdpool_await_idle(tpool, 0); + Curl_thrdpool_destroy(tpool, TRUE); + fail_unless(ctx.returned == ctx.total, "pool-b items returned missing"); + fail_unless(ctx.taken == ctx.total, "pool-b items taken missing"); + + /* pool with minimum, consumes everything without signal */ + unit3300_ctx_init(&ctx, 10, 0); + r = Curl_thrdpool_create(&tpool, "unit3300c", 1, 2, 0, + unit3300_take, unit3300_process, unit3300_return, + &ctx); + fail_unless(!r, "pool-c create"); + Curl_thrdpool_await_idle(tpool, 0); + Curl_thrdpool_destroy(tpool, TRUE); + fail_unless(ctx.returned == ctx.total, "pool-c items returned missing"); + fail_unless(ctx.taken == ctx.total, "pool-c items taken missing"); + + /* pool with many max, signal abundance, consumes everything */ + unit3300_ctx_init(&ctx, 100, 0); + r = Curl_thrdpool_create(&tpool, "unit3300d", 0, 50, 0, + unit3300_take, unit3300_process, unit3300_return, + &ctx); + fail_unless(!r, "pool-d create"); + r = Curl_thrdpool_signal(tpool, 100); + fail_unless(!r, "pool-d signal"); + Curl_thrdpool_await_idle(tpool, 0); + Curl_thrdpool_destroy(tpool, TRUE); + fail_unless(ctx.returned == ctx.total, "pool-d items returned missing"); + fail_unless(ctx.taken == ctx.total, "pool-d items taken missing"); + + /* pool with 1 max, many to take, no await, destroy without join */ + unit3300_ctx_init(&ctx, 10000000, 1); + r = Curl_thrdpool_create(&tpool, "unit3300e", 0, 1, 0, + unit3300_take, unit3300_process, unit3300_return, + &ctx); + fail_unless(!r, "pool-e create"); + r = Curl_thrdpool_signal(tpool, 100); + fail_unless(!r, "pool-e signal"); + Curl_thrdpool_destroy(tpool, FALSE); + fail_unless(ctx.returned < ctx.total, "pool-e returned all"); + fail_unless(ctx.taken < ctx.total, "pool-e took all"); +#ifdef DEBUGBUILD + /* pool thread will notice destruction and should immediately abort. + * No memory leak should be reported. if the wait is too short on + * a slow system, thread sanitizer will freak out as memdebug will + * be called by threads after main thread shut down. */ + curlx_wait_ms(1000); +#endif + + UNITTEST_END_SIMPLE +} + +#else + +static CURLcode test_unit3300(const char *arg) +{ + UNITTEST_BEGIN_SIMPLE + (void)arg; + UNITTEST_END_SIMPLE +} +#endif /* USE_THREADS */ diff --git a/tests/unit/unit3301.c b/tests/unit/unit3301.c new file mode 100644 index 0000000000..9ded7bd93f --- /dev/null +++ b/tests/unit/unit3301.c @@ -0,0 +1,144 @@ +/*************************************************************************** + * _ _ ____ _ + * Project ___| | | | _ \| | + * / __| | | | |_) | | + * | (__| |_| | _ <| |___ + * \___|\___/|_| \_\_____| + * + * Copyright (C) Daniel Stenberg, , et al. + * + * This software is licensed as described in the file COPYING, which + * you should have received as part of this distribution. The terms + * are also available at https://curl.se/docs/copyright.html. + * + * You may opt to use, copy, modify, merge, publish, distribute and/or sell + * copies of the Software, and permit persons to whom the Software is + * furnished to do so, under the terms of the COPYING file. + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY + * KIND, either express or implied. + * + * SPDX-License-Identifier: curl + * + ***************************************************************************/ +#include "unitcheck.h" + +#include "curlx/wait.h" +#include "thrdqueue.h" +#include "curl_threads.h" + +#ifdef USE_THREADS + +struct unit3301_item { + int id; + BIT(processed); +}; + +struct unit3301_ctx { + volatile int event; +}; + +static struct unit3301_item *unit3301_item_create(int id) +{ + struct unit3301_item *uitem; + uitem = curlx_calloc(1, sizeof(*uitem)); + if(uitem) { + uitem->id = id; + curl_mfprintf(stderr, "created item %d\n", uitem->id); + } + return uitem; +} + +static void unit3301_item_free(void *item) +{ + struct unit3301_item *uitem = item; + curl_mfprintf(stderr, "free item %d\n", uitem->id); + curlx_free(uitem); +} + +static void unit3301_event(const struct curl_thrdq *tqueue, + Curl_thrdq_event ev, + void *user_data) +{ + struct unit3301_ctx *ctx = user_data; + (void)tqueue; + switch(ev) { + case CURL_THRDQ_EV_ITEM_DONE: + ctx->event = 1; + break; + default: + break; + } +} + +static void unit3301_process(void *item) +{ + struct unit3301_item *uitem = item; + curlx_wait_ms(1); + uitem->processed = TRUE; +} + +static CURLcode test_unit3301(const char *arg) +{ + UNITTEST_BEGIN_SIMPLE + struct curl_thrdq *tqueue; + struct unit3301_ctx ctx; + int i, count, nrecvd; + CURLcode r; + + /* create and teardown queue */ + memset(&ctx, 0, sizeof(ctx)); + r = Curl_thrdq_create(&tqueue, "unit3301-a", 0, 0, 2, 1, + unit3301_item_free, unit3301_process, unit3301_event, + &ctx); + fail_unless(!r, "queue-a create"); + Curl_thrdq_destroy(tqueue, TRUE); + tqueue = NULL; + fail_unless(!ctx.event, "queue-a unexpected done count"); + + /* create queue, have it process `count` items */ + count = 10; + memset(&ctx, 0, sizeof(ctx)); + r = Curl_thrdq_create(&tqueue, "unit3301-b", 0, 0, 2, 1, + unit3301_item_free, unit3301_process, unit3301_event, + &ctx); + fail_unless(!r, "queue-b create"); + for(i = 0; i < count; ++i) { + struct unit3301_item *uitem = unit3301_item_create(i); + fail_unless(uitem, "queue-b item create"); + r = Curl_thrdq_send(tqueue, uitem, NULL, 0); + fail_unless(!r, "queue-b send"); + } + + r = Curl_thrdq_await_done(tqueue, 0); + fail_unless(!r, "queue-b await done"); + + nrecvd = 0; + for(i = 0; i < count; ++i) { + void *item; + r = Curl_thrdq_recv(tqueue, &item); + fail_unless(!r, "queue-b recv"); + if(item) { + struct unit3301_item *uitem = item; + curl_mfprintf(stderr, "received item %d\n", uitem->id); + ++nrecvd; + fail_unless(uitem->processed, "queue-b recv unprocessed item"); + unit3301_item_free(item); + } + } + Curl_thrdq_destroy(tqueue, TRUE); + tqueue = NULL; + fail_unless(nrecvd == count, "queue-b unexpected done count"); + + UNITTEST_END_SIMPLE +} + +#else + +static CURLcode test_unit3301(const char *arg) +{ + UNITTEST_BEGIN_SIMPLE + (void)arg; + UNITTEST_END_SIMPLE +} +#endif /* USE_THREADS */