[thread_event] Add support for user events in thread events when stats are enabled

This commit is contained in:
Slobodan Predolac 2025-03-28 07:35:53 -07:00
parent 387e9f0911
commit d49fc48ef1
15 changed files with 470 additions and 81 deletions

View file

@ -82,14 +82,6 @@ void tcache_enabled_set(tsd_t *tsd, bool enabled);
void tcache_assert_initialized(tcache_t *tcache);
/* Only accessed by thread event. */
uint64_t tcache_gc_new_event_wait(tsd_t *tsd);
uint64_t tcache_gc_postponed_event_wait(tsd_t *tsd);
void tcache_gc_event_handler(tsd_t *tsd, uint64_t elapsed);
uint64_t tcache_gc_dalloc_new_event_wait(tsd_t *tsd);
uint64_t tcache_gc_dalloc_postponed_event_wait(tsd_t *tsd);
void tcache_gc_dalloc_event_handler(tsd_t *tsd, uint64_t elapsed);
extern te_base_cb_t tcache_gc_te_handler;
#endif /* JEMALLOC_INTERNAL_TCACHE_EXTERNS_H */

View file

@ -48,10 +48,12 @@ void te_assert_invariants_debug(tsd_t *tsd);
void te_event_trigger(tsd_t *tsd, te_ctx_t *ctx);
void te_recompute_fast_threshold(tsd_t *tsd);
void tsd_te_init(tsd_t *tsd);
void te_adjust_thresholds_helper(tsd_t *tsd, te_ctx_t *ctx,
uint64_t wait);
/* List of all thread event counters. */
#define ITERATE_OVER_ALL_COUNTERS \
C(thread_allocated) \
#define ITERATE_OVER_ALL_COUNTERS \
C(thread_allocated) \
C(thread_allocated_last_event) \
C(prof_sample_last_event) \
C(stats_interval_last_event)

View file

@ -2,37 +2,41 @@
#define JEMALLOC_INTERNAL_THREAD_EVENT_REGISTRY_H
#include "jemalloc/internal/jemalloc_preamble.h"
#include "jemalloc/internal/tsd.h"
#include "jemalloc/internal/tsd_types.h"
#define TE_MAX_USER_EVENTS 4
/* "te" is short for "thread_event" */
enum te_alloc_e {
#ifdef JEMALLOC_PROF
te_alloc_prof_sample,
te_alloc_prof_sample,
#endif
te_alloc_stats_interval,
te_alloc_stats_interval,
te_alloc_tcache_gc,
#ifdef JEMALLOC_STATS
te_alloc_prof_threshold,
te_alloc_prof_threshold,
te_alloc_peak,
#endif
te_alloc_tcache_gc,
#ifdef JEMALLOC_STATS
te_alloc_peak,
te_alloc_last = te_alloc_peak,
#else
te_alloc_last = te_alloc_tcache_gc,
#endif
te_alloc_count = te_alloc_last + 1
te_alloc_user0,
te_alloc_user1,
te_alloc_user2,
te_alloc_user3,
te_alloc_last = te_alloc_user3,
te_alloc_count = te_alloc_last + 1
};
typedef enum te_alloc_e te_alloc_t;
enum te_dalloc_e {
te_dalloc_tcache_gc,
te_dalloc_tcache_gc,
#ifdef JEMALLOC_STATS
te_dalloc_peak,
te_dalloc_last = te_dalloc_peak,
#else
te_dalloc_last = te_dalloc_tcache_gc,
te_dalloc_peak,
#endif
te_dalloc_count = te_dalloc_last + 1
te_dalloc_user0,
te_dalloc_user1,
te_dalloc_user2,
te_dalloc_user3,
te_dalloc_last = te_dalloc_user3,
te_dalloc_count = te_dalloc_last + 1
};
typedef enum te_dalloc_e te_dalloc_t;
@ -42,17 +46,63 @@ struct te_data_s {
uint64_t alloc_wait[te_alloc_count];
uint64_t dalloc_wait[te_dalloc_count];
};
#define TE_DATA_INITIALIZER { {0}, {0} }
#define TE_DATA_INITIALIZER \
{ \
{0}, { \
0 \
} \
}
/*
 * Status of a user event slot: not installed, installed and enabled, or
 * installed but disabled.
 */
enum te_enabled_e { te_enabled_not_installed, te_enabled_yes, te_enabled_no };
typedef enum te_enabled_e te_enabled_t;
typedef struct te_base_cb_s te_base_cb_t;
struct te_base_cb_s {
bool (*enabled)(void);
uint64_t (*new_event_wait)(tsd_t *tsd);
uint64_t (*postponed_event_wait)(tsd_t *tsd);
void (*event_handler)(tsd_t *tsd);
te_enabled_t (*enabled)(void);
uint64_t (*new_event_wait)(tsd_t *tsd);
uint64_t (*postponed_event_wait)(tsd_t *tsd);
void (*event_handler)(tsd_t *tsd);
};
extern te_base_cb_t *te_alloc_handlers[te_alloc_count];
extern te_base_cb_t *te_dalloc_handlers[te_dalloc_count];
bool experimental_thread_events_boot(void);
/*
 * User callback for thread events.
 *
 * is_alloc - true if the event is an allocation, false if it is a free
 * tallocated - number of bytes allocated on the current thread so far
 * tdallocated - number of bytes deallocated on the current thread so far
 */
