From 015b017973d47f3047f8f4d7349c937fefd30f99 Mon Sep 17 00:00:00 2001 From: Slobodan Predolac Date: Fri, 28 Mar 2025 07:35:53 -0700 Subject: [PATCH] [thread_event] Add support for user events in thread events when stats are enabled --- include/jemalloc/internal/tcache_externs.h | 8 - include/jemalloc/internal/thread_event.h | 6 +- .../jemalloc/internal/thread_event_registry.h | 96 ++++++-- include/jemalloc/internal/witness.h | 1 + src/ctl.c | 19 ++ src/jemalloc.c | 1 + src/peak_event.c | 4 +- src/prof.c | 10 +- src/prof_threshold.c | 6 +- src/stats.c | 5 +- src/tcache.c | 4 +- src/thread_event.c | 98 ++++++-- src/thread_event_registry.c | 233 +++++++++++++++++- test/unit/mallctl.c | 40 ++- test/unit/thread_event.c | 20 ++ 15 files changed, 470 insertions(+), 81 deletions(-) diff --git a/include/jemalloc/internal/tcache_externs.h b/include/jemalloc/internal/tcache_externs.h index 024314fe..76d601c3 100644 --- a/include/jemalloc/internal/tcache_externs.h +++ b/include/jemalloc/internal/tcache_externs.h @@ -82,14 +82,6 @@ void tcache_enabled_set(tsd_t *tsd, bool enabled); void tcache_assert_initialized(tcache_t *tcache); -/* Only accessed by thread event. 
*/ -uint64_t tcache_gc_new_event_wait(tsd_t *tsd); -uint64_t tcache_gc_postponed_event_wait(tsd_t *tsd); -void tcache_gc_event_handler(tsd_t *tsd, uint64_t elapsed); -uint64_t tcache_gc_dalloc_new_event_wait(tsd_t *tsd); -uint64_t tcache_gc_dalloc_postponed_event_wait(tsd_t *tsd); -void tcache_gc_dalloc_event_handler(tsd_t *tsd, uint64_t elapsed); - extern te_base_cb_t tcache_gc_te_handler; #endif /* JEMALLOC_INTERNAL_TCACHE_EXTERNS_H */ diff --git a/include/jemalloc/internal/thread_event.h b/include/jemalloc/internal/thread_event.h index e9631cbd..bf9ca3cc 100644 --- a/include/jemalloc/internal/thread_event.h +++ b/include/jemalloc/internal/thread_event.h @@ -48,10 +48,12 @@ void te_assert_invariants_debug(tsd_t *tsd); void te_event_trigger(tsd_t *tsd, te_ctx_t *ctx); void te_recompute_fast_threshold(tsd_t *tsd); void tsd_te_init(tsd_t *tsd); +void te_adjust_thresholds_helper(tsd_t *tsd, te_ctx_t *ctx, + uint64_t wait); /* List of all thread event counters. */ -#define ITERATE_OVER_ALL_COUNTERS \ - C(thread_allocated) \ +#define ITERATE_OVER_ALL_COUNTERS \ + C(thread_allocated) \ C(thread_allocated_last_event) \ C(prof_sample_last_event) \ C(stats_interval_last_event) diff --git a/include/jemalloc/internal/thread_event_registry.h b/include/jemalloc/internal/thread_event_registry.h index aee7a4f2..1957e727 100644 --- a/include/jemalloc/internal/thread_event_registry.h +++ b/include/jemalloc/internal/thread_event_registry.h @@ -2,37 +2,41 @@ #define JEMALLOC_INTERNAL_THREAD_EVENT_REGISTRY_H #include "jemalloc/internal/jemalloc_preamble.h" -#include "jemalloc/internal/tsd.h" +#include "jemalloc/internal/tsd_types.h" + +#define TE_MAX_USER_EVENTS 4 /* "te" is short for "thread_event" */ enum te_alloc_e { #ifdef JEMALLOC_PROF - te_alloc_prof_sample, + te_alloc_prof_sample, #endif - te_alloc_stats_interval, + te_alloc_stats_interval, + te_alloc_tcache_gc, #ifdef JEMALLOC_STATS - te_alloc_prof_threshold, + te_alloc_prof_threshold, + te_alloc_peak, #endif - 
te_alloc_tcache_gc, -#ifdef JEMALLOC_STATS - te_alloc_peak, - te_alloc_last = te_alloc_peak, -#else - te_alloc_last = te_alloc_tcache_gc, -#endif - te_alloc_count = te_alloc_last + 1 + te_alloc_user0, + te_alloc_user1, + te_alloc_user2, + te_alloc_user3, + te_alloc_last = te_alloc_user3, + te_alloc_count = te_alloc_last + 1 }; typedef enum te_alloc_e te_alloc_t; enum te_dalloc_e { - te_dalloc_tcache_gc, + te_dalloc_tcache_gc, #ifdef JEMALLOC_STATS - te_dalloc_peak, - te_dalloc_last = te_dalloc_peak, -#else - te_dalloc_last = te_dalloc_tcache_gc, + te_dalloc_peak, #endif - te_dalloc_count = te_dalloc_last + 1 + te_dalloc_user0, + te_dalloc_user1, + te_dalloc_user2, + te_dalloc_user3, + te_dalloc_last = te_dalloc_user3, + te_dalloc_count = te_dalloc_last + 1 }; typedef enum te_dalloc_e te_dalloc_t; @@ -42,17 +46,63 @@ struct te_data_s { uint64_t alloc_wait[te_alloc_count]; uint64_t dalloc_wait[te_dalloc_count]; }; -#define TE_DATA_INITIALIZER { {0}, {0} } +#define TE_DATA_INITIALIZER \ + { \ + {0}, { \ + 0 \ + } \ + } + +/* + * Check if user event is installed, installed and enabled, or not + * installed. 
+ * + */ +enum te_enabled_e { te_enabled_not_installed, te_enabled_yes, te_enabled_no }; +typedef enum te_enabled_e te_enabled_t; typedef struct te_base_cb_s te_base_cb_t; struct te_base_cb_s { - bool (*enabled)(void); - uint64_t (*new_event_wait)(tsd_t *tsd); - uint64_t (*postponed_event_wait)(tsd_t *tsd); - void (*event_handler)(tsd_t *tsd); + te_enabled_t (*enabled)(void); + uint64_t (*new_event_wait)(tsd_t *tsd); + uint64_t (*postponed_event_wait)(tsd_t *tsd); + void (*event_handler)(tsd_t *tsd); }; extern te_base_cb_t *te_alloc_handlers[te_alloc_count]; extern te_base_cb_t *te_dalloc_handlers[te_dalloc_count]; +bool experimental_thread_events_boot(void); + +/* + * User callback for thread events + * + * is_alloc - true if event is allocation, false if event is free + * tallocated - number of bytes allocated on current thread so far + * tdallocated - number of bytes deallocated on current thread so far + */ +typedef void (*user_event_cb_t)( + bool is_alloc, uint64_t tallocated, uint64_t tdallocated); + +typedef struct user_hook_object_s user_hook_object_t; +struct user_hook_object_s { + user_event_cb_t callback; + uint64_t interval; + bool is_alloc_only; +}; + +/* + * register user callback + * + * return zero if event was registered + * + * if interval is zero or callback is NULL, or + * no more slots are available, the event will not be registered + * and a non-zero value will be returned + * + */ +int te_register_user_handler(tsdn_t *tsdn, user_hook_object_t *te_uobj); + +te_enabled_t te_user_event_enabled(size_t ue_idx, bool is_alloc); + #endif /* JEMALLOC_INTERNAL_THREAD_EVENT_REGISTRY_H */ diff --git a/include/jemalloc/internal/witness.h b/include/jemalloc/internal/witness.h index afee1246..acf7860d 100644 --- a/include/jemalloc/internal/witness.h +++ b/include/jemalloc/internal/witness.h @@ -78,6 +78,7 @@ enum witness_rank_e { WITNESS_RANK_PROF_RECENT_ALLOC = WITNESS_RANK_LEAF, WITNESS_RANK_PROF_STATS = WITNESS_RANK_LEAF, WITNESS_RANK_PROF_THREAD_ACTIVE_INIT =
WITNESS_RANK_LEAF, + WITNESS_RANK_THREAD_EVENTS_USER = WITNESS_RANK_LEAF, }; typedef enum witness_rank_e witness_rank_t; diff --git a/src/ctl.c b/src/ctl.c index a30adc52..4f06363a 100644 --- a/src/ctl.c +++ b/src/ctl.c @@ -362,6 +362,7 @@ CTL_PROTO(experimental_hooks_prof_dump) CTL_PROTO(experimental_hooks_prof_sample) CTL_PROTO(experimental_hooks_prof_sample_free) CTL_PROTO(experimental_hooks_prof_threshold) +CTL_PROTO(experimental_hooks_thread_event) CTL_PROTO(experimental_hooks_safety_check_abort) CTL_PROTO(experimental_thread_activity_callback) CTL_PROTO(experimental_utilization_query) @@ -976,6 +977,7 @@ static const ctl_named_node_t experimental_hooks_node[] = { {NAME("prof_sample_free"), CTL(experimental_hooks_prof_sample_free)}, {NAME("prof_threshold"), CTL(experimental_hooks_prof_threshold)}, {NAME("safety_check_abort"), CTL(experimental_hooks_safety_check_abort)}, + {NAME("thread_event"), CTL(experimental_hooks_thread_event)}, }; static const ctl_named_node_t experimental_thread_node[] = { @@ -3818,6 +3820,23 @@ label_return: return ret; } +static int +experimental_hooks_thread_event_ctl(tsd_t *tsd, const size_t *mib, + size_t miblen, void *oldp, size_t *oldlenp, void *newp, size_t newlen) { + int ret; + + if (newp == NULL) { + ret = EINVAL; + goto label_return; + } + + user_hook_object_t t_new = {NULL, 0, false}; + WRITE(t_new, user_hook_object_t); + ret = te_register_user_handler(tsd_tsdn(tsd), &t_new); + +label_return: + return ret; +} /* For integration test purpose only. No plan to move out of experimental. */ static int diff --git a/src/jemalloc.c b/src/jemalloc.c index d958c8ca..a4509e68 100644 --- a/src/jemalloc.c +++ b/src/jemalloc.c @@ -1965,6 +1965,7 @@ malloc_init_hard_a0_locked(void) { return true; } hook_boot(); + experimental_thread_events_boot(); /* * Create enough scaffolding to allow recursive allocation in * malloc_ncpus(). 
diff --git a/src/peak_event.c b/src/peak_event.c index 430bfdea..e7f3ced6 100644 --- a/src/peak_event.c +++ b/src/peak_event.c @@ -58,9 +58,9 @@ peak_event_handler(tsd_t *tsd) { peak_event_activity_callback(tsd); } -static bool +static te_enabled_t peak_event_enabled(void) { - return config_stats; + return config_stats ? te_enabled_yes : te_enabled_no; } /* Handles alloc and dalloc */ diff --git a/src/prof.c b/src/prof.c index 94eddb6d..ec13afbd 100644 --- a/src/prof.c +++ b/src/prof.c @@ -306,11 +306,6 @@ prof_sample_event_handler(tsd_t *tsd) { } } -static bool -prof_sample_enabled(void) { - return config_prof && opt_prof; -} - uint64_t tsd_prof_sample_event_wait_get(tsd_t *tsd) { #ifdef JEMALLOC_PROF @@ -321,6 +316,11 @@ tsd_prof_sample_event_wait_get(tsd_t *tsd) { #endif } +static te_enabled_t +prof_sample_enabled(void) { + return config_prof && opt_prof ? te_enabled_yes : te_enabled_no; +} + te_base_cb_t prof_sample_te_handler = { .enabled = &prof_sample_enabled, .new_event_wait = &prof_sample_new_event_wait, diff --git a/src/prof_threshold.c b/src/prof_threshold.c index 516b0bf6..0b5cb53c 100644 --- a/src/prof_threshold.c +++ b/src/prof_threshold.c @@ -27,7 +27,7 @@ prof_threshold_hook_get(void) { } /* Invoke callback for threshold reached */ -static void +static inline void prof_threshold_update(tsd_t *tsd) { prof_threshold_hook_t prof_threshold_hook = prof_threshold_hook_get(); if (prof_threshold_hook == NULL) { @@ -56,9 +56,9 @@ prof_threshold_event_handler(tsd_t *tsd) { prof_threshold_update(tsd); } -static bool +static te_enabled_t prof_threshold_enabled(void) { - return config_stats; + return config_stats ? 
te_enabled_yes : te_enabled_no; } te_base_cb_t prof_threshold_te_handler = { diff --git a/src/stats.c b/src/stats.c index efc73223..b2a00319 100644 --- a/src/stats.c +++ b/src/stats.c @@ -2141,9 +2141,9 @@ stats_interval_event_handler(tsd_t *tsd) { } } -static bool +static te_enabled_t stats_interval_enabled(void) { - return opt_stats_interval >= 0; + return opt_stats_interval >= 0 ? te_enabled_yes : te_enabled_no; } te_base_cb_t stats_interval_te_handler = { @@ -2153,7 +2153,6 @@ te_base_cb_t stats_interval_te_handler = { .event_handler = &stats_interval_event_handler, }; - bool stats_boot(void) { uint64_t stats_interval; diff --git a/src/tcache.c b/src/tcache.c index 36af7d97..0154403d 100644 --- a/src/tcache.c +++ b/src/tcache.c @@ -1901,9 +1901,9 @@ void tcache_assert_initialized(tcache_t *tcache) { assert(!cache_bin_still_zero_initialized(&tcache->bins[0])); } -static bool +static te_enabled_t tcache_gc_enabled(void) { - return (opt_tcache_gc_incr_bytes > 0); + return (opt_tcache_gc_incr_bytes > 0) ? te_enabled_yes : te_enabled_no; } /* Handles alloc and dalloc the same way */ diff --git a/src/thread_event.c b/src/thread_event.c index 0b1adcc1..496c16be 100644 --- a/src/thread_event.c +++ b/src/thread_event.c @@ -10,13 +10,13 @@ te_ctx_has_active_events(te_ctx_t *ctx) { assert(config_debug); if (ctx->is_alloc) { for (int i = 0; i < te_alloc_count; ++i) { - if (te_alloc_handlers[i]->enabled()) { + if (te_enabled_yes == te_alloc_handlers[i]->enabled()) { return true; } } } else { for (int i = 0; i < te_dalloc_count; ++i) { - if (te_dalloc_handlers[i]->enabled()) { + if (te_enabled_yes == te_dalloc_handlers[i]->enabled()) { return true; } } @@ -26,14 +26,17 @@ te_ctx_has_active_events(te_ctx_t *ctx) { static uint64_t te_next_event_compute(tsd_t *tsd, bool is_alloc) { - te_base_cb_t **handlers = is_alloc ? te_alloc_handlers : te_dalloc_handlers; - uint64_t *waits = is_alloc ? 
tsd_te_datap_get_unsafe(tsd)->alloc_wait : tsd_te_datap_get_unsafe(tsd)->dalloc_wait; + te_base_cb_t **handlers = is_alloc ? + te_alloc_handlers : te_dalloc_handlers; + uint64_t *waits = is_alloc ? + tsd_te_datap_get_unsafe(tsd)->alloc_wait : + tsd_te_datap_get_unsafe(tsd)->dalloc_wait; int count = is_alloc ? te_alloc_count : te_dalloc_count; - + uint64_t wait = TE_MAX_START_WAIT; for (int i = 0; i < count; i++) { - if (handlers[i]->enabled()) { + if (te_enabled_yes == handlers[i]->enabled()) { uint64_t ev_wait = waits[i]; assert(ev_wait <= TE_MAX_START_WAIT); if (ev_wait > 0U && ev_wait < wait) { @@ -41,7 +44,6 @@ te_next_event_compute(tsd_t *tsd, bool is_alloc) { } } } - return wait; } @@ -64,6 +66,19 @@ te_assert_invariants_impl(tsd_t *tsd, te_ctx_t *ctx) { /* The subtraction is intentionally susceptible to underflow. */ assert(current_bytes - last_event < interval); + + /* This computation assumes that event did not become active in the + * time since the last trigger. This works fine if waits for inactive + * events are initialized with 0 as those are ignored + * If we wanted to initialize user events to anything other than + * zero, computation would take it into account and min_wait could + * be smaller than interval (as it was not part of the calc setting + * next_event). 
+ * + * If we ever wanted to unregister the events assert would also + * need to account for the possibility that next_event was set, by + * event that is now gone + */ uint64_t min_wait = te_next_event_compute(tsd, te_ctx_is_alloc(ctx)); /* * next_event should have been pushed up only except when no event is @@ -161,8 +176,8 @@ te_recompute_fast_threshold(tsd_t *tsd) { } } -static void -te_adjust_thresholds_helper(tsd_t *tsd, te_ctx_t *ctx, +static inline void +te_adjust_thresholds_impl(tsd_t *tsd, te_ctx_t *ctx, uint64_t wait) { /* * The next threshold based on future events can only be adjusted after @@ -175,14 +190,21 @@ te_adjust_thresholds_helper(tsd_t *tsd, te_ctx_t *ctx, TE_MAX_INTERVAL ? wait : TE_MAX_INTERVAL); te_ctx_next_event_set(tsd, ctx, next_event); } +void +te_adjust_thresholds_helper(tsd_t *tsd, te_ctx_t *ctx, + uint64_t wait) { + te_adjust_thresholds_impl(tsd, ctx, wait); +} static void te_init_waits(tsd_t *tsd, uint64_t *wait, bool is_alloc) { te_base_cb_t **handlers = is_alloc ? te_alloc_handlers : te_dalloc_handlers; - uint64_t *waits = is_alloc ? tsd_te_datap_get_unsafe(tsd)->alloc_wait : tsd_te_datap_get_unsafe(tsd)->dalloc_wait; + uint64_t *waits = is_alloc ? + tsd_te_datap_get_unsafe(tsd)->alloc_wait : + tsd_te_datap_get_unsafe(tsd)->dalloc_wait; int count = is_alloc ? 
te_alloc_count : te_dalloc_count; for (int i = 0; i < count; i++) { - if (handlers[i]->enabled()) { + if (te_enabled_yes == handlers[i]->enabled()) { uint64_t ev_wait = handlers[i]->new_event_wait(tsd); assert(ev_wait > 0); waits[i] = ev_wait; @@ -229,7 +251,8 @@ te_update_alloc_events(tsd_t *tsd, te_base_cb_t **to_trigger, size_t nto_trigger = 0; uint64_t *waits = tsd_te_datap_get_unsafe(tsd)->alloc_wait; if (opt_tcache_gc_incr_bytes > 0) { - assert(te_alloc_handlers[te_alloc_tcache_gc]->enabled()); + assert(te_enabled_yes == + te_alloc_handlers[te_alloc_tcache_gc]->enabled()); if (te_update_wait(tsd, accumbytes, allow, &waits[te_alloc_tcache_gc], wait, te_alloc_handlers[te_alloc_tcache_gc], @@ -240,7 +263,8 @@ te_update_alloc_events(tsd_t *tsd, te_base_cb_t **to_trigger, } #ifdef JEMALLOC_PROF if (opt_prof) { - assert(te_alloc_handlers[te_alloc_prof_sample]->enabled()); + assert(te_enabled_yes == + te_alloc_handlers[te_alloc_prof_sample]->enabled()); if(te_update_wait(tsd, accumbytes, allow, &waits[te_alloc_prof_sample], wait, te_alloc_handlers[te_alloc_prof_sample], 0)) { @@ -255,27 +279,44 @@ te_update_alloc_events(tsd_t *tsd, te_base_cb_t **to_trigger, wait, te_alloc_handlers[te_alloc_stats_interval], stats_interval_accum_batch)) { - assert(te_alloc_handlers[te_alloc_stats_interval]->enabled()); + assert(te_enabled_yes == + te_alloc_handlers[te_alloc_stats_interval]->enabled()); to_trigger[nto_trigger++] = te_alloc_handlers[te_alloc_stats_interval]; } } #ifdef JEMALLOC_STATS - assert(te_alloc_handlers[te_alloc_peak]->enabled()); + assert(te_enabled_yes == te_alloc_handlers[te_alloc_peak]->enabled()); if(te_update_wait(tsd, accumbytes, allow, &waits[te_alloc_peak], wait, te_alloc_handlers[te_alloc_peak], PEAK_EVENT_WAIT)) { to_trigger[nto_trigger++] = te_alloc_handlers[te_alloc_peak]; } - assert(te_alloc_handlers[te_alloc_prof_threshold]->enabled()); + assert(te_enabled_yes == + te_alloc_handlers[te_alloc_prof_threshold]->enabled()); if(te_update_wait(tsd, 
accumbytes, allow, &waits[te_alloc_prof_threshold], wait, te_alloc_handlers[te_alloc_prof_threshold], 1 << opt_experimental_lg_prof_threshold)) { - to_trigger[nto_trigger++] = te_alloc_handlers[te_alloc_prof_threshold]; + to_trigger[nto_trigger++] = + te_alloc_handlers[te_alloc_prof_threshold]; } #endif + + for (te_alloc_t ue = te_alloc_user0; ue <= te_alloc_user3; ue++) { + te_enabled_t status = + te_user_event_enabled(ue - te_alloc_user0, true); + if (status == te_enabled_not_installed) { + break; + } else if (status == te_enabled_yes) { + if (te_update_wait(tsd, accumbytes, allow, &waits[ue], + wait, te_alloc_handlers[ue], 0)) { + to_trigger[nto_trigger++] = + te_alloc_handlers[ue]; + } + } + } return nto_trigger; } @@ -285,7 +326,8 @@ te_update_dalloc_events(tsd_t *tsd, te_base_cb_t **to_trigger, uint64_t accumbyt size_t nto_trigger = 0; uint64_t *waits = tsd_te_datap_get_unsafe(tsd)->dalloc_wait; if (opt_tcache_gc_incr_bytes > 0) { - assert(te_dalloc_handlers[te_dalloc_tcache_gc]->enabled()); + assert(te_enabled_yes == + te_dalloc_handlers[te_dalloc_tcache_gc]->enabled()); if (te_update_wait(tsd, accumbytes, allow, &waits[te_dalloc_tcache_gc], wait, te_dalloc_handlers[te_dalloc_tcache_gc], @@ -295,12 +337,26 @@ te_update_dalloc_events(tsd_t *tsd, te_base_cb_t **to_trigger, uint64_t accumbyt } } #ifdef JEMALLOC_STATS - assert(te_dalloc_handlers[te_dalloc_peak]->enabled()); + assert(te_enabled_yes == te_dalloc_handlers[te_dalloc_peak]->enabled()); if(te_update_wait(tsd, accumbytes, allow, &waits[te_dalloc_peak], wait, - te_dalloc_handlers[te_dalloc_peak], PEAK_EVENT_WAIT)) { + te_dalloc_handlers[te_dalloc_peak], + PEAK_EVENT_WAIT)) { to_trigger[nto_trigger++] = te_dalloc_handlers[te_dalloc_peak]; } #endif + for (te_dalloc_t ue = te_dalloc_user0; ue <= te_dalloc_user3; ue++) { + te_enabled_t status = + te_user_event_enabled(ue - te_dalloc_user0, false); + if (status == te_enabled_not_installed) { + break; + } else if (status == te_enabled_yes) { + if 
(te_update_wait(tsd, accumbytes, allow, &waits[ue], + wait, te_dalloc_handlers[ue], 0)) { + to_trigger[nto_trigger++] = + te_dalloc_handlers[ue]; + } + } + } return nto_trigger; } @@ -362,7 +418,7 @@ te_init(tsd_t *tsd, bool is_alloc) { uint64_t wait = TE_MAX_START_WAIT; te_init_waits(tsd, &wait, is_alloc); - te_adjust_thresholds_helper(tsd, &ctx, wait); + te_adjust_thresholds_impl(tsd, &ctx, wait); } void diff --git a/src/thread_event_registry.c b/src/thread_event_registry.c index 7543cfda..f5408178 100644 --- a/src/thread_event_registry.c +++ b/src/thread_event_registry.c @@ -3,30 +3,160 @@ #include "jemalloc/internal/thread_event.h" #include "jemalloc/internal/thread_event_registry.h" -#include "jemalloc/internal/thread_event_registry.h" #include "jemalloc/internal/tcache_externs.h" #include "jemalloc/internal/peak_event.h" #include "jemalloc/internal/prof_externs.h" #include "jemalloc/internal/prof_threshold.h" #include "jemalloc/internal/stats.h" +static malloc_mutex_t uevents_mu; -/* Table of all the thread events. - * Events share interface, but internally they will know thier - * data layout in tsd. 
+bool +experimental_thread_events_boot(void) { + return malloc_mutex_init(&uevents_mu, "thread_events", + WITNESS_RANK_THREAD_EVENTS_USER, malloc_mutex_rank_exclusive); +} + +#define TE_REGISTER_ERRCODE_FULL_SLOTS -1 +#define TE_REGISTER_ERRCODE_ALREADY_REGISTERED -2 + +static user_hook_object_t uevents_storage[TE_MAX_USER_EVENTS] = { + {NULL, 0, false}, +}; + +static atomic_p_t uevent_obj_p[TE_MAX_USER_EVENTS] = { + NULL, +}; + +static inline bool +user_object_eq(user_hook_object_t *lhs, user_hook_object_t *rhs) { + assert(lhs != NULL && rhs != NULL); + + return lhs->callback == rhs->callback && lhs->interval == rhs->interval + && lhs->is_alloc_only == rhs->is_alloc_only; +} + +/* + * Return slot number that event is registered at on success + * it will be [0, TE_MAX_USER_EVENTS) + * Return negative value on some error */ +static inline int +te_register_user_handler_locked(user_hook_object_t *new_obj) { + /* Attempt to find the free slot in global register */ + for (int i = 0; i < TE_MAX_USER_EVENTS; ++i) { + user_hook_object_t *p = (user_hook_object_t *)atomic_load_p( + &uevent_obj_p[i], ATOMIC_ACQUIRE); + + if (p && user_object_eq(p, new_obj)) { + /* Same callback and interval are registered - no error. 
*/ + return TE_REGISTER_ERRCODE_ALREADY_REGISTERED; + } else if (p == NULL) { + /* Empty slot */ + uevents_storage[i] = *new_obj; + atomic_fence(ATOMIC_SEQ_CST); + atomic_store_p(&uevent_obj_p[i], &uevents_storage[i], + ATOMIC_RELEASE); + return i; + } + } + + return TE_REGISTER_ERRCODE_FULL_SLOTS; +} + +static inline user_hook_object_t * +uobj_get(size_t cb_idx) { + assert(cb_idx < TE_MAX_USER_EVENTS); + return (user_hook_object_t *)atomic_load_p( + &uevent_obj_p[cb_idx], ATOMIC_ACQUIRE); +} + +te_enabled_t +te_user_event_enabled(size_t ue_idx, bool is_alloc) { + assert(ue_idx < TE_MAX_USER_EVENTS); + user_hook_object_t *obj = uobj_get(ue_idx); + if (!obj) { + return te_enabled_not_installed; + } + if (is_alloc || !obj->is_alloc_only) { + return te_enabled_yes; + } + return te_enabled_no; +} + +static inline uint64_t +new_event_wait(size_t cb_idx) { + user_hook_object_t *obj = uobj_get(cb_idx); + /* Enabled should have guarded it */ + assert(obj); + return obj->interval; +} + +static uint64_t +postponed_event_wait(tsd_t *tsd) { + return TE_MIN_START_WAIT; +} + +static inline void +handler_wrapper(tsd_t *tsd, bool is_alloc, size_t cb_idx) { + user_hook_object_t *obj = uobj_get(cb_idx); + /* Enabled should have guarded it */ + assert(obj); + uint64_t alloc = tsd_thread_allocated_get(tsd); + uint64_t dalloc = tsd_thread_deallocated_get(tsd); + + pre_reentrancy(tsd, NULL); + obj->callback(is_alloc, alloc, dalloc); + post_reentrancy(tsd); +} + +#define TE_USER_HANDLER_BINDING_IDX(i) \ + static te_enabled_t te_user_alloc_enabled##i(void) { \ + return te_user_event_enabled(i, true); \ + } \ + static te_enabled_t te_user_dalloc_enabled##i(void) { \ + return te_user_event_enabled(i, false); \ + } \ + static uint64_t te_user_new_event_wait_##i(tsd_t *tsd) { \ + return new_event_wait(i); \ + } \ + static void te_user_alloc_handler_call##i(tsd_t *tsd) { \ + handler_wrapper(tsd, true, i); \ + } \ + static void te_user_dalloc_handler_call##i(tsd_t *tsd) { \ + 
handler_wrapper(tsd, false, i); \ + } \ + static te_base_cb_t user_alloc_handler##i = { \ + .enabled = &te_user_alloc_enabled##i, \ + .new_event_wait = &te_user_new_event_wait_##i, \ + .postponed_event_wait = &postponed_event_wait, \ + .event_handler = &te_user_alloc_handler_call##i}; \ + static te_base_cb_t user_dalloc_handler##i = { \ + .enabled = &te_user_dalloc_enabled##i, \ + .new_event_wait = &te_user_new_event_wait_##i, \ + .postponed_event_wait = &postponed_event_wait, \ + .event_handler = &te_user_dalloc_handler_call##i} + +TE_USER_HANDLER_BINDING_IDX(0); +TE_USER_HANDLER_BINDING_IDX(1); +TE_USER_HANDLER_BINDING_IDX(2); +TE_USER_HANDLER_BINDING_IDX(3); + +/* Table of all the thread events. */ te_base_cb_t *te_alloc_handlers[te_alloc_count] = { #ifdef JEMALLOC_PROF - &prof_sample_te_handler, + &prof_sample_te_handler, #endif - &stats_interval_te_handler, + &stats_interval_te_handler, + &tcache_gc_te_handler, #ifdef JEMALLOC_STATS - &prof_threshold_te_handler, -#endif - &tcache_gc_te_handler, -#ifdef JEMALLOC_STATS - &peak_te_handler, + &prof_threshold_te_handler, + &peak_te_handler, #endif + &user_alloc_handler0, + &user_alloc_handler1, + &user_alloc_handler2, + &user_alloc_handler3 }; te_base_cb_t *te_dalloc_handlers[te_dalloc_count] = { @@ -34,4 +164,85 @@ te_base_cb_t *te_dalloc_handlers[te_dalloc_count] = { #ifdef JEMALLOC_STATS &peak_te_handler, #endif + &user_dalloc_handler0, + &user_dalloc_handler1, + &user_dalloc_handler2, + &user_dalloc_handler3 }; + +static inline bool +te_update_tsd(tsd_t *tsd, uint64_t new_wait, size_t ue_idx, bool is_alloc) { + bool needs_recompute = false; + te_ctx_t ctx; + uint64_t next, current, cur_wait; + + if (is_alloc) { + tsd_te_datap_get_unsafe(tsd) + ->alloc_wait[te_alloc_user0 + ue_idx] = new_wait; + } else { + tsd_te_datap_get_unsafe(tsd) + ->dalloc_wait[te_dalloc_user0 + ue_idx] = new_wait; + } + te_ctx_get(tsd, &ctx, is_alloc); + + next = te_ctx_next_event_get(&ctx); + current = te_ctx_current_bytes_get(&ctx); + 
cur_wait = next - current; + + if (new_wait < cur_wait) { + /* + * Set last event to current (same as when te inits). This + * will make sure that all the invariants are correct, before + * we adjust next_event and next_event fast. + */ + te_ctx_last_event_set(&ctx, te_ctx_current_bytes_get(&ctx)); + te_adjust_thresholds_helper(tsd, &ctx, new_wait); + needs_recompute = true; + } + return needs_recompute; +} + +static inline void +te_recalculate_current_thread_data(tsdn_t *tsdn, int ue_idx, bool alloc_only) { + bool recompute = false; + /* we do not need lock to recalculate the events on the current thread */ + assert(ue_idx < TE_MAX_USER_EVENTS); + tsd_t *tsd = tsdn_null(tsdn) ? tsd_fetch() : tsdn_tsd(tsdn); + if (tsd) { + uint64_t new_wait = new_event_wait(ue_idx); + recompute = te_update_tsd(tsd, new_wait, ue_idx, true); + if (!alloc_only) { + recompute = te_update_tsd(tsd, new_wait, ue_idx, false) + || recompute; + } + + if (recompute) { + te_recompute_fast_threshold(tsd); + } + } +} + +int +te_register_user_handler(tsdn_t *tsdn, user_hook_object_t *te_uobj) { + int ret; + int reg_retcode; + if (!te_uobj || !te_uobj->callback || te_uobj->interval == 0) { + return EINVAL; + } + + malloc_mutex_lock(tsdn, &uevents_mu); + reg_retcode = te_register_user_handler_locked(te_uobj); + malloc_mutex_unlock(tsdn, &uevents_mu); + + if (reg_retcode >= 0) { + te_recalculate_current_thread_data( + tsdn, reg_retcode, te_uobj->is_alloc_only); + ret = 0; + } else if (reg_retcode == TE_REGISTER_ERRCODE_ALREADY_REGISTERED) { + ret = 0; + } else { + ret = EINVAL; + } + + return ret; +} diff --git a/test/unit/mallctl.c b/test/unit/mallctl.c index 68c3a705..838a4445 100644 --- a/test/unit/mallctl.c +++ b/test/unit/mallctl.c @@ -1347,6 +1347,43 @@ TEST_BEGIN(test_thread_activity_callback) { } TEST_END + + +static unsigned nuser_thread_event_cb_calls; +static void +user_thread_event_cb(bool is_alloc, uint64_t tallocated, uint64_t tdallocated) { + (void)tdallocated; + (void)tallocated; + 
++nuser_thread_event_cb_calls; +} +static user_hook_object_t user_te_obj = { + .callback = user_thread_event_cb, + .interval = 100, + .is_alloc_only = false, +}; + +TEST_BEGIN(test_thread_event_hook) { + const size_t big_size = 10 * 1024 * 1024; + void *ptr; + int err; + + unsigned current_calls = nuser_thread_event_cb_calls; + err = mallctl("experimental.hooks.thread_event", NULL, 0, + &user_te_obj, sizeof(user_te_obj)); + assert_d_eq(0, err, ""); + + err = mallctl("experimental.hooks.thread_event", NULL, 0, + &user_te_obj, sizeof(user_te_obj)); + assert_d_eq(0, err, "Not an error to provide object with same interval and cb"); + + + ptr = mallocx(big_size, 0); + free(ptr); + expect_u64_lt(current_calls, nuser_thread_event_cb_calls, ""); +} +TEST_END + + int main(void) { return test( @@ -1387,5 +1424,6 @@ main(void) { test_hooks_exhaustion, test_thread_idle, test_thread_peak, - test_thread_activity_callback); + test_thread_activity_callback, + test_thread_event_hook); } diff --git a/test/unit/thread_event.c b/test/unit/thread_event.c index 8b4fb1d6..66d61cd2 100644 --- a/test/unit/thread_event.c +++ b/test/unit/thread_event.c @@ -1,5 +1,18 @@ #include "test/jemalloc_test.h" +static uint32_t nuser_hook_calls; +static bool is_registered = false; +static void +test_cb(bool is_alloc, uint64_t tallocated, uint64_t tdallocated) { + ++nuser_hook_calls; +} + +static user_hook_object_t tobj = { + .callback = &test_cb, + .interval = 10, + .is_alloc_only = false +}; + TEST_BEGIN(test_next_event_fast) { tsd_t *tsd = tsd_fetch(); te_ctx_t ctx; @@ -9,6 +22,12 @@ TEST_BEGIN(test_next_event_fast) { te_ctx_current_bytes_set(&ctx, TE_NEXT_EVENT_FAST_MAX - 8U); te_ctx_next_event_set(tsd, &ctx, TE_NEXT_EVENT_FAST_MAX); + if (!is_registered) { + is_registered = 0 == te_register_user_handler(tsd_tsdn(tsd), &tobj); + } + assert_true(is_registered || !config_stats, "Register user handler"); + nuser_hook_calls = 0; + uint64_t *waits = tsd_te_datap_get_unsafe(tsd)->alloc_wait; for (size_t i 
= 0; i < te_alloc_count; i++) { waits[i] = TE_NEXT_EVENT_FAST_MAX; @@ -16,6 +35,7 @@ TEST_BEGIN(test_next_event_fast) { /* Test next_event_fast rolling back to 0. */ void *p = malloc(16U); + assert_true(nuser_hook_calls == 1 || !config_stats, "Expected alloc call"); assert_ptr_not_null(p, "malloc() failed"); free(p);