typedef void (*user_event_cb_t)(
bool is_alloc, uint64_t tallocated, uint64_t tdallocated);
typedef struct user_hook_object_s user_hook_object_t;
struct user_hook_object_s {
user_event_cb_t callback;
uint64_t interval;
bool is_alloc_only;
};
/*
 * Register a user callback.
 *
 * Returns zero if the event was registered.
 *
 * If the interval is zero, the callback is NULL, or no more slots are
 * available, the event will not be registered and a non-zero value will
 * be returned.
 */
int te_register_user_handler(tsdn_t *tsdn, user_hook_object_t *te_uobj);
te_enabled_t te_user_event_enabled(size_t ue_idx, bool is_alloc);
#endif /* JEMALLOC_INTERNAL_THREAD_EVENT_REGISTRY_H */

View file

@ -78,6 +78,7 @@ enum witness_rank_e {
WITNESS_RANK_PROF_RECENT_ALLOC = WITNESS_RANK_LEAF,
WITNESS_RANK_PROF_STATS = WITNESS_RANK_LEAF,
WITNESS_RANK_PROF_THREAD_ACTIVE_INIT = WITNESS_RANK_LEAF,
WITNESS_RANK_THREAD_EVENTS_USER = WITNESS_RANK_LEAF,
};
typedef enum witness_rank_e witness_rank_t;

View file

@ -363,6 +363,7 @@ CTL_PROTO(experimental_hooks_prof_dump)
CTL_PROTO(experimental_hooks_prof_sample)
CTL_PROTO(experimental_hooks_prof_sample_free)
CTL_PROTO(experimental_hooks_prof_threshold)
CTL_PROTO(experimental_hooks_thread_event)
CTL_PROTO(experimental_hooks_safety_check_abort)
CTL_PROTO(experimental_thread_activity_callback)
CTL_PROTO(experimental_utilization_query)
@ -979,6 +980,7 @@ static const ctl_named_node_t experimental_hooks_node[] = {
{NAME("prof_sample_free"), CTL(experimental_hooks_prof_sample_free)},
{NAME("prof_threshold"), CTL(experimental_hooks_prof_threshold)},
{NAME("safety_check_abort"), CTL(experimental_hooks_safety_check_abort)},
{NAME("thread_event"), CTL(experimental_hooks_thread_event)},
};
static const ctl_named_node_t experimental_thread_node[] = {
@ -3823,6 +3825,23 @@ label_return:
return ret;
}
/*
 * Ctl handler for "experimental.hooks.thread_event": registers a
 * user-supplied thread-event callback object.  Write-only control:
 * newp must carry a user_hook_object_t by value; oldp/oldlenp are
 * ignored.
 */
static int
experimental_hooks_thread_event_ctl(tsd_t *tsd, const size_t *mib,
    size_t miblen, void *oldp, size_t *oldlenp, void *newp, size_t newlen) {
	int ret;

	/* This control has no readable state; reject pure reads. */
	if (newp == NULL) {
		ret = EINVAL;
		goto label_return;
	}
	/*
	 * Copy the caller's object by value.  NOTE(review): per the usual
	 * ctl WRITE() pattern, the macro is expected to set ret and jump to
	 * label_return on a newlen size mismatch — confirm against the
	 * macro definition.
	 */
	user_hook_object_t t_new = {NULL, 0, false};
	WRITE(t_new, user_hook_object_t);
	ret = te_register_user_handler(tsd_tsdn(tsd), &t_new);
label_return:
	return ret;
}
/* For integration test purpose only. No plan to move out of experimental. */
static int

View file

@ -1970,6 +1970,7 @@ malloc_init_hard_a0_locked(void) {
return true;
}
hook_boot();
experimental_thread_events_boot();
/*
* Create enough scaffolding to allow recursive allocation in
* malloc_ncpus().

View file

@ -58,9 +58,9 @@ peak_event_handler(tsd_t *tsd) {
peak_event_activity_callback(tsd);
}
static bool
static te_enabled_t
peak_event_enabled(void) {
return config_stats;
return config_stats ? te_enabled_yes : te_enabled_no;
}
/* Handles alloc and dalloc */

View file

@ -306,11 +306,6 @@ prof_sample_event_handler(tsd_t *tsd) {
}
}
static bool
prof_sample_enabled(void) {
return config_prof && opt_prof;
}
uint64_t
tsd_prof_sample_event_wait_get(tsd_t *tsd) {
#ifdef JEMALLOC_PROF
@ -321,6 +316,11 @@ tsd_prof_sample_event_wait_get(tsd_t *tsd) {
#endif
}
static te_enabled_t
prof_sample_enabled(void) {
return config_prof && opt_prof ? te_enabled_yes : te_enabled_no;
}
te_base_cb_t prof_sample_te_handler = {
.enabled = &prof_sample_enabled,
.new_event_wait = &prof_sample_new_event_wait,

View file

@ -27,7 +27,7 @@ prof_threshold_hook_get(void) {
}
/* Invoke callback for threshold reached */
static void
static inline void
prof_threshold_update(tsd_t *tsd) {
prof_threshold_hook_t prof_threshold_hook = prof_threshold_hook_get();
if (prof_threshold_hook == NULL) {
@ -56,9 +56,9 @@ prof_threshold_event_handler(tsd_t *tsd) {
prof_threshold_update(tsd);
}
static bool
static te_enabled_t
prof_threshold_enabled(void) {
return config_stats;
return config_stats ? te_enabled_yes : te_enabled_no;
}
te_base_cb_t prof_threshold_te_handler = {

View file

@ -2142,9 +2142,9 @@ stats_interval_event_handler(tsd_t *tsd) {
}
}
static bool
static te_enabled_t
stats_interval_enabled(void) {
return opt_stats_interval >= 0;
return opt_stats_interval >= 0 ? te_enabled_yes : te_enabled_no;
}
te_base_cb_t stats_interval_te_handler = {
@ -2154,7 +2154,6 @@ te_base_cb_t stats_interval_te_handler = {
.event_handler = &stats_interval_event_handler,
};
bool
stats_boot(void) {
uint64_t stats_interval;

View file

@ -1901,9 +1901,9 @@ void tcache_assert_initialized(tcache_t *tcache) {
assert(!cache_bin_still_zero_initialized(&tcache->bins[0]));
}
static bool
static te_enabled_t
tcache_gc_enabled(void) {
return (opt_tcache_gc_incr_bytes > 0);
return (opt_tcache_gc_incr_bytes > 0) ? te_enabled_yes : te_enabled_no;
}
/* Handles alloc and dalloc the same way */

View file

@ -10,13 +10,13 @@ te_ctx_has_active_events(te_ctx_t *ctx) {
assert(config_debug);
if (ctx->is_alloc) {
for (int i = 0; i < te_alloc_count; ++i) {
if (te_alloc_handlers[i]->enabled()) {
if (te_enabled_yes == te_alloc_handlers[i]->enabled()) {
return true;
}
}
} else {
for (int i = 0; i < te_dalloc_count; ++i) {
if (te_dalloc_handlers[i]->enabled()) {
if (te_enabled_yes == te_dalloc_handlers[i]->enabled()) {
return true;
}
}
@ -26,14 +26,17 @@ te_ctx_has_active_events(te_ctx_t *ctx) {
static uint64_t
te_next_event_compute(tsd_t *tsd, bool is_alloc) {
te_base_cb_t **handlers = is_alloc ? te_alloc_handlers : te_dalloc_handlers;
uint64_t *waits = is_alloc ? tsd_te_datap_get_unsafe(tsd)->alloc_wait : tsd_te_datap_get_unsafe(tsd)->dalloc_wait;
te_base_cb_t **handlers = is_alloc ?
te_alloc_handlers : te_dalloc_handlers;
uint64_t *waits = is_alloc ?
tsd_te_datap_get_unsafe(tsd)->alloc_wait :
tsd_te_datap_get_unsafe(tsd)->dalloc_wait;
int count = is_alloc ? te_alloc_count : te_dalloc_count;
uint64_t wait = TE_MAX_START_WAIT;
for (int i = 0; i < count; i++) {
if (handlers[i]->enabled()) {
if (te_enabled_yes == handlers[i]->enabled()) {
uint64_t ev_wait = waits[i];
assert(ev_wait <= TE_MAX_START_WAIT);
if (ev_wait > 0U && ev_wait < wait) {
@ -41,7 +44,6 @@ te_next_event_compute(tsd_t *tsd, bool is_alloc) {
}
}
}
return wait;
}
@ -64,6 +66,19 @@ te_assert_invariants_impl(tsd_t *tsd, te_ctx_t *ctx) {
/* The subtraction is intentionally susceptible to underflow. */
assert(current_bytes - last_event < interval);
/* This computation assumes that event did not become active in the
* time since the last trigger. This works fine if waits for inactive
* events are initialized with 0 as those are ignored
* If we wanted to initialize user events to anything other than
* zero, computation would take it into account and min_wait could
* be smaller than interval (as it was not part of the calc setting
* next_event).
*
* If we ever wanted to unregister the events assert would also
* need to account for the possibility that next_event was set, by
* event that is now gone
*/
uint64_t min_wait = te_next_event_compute(tsd, te_ctx_is_alloc(ctx));
/*
* next_event should have been pushed up only except when no event is
@ -161,8 +176,8 @@ te_recompute_fast_threshold(tsd_t *tsd) {
}
}
static void
te_adjust_thresholds_helper(tsd_t *tsd, te_ctx_t *ctx,
static inline void
te_adjust_thresholds_impl(tsd_t *tsd, te_ctx_t *ctx,
uint64_t wait) {
/*
* The next threshold based on future events can only be adjusted after
@ -175,14 +190,21 @@ te_adjust_thresholds_helper(tsd_t *tsd, te_ctx_t *ctx,
TE_MAX_INTERVAL ? wait : TE_MAX_INTERVAL);
te_ctx_next_event_set(tsd, ctx, next_event);
}
void
te_adjust_thresholds_helper(tsd_t *tsd, te_ctx_t *ctx,
uint64_t wait) {
te_adjust_thresholds_impl(tsd, ctx, wait);
}
static void
te_init_waits(tsd_t *tsd, uint64_t *wait, bool is_alloc) {
te_base_cb_t **handlers = is_alloc ? te_alloc_handlers : te_dalloc_handlers;
uint64_t *waits = is_alloc ? tsd_te_datap_get_unsafe(tsd)->alloc_wait : tsd_te_datap_get_unsafe(tsd)->dalloc_wait;
uint64_t *waits = is_alloc ?
tsd_te_datap_get_unsafe(tsd)->alloc_wait :
tsd_te_datap_get_unsafe(tsd)->dalloc_wait;
int count = is_alloc ? te_alloc_count : te_dalloc_count;
for (int i = 0; i < count; i++) {
if (handlers[i]->enabled()) {
if (te_enabled_yes == handlers[i]->enabled()) {
uint64_t ev_wait = handlers[i]->new_event_wait(tsd);
assert(ev_wait > 0);
waits[i] = ev_wait;
@ -229,7 +251,8 @@ te_update_alloc_events(tsd_t *tsd, te_base_cb_t **to_trigger,
size_t nto_trigger = 0;
uint64_t *waits = tsd_te_datap_get_unsafe(tsd)->alloc_wait;
if (opt_tcache_gc_incr_bytes > 0) {
assert(te_alloc_handlers[te_alloc_tcache_gc]->enabled());
assert(te_enabled_yes ==
te_alloc_handlers[te_alloc_tcache_gc]->enabled());
if (te_update_wait(tsd, accumbytes, allow,
&waits[te_alloc_tcache_gc], wait,
te_alloc_handlers[te_alloc_tcache_gc],
@ -240,7 +263,8 @@ te_update_alloc_events(tsd_t *tsd, te_base_cb_t **to_trigger,
}
#ifdef JEMALLOC_PROF
if (opt_prof) {
assert(te_alloc_handlers[te_alloc_prof_sample]->enabled());
assert(te_enabled_yes ==
te_alloc_handlers[te_alloc_prof_sample]->enabled());
if(te_update_wait(tsd, accumbytes, allow,
&waits[te_alloc_prof_sample], wait,
te_alloc_handlers[te_alloc_prof_sample], 0)) {
@ -255,27 +279,44 @@ te_update_alloc_events(tsd_t *tsd, te_base_cb_t **to_trigger,
wait,
te_alloc_handlers[te_alloc_stats_interval],
stats_interval_accum_batch)) {
assert(te_alloc_handlers[te_alloc_stats_interval]->enabled());
assert(te_enabled_yes ==
te_alloc_handlers[te_alloc_stats_interval]->enabled());
to_trigger[nto_trigger++] =
te_alloc_handlers[te_alloc_stats_interval];
}
}
#ifdef JEMALLOC_STATS
assert(te_alloc_handlers[te_alloc_peak]->enabled());
assert(te_enabled_yes == te_alloc_handlers[te_alloc_peak]->enabled());
if(te_update_wait(tsd, accumbytes, allow, &waits[te_alloc_peak], wait,
te_alloc_handlers[te_alloc_peak], PEAK_EVENT_WAIT)) {
to_trigger[nto_trigger++] = te_alloc_handlers[te_alloc_peak];
}
assert(te_alloc_handlers[te_alloc_prof_threshold]->enabled());
assert(te_enabled_yes ==
te_alloc_handlers[te_alloc_prof_threshold]->enabled());
if(te_update_wait(tsd, accumbytes, allow,
&waits[te_alloc_prof_threshold], wait,
te_alloc_handlers[te_alloc_prof_threshold],
1 << opt_experimental_lg_prof_threshold)) {
to_trigger[nto_trigger++] = te_alloc_handlers[te_alloc_prof_threshold];
to_trigger[nto_trigger++] =
te_alloc_handlers[te_alloc_prof_threshold];
}
#endif
for (te_alloc_t ue = te_alloc_user0; ue <= te_alloc_user3; ue++) {
te_enabled_t status =
te_user_event_enabled(ue - te_alloc_user0, true);
if (status == te_enabled_not_installed) {
break;
} else if (status == te_enabled_yes) {
if (te_update_wait(tsd, accumbytes, allow, &waits[ue],
wait, te_alloc_handlers[ue], 0)) {
to_trigger[nto_trigger++] =
te_alloc_handlers[ue];
}
}
}
return nto_trigger;
}
@ -285,7 +326,8 @@ te_update_dalloc_events(tsd_t *tsd, te_base_cb_t **to_trigger, uint64_t accumbyt
size_t nto_trigger = 0;
uint64_t *waits = tsd_te_datap_get_unsafe(tsd)->dalloc_wait;
if (opt_tcache_gc_incr_bytes > 0) {
assert(te_dalloc_handlers[te_dalloc_tcache_gc]->enabled());
assert(te_enabled_yes ==
te_dalloc_handlers[te_dalloc_tcache_gc]->enabled());
if (te_update_wait(tsd, accumbytes, allow,
&waits[te_dalloc_tcache_gc], wait,
te_dalloc_handlers[te_dalloc_tcache_gc],
@ -295,12 +337,26 @@ te_update_dalloc_events(tsd_t *tsd, te_base_cb_t **to_trigger, uint64_t accumbyt
}
}
#ifdef JEMALLOC_STATS
assert(te_dalloc_handlers[te_dalloc_peak]->enabled());
assert(te_enabled_yes == te_dalloc_handlers[te_dalloc_peak]->enabled());
if(te_update_wait(tsd, accumbytes, allow, &waits[te_dalloc_peak], wait,
te_dalloc_handlers[te_dalloc_peak], PEAK_EVENT_WAIT)) {
te_dalloc_handlers[te_dalloc_peak],
PEAK_EVENT_WAIT)) {
to_trigger[nto_trigger++] = te_dalloc_handlers[te_dalloc_peak];
}
#endif
for (te_dalloc_t ue = te_dalloc_user0; ue <= te_dalloc_user3; ue++) {
te_enabled_t status =
te_user_event_enabled(ue - te_dalloc_user0, false);
if (status == te_enabled_not_installed) {
break;
} else if (status == te_enabled_yes) {
if (te_update_wait(tsd, accumbytes, allow, &waits[ue],
wait, te_dalloc_handlers[ue], 0)) {
to_trigger[nto_trigger++] =
te_dalloc_handlers[ue];
}
}
}
return nto_trigger;
}
@ -362,7 +418,7 @@ te_init(tsd_t *tsd, bool is_alloc) {
uint64_t wait = TE_MAX_START_WAIT;
te_init_waits(tsd, &wait, is_alloc);
te_adjust_thresholds_helper(tsd, &ctx, wait);
te_adjust_thresholds_impl(tsd, &ctx, wait);
}
void

View file

@ -3,30 +3,160 @@
#include "jemalloc/internal/thread_event.h"
#include "jemalloc/internal/thread_event_registry.h"
#include "jemalloc/internal/thread_event_registry.h"
#include "jemalloc/internal/tcache_externs.h"
#include "jemalloc/internal/peak_event.h"
#include "jemalloc/internal/prof_externs.h"
#include "jemalloc/internal/prof_threshold.h"
#include "jemalloc/internal/stats.h"
static malloc_mutex_t uevents_mu;
/* Table of all the thread events.
 * Events share an interface, but internally they will know their
 * data layout in tsd.
/*
 * One-time boot: initialize the mutex that serializes registration of
 * user thread-event callbacks.  Returns true on initialization failure.
 */
bool
experimental_thread_events_boot(void) {
	bool err = malloc_mutex_init(&uevents_mu, "thread_events",
	    WITNESS_RANK_THREAD_EVENTS_USER, malloc_mutex_rank_exclusive);
	return err;
}
#define TE_REGISTER_ERRCODE_FULL_SLOTS -1
#define TE_REGISTER_ERRCODE_ALREADY_REGISTERED -2
static user_hook_object_t uevents_storage[TE_MAX_USER_EVENTS] = {
{NULL, 0, false},
};
static atomic_p_t uevent_obj_p[TE_MAX_USER_EVENTS] = {
NULL,
};
/* Field-wise equality of two user hook objects; both must be non-NULL. */
static inline bool
user_object_eq(user_hook_object_t *lhs, user_hook_object_t *rhs) {
	assert(lhs != NULL && rhs != NULL);
	if (lhs->callback != rhs->callback) {
		return false;
	}
	if (lhs->interval != rhs->interval) {
		return false;
	}
	return lhs->is_alloc_only == rhs->is_alloc_only;
}
/*
 * Install new_obj into the first free registry slot, or detect that an
 * identical object is already installed.  Must be called with uevents_mu
 * held (see te_register_user_handler).
 *
 * Returns the slot index in [0, TE_MAX_USER_EVENTS) on success, or a
 * negative TE_REGISTER_ERRCODE_* value.
 */
static inline int
te_register_user_handler_locked(user_hook_object_t *new_obj) {
	/* Scan the registry for a duplicate or the first empty slot. */
	for (int i = 0; i < TE_MAX_USER_EVENTS; ++i) {
		user_hook_object_t *p = (user_hook_object_t *)atomic_load_p(
		    &uevent_obj_p[i], ATOMIC_ACQUIRE);
		if (p && user_object_eq(p, new_obj)) {
			/* Same callback and interval are registered - no error. */
			return TE_REGISTER_ERRCODE_ALREADY_REGISTERED;
		} else if (p == NULL) {
			/*
			 * Empty slot: publish the object.  The contents are
			 * copied into place and a fence issued before the
			 * release-store of the pointer, so readers that
			 * acquire-load a non-NULL slot pointer observe a
			 * fully initialized object.
			 */
			uevents_storage[i] = *new_obj;
			atomic_fence(ATOMIC_SEQ_CST);
			atomic_store_p(&uevent_obj_p[i], &uevents_storage[i],
			    ATOMIC_RELEASE);
			return i;
		}
	}
	/* Every slot holds a different callback; nothing free. */
	return TE_REGISTER_ERRCODE_FULL_SLOTS;
}
static inline user_hook_object_t *
uobj_get(size_t cb_idx) {
assert(cb_idx < TE_MAX_USER_EVENTS);
return (user_hook_object_t *)atomic_load_p(
&uevent_obj_p[cb_idx], ATOMIC_ACQUIRE);
}
/*
 * Report whether user event slot ue_idx is installed and, if so, whether
 * it should fire for the given direction (alloc vs dalloc).
 */
te_enabled_t
te_user_event_enabled(size_t ue_idx, bool is_alloc) {
	assert(ue_idx < TE_MAX_USER_EVENTS);
	user_hook_object_t *obj = uobj_get(ue_idx);
	if (obj == NULL) {
		return te_enabled_not_installed;
	}
	/* dalloc-side triggering is suppressed for alloc-only hooks. */
	return (!is_alloc && obj->is_alloc_only) ? te_enabled_no :
	    te_enabled_yes;
}
/* Initial wait (in bytes) for a user event: its registered interval. */
static inline uint64_t
new_event_wait(size_t cb_idx) {
	user_hook_object_t *obj = uobj_get(cb_idx);
	/* The enabled() check must have guarded this call. */
	assert(obj != NULL);
	return obj->interval;
}
/* Wait used when a user event is postponed: the minimum start wait. */
static uint64_t
postponed_event_wait(tsd_t *tsd) {
	uint64_t wait = TE_MIN_START_WAIT;
	return wait;
}
/*
 * Invoke the user callback for slot cb_idx with the current thread's
 * allocation counters, guarding against reentrant allocation.
 */
static inline void
handler_wrapper(tsd_t *tsd, bool is_alloc, size_t cb_idx) {
	user_hook_object_t *obj = uobj_get(cb_idx);
	/* The enabled() check must have guarded this call. */
	assert(obj != NULL);
	uint64_t tallocated = tsd_thread_allocated_get(tsd);
	uint64_t tdallocated = tsd_thread_deallocated_get(tsd);
	pre_reentrancy(tsd, NULL);
	obj->callback(is_alloc, tallocated, tdallocated);
	post_reentrancy(tsd);
}
/*
 * Per-slot trampolines binding user event slot i to the te_base_cb_t
 * interface: enabled/new-wait/handler functions plus the alloc and
 * dalloc handler objects.  No comments inside the macro body — a
 * comment cannot follow the backslash line continuations.
 */
#define TE_USER_HANDLER_BINDING_IDX(i) \
static te_enabled_t te_user_alloc_enabled##i(void) { \
	return te_user_event_enabled(i, true); \
} \
static te_enabled_t te_user_dalloc_enabled##i(void) { \
	return te_user_event_enabled(i, false); \
} \
static uint64_t te_user_new_event_wait_##i(tsd_t *tsd) { \
	return new_event_wait(i); \
} \
static void te_user_alloc_handler_call##i(tsd_t *tsd) { \
	handler_wrapper(tsd, true, i); \
} \
static void te_user_dalloc_handler_call##i(tsd_t *tsd) { \
	handler_wrapper(tsd, false, i); \
} \
static te_base_cb_t user_alloc_handler##i = { \
	.enabled = &te_user_alloc_enabled##i, \
	.new_event_wait = &te_user_new_event_wait_##i, \
	.postponed_event_wait = &postponed_event_wait, \
	.event_handler = &te_user_alloc_handler_call##i}; \
static te_base_cb_t user_dalloc_handler##i = { \
	.enabled = &te_user_dalloc_enabled##i, \
	.new_event_wait = &te_user_new_event_wait_##i, \
	.postponed_event_wait = &postponed_event_wait, \
	.event_handler = &te_user_dalloc_handler_call##i}
/* Instantiate bindings for each of the TE_MAX_USER_EVENTS slots. */
TE_USER_HANDLER_BINDING_IDX(0);
TE_USER_HANDLER_BINDING_IDX(1);
TE_USER_HANDLER_BINDING_IDX(2);
TE_USER_HANDLER_BINDING_IDX(3);
/* Table of all the thread events. */
te_base_cb_t *te_alloc_handlers[te_alloc_count] = {
#ifdef JEMALLOC_PROF
&prof_sample_te_handler,
&prof_sample_te_handler,
#endif
&stats_interval_te_handler,
&stats_interval_te_handler,
&tcache_gc_te_handler,
#ifdef JEMALLOC_STATS
&prof_threshold_te_handler,
#endif
&tcache_gc_te_handler,
#ifdef JEMALLOC_STATS
&peak_te_handler,
&prof_threshold_te_handler,
&peak_te_handler,
#endif
&user_alloc_handler0,
&user_alloc_handler1,
&user_alloc_handler2,
&user_alloc_handler3
};
te_base_cb_t *te_dalloc_handlers[te_dalloc_count] = {
@ -34,4 +164,85 @@ te_base_cb_t *te_dalloc_handlers[te_dalloc_count] = {
#ifdef JEMALLOC_STATS
&peak_te_handler,
#endif
&user_dalloc_handler0,
&user_dalloc_handler1,
&user_dalloc_handler2,
&user_dalloc_handler3
};
/*
 * Store new_wait for user event slot ue_idx in this thread's tsd and, if
 * the new wait expires sooner than the currently scheduled next event,
 * pull the thresholds in.  Returns true when the caller must recompute
 * the fast-path threshold afterwards.
 */
static inline bool
te_update_tsd(tsd_t *tsd, uint64_t new_wait, size_t ue_idx, bool is_alloc) {
	bool needs_recompute = false;
	te_ctx_t ctx;
	uint64_t next, current, cur_wait;

	if (is_alloc) {
		tsd_te_datap_get_unsafe(tsd)
		    ->alloc_wait[te_alloc_user0 + ue_idx] = new_wait;
	} else {
		tsd_te_datap_get_unsafe(tsd)
		    ->dalloc_wait[te_dalloc_user0 + ue_idx] = new_wait;
	}
	te_ctx_get(tsd, &ctx, is_alloc);
	next = te_ctx_next_event_get(&ctx);
	current = te_ctx_current_bytes_get(&ctx);
	/* Bytes remaining until the currently scheduled event fires. */
	cur_wait = next - current;
	if (new_wait < cur_wait) {
		/*
		 * Set last event to current (same as when te inits). This
		 * will make sure that all the invariants are correct, before
		 * we adjust next_event and next_event fast.
		 */
		te_ctx_last_event_set(&ctx, te_ctx_current_bytes_get(&ctx));
		te_adjust_thresholds_helper(tsd, &ctx, new_wait);
		needs_recompute = true;
	}
	return needs_recompute;
}
/*
 * Refresh the current thread's event waits/thresholds after user event
 * slot ue_idx was registered.  No lock is needed: only this thread's
 * tsd is touched.
 */
static inline void
te_recalculate_current_thread_data(tsdn_t *tsdn, int ue_idx, bool alloc_only) {
	bool recompute = false;

	/*
	 * Guard both ends of the signed index: the original upper-bound-only
	 * assert let a negative ue_idx through, which would convert to a
	 * huge size_t in new_event_wait() and index the wait arrays out of
	 * bounds.
	 */
	assert(ue_idx >= 0 && ue_idx < TE_MAX_USER_EVENTS);
	tsd_t *tsd = tsdn_null(tsdn) ? tsd_fetch() : tsdn_tsd(tsdn);
	if (tsd) {
		uint64_t new_wait = new_event_wait(ue_idx);
		/* Always update the alloc side; dalloc only if not alloc-only. */
		recompute = te_update_tsd(tsd, new_wait, ue_idx, true);
		if (!alloc_only) {
			recompute = te_update_tsd(tsd, new_wait, ue_idx, false)
			    || recompute;
		}
		if (recompute) {
			te_recompute_fast_threshold(tsd);
		}
	}
}
/*
 * Public entry point: validate and register a user thread-event hook.
 * Returns 0 on success (including duplicate registration) and EINVAL on
 * invalid input or when no slot is free.
 */
int
te_register_user_handler(tsdn_t *tsdn, user_hook_object_t *te_uobj) {
	/* Reject obviously invalid registrations up front. */
	if (te_uobj == NULL || te_uobj->callback == NULL ||
	    te_uobj->interval == 0) {
		return EINVAL;
	}

	malloc_mutex_lock(tsdn, &uevents_mu);
	int reg_retcode = te_register_user_handler_locked(te_uobj);
	malloc_mutex_unlock(tsdn, &uevents_mu);

	if (reg_retcode < 0) {
		/* Duplicate registration is tolerated; a full table is not. */
		return reg_retcode == TE_REGISTER_ERRCODE_ALREADY_REGISTERED ?
		    0 : EINVAL;
	}
	/* Newly installed: fold the event into this thread's thresholds. */
	te_recalculate_current_thread_data(
	    tsdn, reg_retcode, te_uobj->is_alloc_only);
	return 0;
}

View file

@ -1348,6 +1348,43 @@ TEST_BEGIN(test_thread_activity_callback) {
}
TEST_END
static unsigned nuser_thread_event_cb_calls;
/* Test hook: count invocations; the payload arguments are unused. */
static void
user_thread_event_cb(bool is_alloc, uint64_t tallocated, uint64_t tdallocated) {
	/* Silence -Wunused-parameter for all three (is_alloc was missed). */
	(void)is_alloc;
	(void)tallocated;
	(void)tdallocated;
	++nuser_thread_event_cb_calls;
}
/* 100-byte interval; fires for both alloc and dalloc paths. */
static user_hook_object_t user_te_obj = {
	.callback = user_thread_event_cb,
	.interval = 100,
	.is_alloc_only = false,
};
/*
 * Register a user thread-event hook via mallctl (twice, to exercise the
 * duplicate-registration path), then allocate enough to cross the hook's
 * interval and verify the callback fired.
 */
TEST_BEGIN(test_thread_event_hook) {
	const size_t big_size = 10 * 1024 * 1024;
	void *ptr;
	int err;
	unsigned current_calls = nuser_thread_event_cb_calls;

	err = mallctl("experimental.hooks.thread_event", NULL, 0,
	    &user_te_obj, sizeof(user_te_obj));
	assert_d_eq(0, err, "");
	/* Re-registering an identical object must succeed silently. */
	err = mallctl("experimental.hooks.thread_event", NULL, 0,
	    &user_te_obj, sizeof(user_te_obj));
	assert_d_eq(0, err, "Not an error to provide object with same interval and cb");

	/* Allocate far beyond the 100-byte interval to force the event. */
	ptr = mallocx(big_size, 0);
	/*
	 * Check the allocation: if mallocx failed, the event never fires and
	 * the final expectation would fail with a misleading message.
	 */
	assert_ptr_not_null(ptr, "mallocx() failed");
	free(ptr);
	expect_u64_lt(current_calls, nuser_thread_event_cb_calls, "");
}
TEST_END
int
main(void) {
return test(
@ -1388,5 +1425,6 @@ main(void) {
test_hooks_exhaustion,
test_thread_idle,
test_thread_peak,
test_thread_activity_callback);
test_thread_activity_callback,
test_thread_event_hook);
}

View file

@ -1,5 +1,18 @@
#include "test/jemalloc_test.h"
static uint32_t nuser_hook_calls;
static bool is_registered = false;
/* Test hook: count invocations; the payload arguments are unused. */
static void
test_cb(bool is_alloc, uint64_t tallocated, uint64_t tdallocated) {
	/*
	 * Silence -Wunused-parameter, matching the sibling callback in the
	 * mallctl test which already casts its unused arguments to void.
	 */
	(void)is_alloc;
	(void)tallocated;
	(void)tdallocated;
	++nuser_hook_calls;
}
/* 10-byte interval; fires for both alloc and dalloc paths. */
static user_hook_object_t tobj = {
	.callback = &test_cb,
	.interval = 10,
	.is_alloc_only = false
};
TEST_BEGIN(test_next_event_fast) {
tsd_t *tsd = tsd_fetch();
te_ctx_t ctx;
@ -9,6 +22,12 @@ TEST_BEGIN(test_next_event_fast) {
te_ctx_current_bytes_set(&ctx, TE_NEXT_EVENT_FAST_MAX - 8U);
te_ctx_next_event_set(tsd, &ctx, TE_NEXT_EVENT_FAST_MAX);
if (!is_registered) {
is_registered = 0 == te_register_user_handler(tsd_tsdn(tsd), &tobj);
}
assert_true(is_registered || !config_stats, "Register user handler");
nuser_hook_calls = 0;
uint64_t *waits = tsd_te_datap_get_unsafe(tsd)->alloc_wait;
for (size_t i = 0; i < te_alloc_count; i++) {
waits[i] = TE_NEXT_EVENT_FAST_MAX;
@ -16,6 +35,7 @@ TEST_BEGIN(test_next_event_fast) {
/* Test next_event_fast rolling back to 0. */
void *p = malloc(16U);
assert_true(nuser_hook_calls == 1 || !config_stats, "Expected alloc call");
assert_ptr_not_null(p, "malloc() failed");
free(p);