diff --git a/Makefile.in b/Makefile.in index df244adb..ef6e1764 100644 --- a/Makefile.in +++ b/Makefile.in @@ -98,6 +98,7 @@ C_SRCS := $(srcroot)src/jemalloc.c \ $(srcroot)src/arena.c \ $(srcroot)src/background_thread.c \ $(srcroot)src/base.c \ + $(srcroot)src/batcher.c \ $(srcroot)src/bin.c \ $(srcroot)src/bin_info.c \ $(srcroot)src/bitmap.c \ @@ -204,6 +205,7 @@ TESTS_UNIT := \ $(srcroot)test/unit/background_thread_enable.c \ $(srcroot)test/unit/base.c \ $(srcroot)test/unit/batch_alloc.c \ + $(srcroot)test/unit/batcher.c \ $(srcroot)test/unit/binshard.c \ $(srcroot)test/unit/bitmap.c \ $(srcroot)test/unit/bit_util.c \ diff --git a/include/jemalloc/internal/batcher.h b/include/jemalloc/internal/batcher.h new file mode 100644 index 00000000..a435f0b7 --- /dev/null +++ b/include/jemalloc/internal/batcher.h @@ -0,0 +1,44 @@ +#ifndef JEMALLOC_INTERNAL_BATCHER_H +#define JEMALLOC_INTERNAL_BATCHER_H + +#include "jemalloc/internal/jemalloc_preamble.h" +#include "jemalloc/internal/atomic.h" +#include "jemalloc/internal/mutex.h" + +#define BATCHER_NO_IDX ((size_t)-1) + +typedef struct batcher_s batcher_t; +struct batcher_s { + /* + * Optimize for locality -- nelems_max and nelems are always touched + * together, along with the front of the mutex. The end of the mutex is + * only touched if there's contention. + */ + atomic_zu_t nelems; + size_t nelems_max; + malloc_mutex_t mtx; +}; + +void batcher_init(batcher_t *batcher, size_t nelems_max); + +/* + * Returns an index (into some user-owned array) to use for pushing, or + * BATCHER_NO_IDX if no index is free. If the former, the caller must call + * batcher_push_end once done. + */ +size_t batcher_push_begin(tsdn_t *tsdn, batcher_t *batcher, + size_t elems_to_push); +void batcher_push_end(tsdn_t *tsdn, batcher_t *batcher); + +/* + * Returns the number of items to pop, or BATCHER_NO_IDX if there are none. + * If the former, must be followed by a call to batcher_pop_end. 
+ */ +size_t batcher_pop_begin(tsdn_t *tsdn, batcher_t *batcher); +void batcher_pop_end(tsdn_t *tsdn, batcher_t *batcher); + +void batcher_prefork(tsdn_t *tsdn, batcher_t *batcher); +void batcher_postfork_parent(tsdn_t *tsdn, batcher_t *batcher); +void batcher_postfork_child(tsdn_t *tsdn, batcher_t *batcher); + +#endif /* JEMALLOC_INTERNAL_BATCHER_H */ diff --git a/include/jemalloc/internal/witness.h b/include/jemalloc/internal/witness.h index 937ca2d5..afee1246 100644 --- a/include/jemalloc/internal/witness.h +++ b/include/jemalloc/internal/witness.h @@ -64,9 +64,10 @@ enum witness_rank_e { WITNESS_RANK_BASE, WITNESS_RANK_ARENA_LARGE, WITNESS_RANK_HOOK, + WITNESS_RANK_BIN, WITNESS_RANK_LEAF=0x1000, - WITNESS_RANK_BIN = WITNESS_RANK_LEAF, + WITNESS_RANK_BATCHER=WITNESS_RANK_LEAF, WITNESS_RANK_ARENA_STATS = WITNESS_RANK_LEAF, WITNESS_RANK_COUNTER_ACCUM = WITNESS_RANK_LEAF, WITNESS_RANK_DSS = WITNESS_RANK_LEAF, diff --git a/msvc/projects/vc2015/jemalloc/jemalloc.vcxproj b/msvc/projects/vc2015/jemalloc/jemalloc.vcxproj index 03c241ca..58bd7b3e 100644 --- a/msvc/projects/vc2015/jemalloc/jemalloc.vcxproj +++ b/msvc/projects/vc2015/jemalloc/jemalloc.vcxproj @@ -38,6 +38,7 @@ + @@ -378,4 +379,4 @@ - \ No newline at end of file + diff --git a/msvc/projects/vc2015/jemalloc/jemalloc.vcxproj.filters b/msvc/projects/vc2015/jemalloc/jemalloc.vcxproj.filters index 514368aa..82ad3e35 100644 --- a/msvc/projects/vc2015/jemalloc/jemalloc.vcxproj.filters +++ b/msvc/projects/vc2015/jemalloc/jemalloc.vcxproj.filters @@ -16,6 +16,9 @@ Source Files + + Source Files + Source Files @@ -197,4 +200,4 @@ Source Files - \ No newline at end of file + diff --git a/msvc/projects/vc2017/jemalloc/jemalloc.vcxproj b/msvc/projects/vc2017/jemalloc/jemalloc.vcxproj index 5d23d8e2..6e59c035 100644 --- a/msvc/projects/vc2017/jemalloc/jemalloc.vcxproj +++ b/msvc/projects/vc2017/jemalloc/jemalloc.vcxproj @@ -38,6 +38,7 @@ + @@ -377,4 +378,4 @@ - \ No newline at end of file + diff --git 
a/msvc/projects/vc2017/jemalloc/jemalloc.vcxproj.filters b/msvc/projects/vc2017/jemalloc/jemalloc.vcxproj.filters index 514368aa..82ad3e35 100644 --- a/msvc/projects/vc2017/jemalloc/jemalloc.vcxproj.filters +++ b/msvc/projects/vc2017/jemalloc/jemalloc.vcxproj.filters @@ -16,6 +16,9 @@ Source Files + + Source Files + Source Files @@ -197,4 +200,4 @@ Source Files - \ No newline at end of file + diff --git a/msvc/projects/vc2019/jemalloc/jemalloc.vcxproj b/msvc/projects/vc2019/jemalloc/jemalloc.vcxproj index 8eaab36b..db06fc6d 100644 --- a/msvc/projects/vc2019/jemalloc/jemalloc.vcxproj +++ b/msvc/projects/vc2019/jemalloc/jemalloc.vcxproj @@ -38,6 +38,7 @@ + @@ -377,4 +378,4 @@ - \ No newline at end of file + diff --git a/msvc/projects/vc2019/jemalloc/jemalloc.vcxproj.filters b/msvc/projects/vc2019/jemalloc/jemalloc.vcxproj.filters index 514368aa..82ad3e35 100644 --- a/msvc/projects/vc2019/jemalloc/jemalloc.vcxproj.filters +++ b/msvc/projects/vc2019/jemalloc/jemalloc.vcxproj.filters @@ -16,6 +16,9 @@ Source Files + + Source Files + Source Files @@ -197,4 +200,4 @@ Source Files - \ No newline at end of file + diff --git a/msvc/projects/vc2022/jemalloc/jemalloc.vcxproj b/msvc/projects/vc2022/jemalloc/jemalloc.vcxproj index cd871379..01de0dcb 100644 --- a/msvc/projects/vc2022/jemalloc/jemalloc.vcxproj +++ b/msvc/projects/vc2022/jemalloc/jemalloc.vcxproj @@ -38,6 +38,7 @@ + @@ -377,4 +378,4 @@ - \ No newline at end of file + diff --git a/msvc/projects/vc2022/jemalloc/jemalloc.vcxproj.filters b/msvc/projects/vc2022/jemalloc/jemalloc.vcxproj.filters index 514368aa..82ad3e35 100644 --- a/msvc/projects/vc2022/jemalloc/jemalloc.vcxproj.filters +++ b/msvc/projects/vc2022/jemalloc/jemalloc.vcxproj.filters @@ -16,6 +16,9 @@ Source Files + + Source Files + Source Files @@ -197,4 +200,4 @@ Source Files - \ No newline at end of file + diff --git a/src/batcher.c b/src/batcher.c new file mode 100644 index 00000000..19af7d83 --- /dev/null +++ b/src/batcher.c @@ -0,0 +1,86 @@ +#include 
"jemalloc/internal/jemalloc_preamble.h" + +#include "jemalloc/internal/batcher.h" + +#include "jemalloc/internal/assert.h" +#include "jemalloc/internal/atomic.h" + +void +batcher_init(batcher_t *batcher, size_t nelems_max) { + atomic_store_zu(&batcher->nelems, 0, ATOMIC_RELAXED); + batcher->nelems_max = nelems_max; + malloc_mutex_init(&batcher->mtx, "batcher", WITNESS_RANK_BATCHER, + malloc_mutex_rank_exclusive); +} + +/* + * Returns an index (into some user-owned array) to use for pushing, or + * BATCHER_NO_IDX if no index is free. + */ +size_t batcher_push_begin(tsdn_t *tsdn, batcher_t *batcher, + size_t elems_to_push) { + assert(elems_to_push > 0); + size_t nelems_guess = atomic_load_zu(&batcher->nelems, ATOMIC_RELAXED); + if (nelems_guess + elems_to_push > batcher->nelems_max) { + return BATCHER_NO_IDX; + } + malloc_mutex_lock(tsdn, &batcher->mtx); + size_t nelems = atomic_load_zu(&batcher->nelems, ATOMIC_RELAXED); + if (nelems + elems_to_push > batcher->nelems_max) { + malloc_mutex_unlock(tsdn, &batcher->mtx); + return BATCHER_NO_IDX; + } + assert(elems_to_push <= batcher->nelems_max - nelems); + /* + * We update nelems at push time (instead of during pop) so that other + * racing accesses of the batcher can fail fast instead of trying to + * acquire a mutex only to discover that there's no space for them. 
+ */ + atomic_store_zu(&batcher->nelems, nelems + elems_to_push, ATOMIC_RELAXED); + return nelems; +} + +void +batcher_push_end(tsdn_t *tsdn, batcher_t *batcher) { + malloc_mutex_assert_owner(tsdn, &batcher->mtx); + assert(atomic_load_zu(&batcher->nelems, ATOMIC_RELAXED) > 0); + malloc_mutex_unlock(tsdn, &batcher->mtx); +} + +size_t +batcher_pop_begin(tsdn_t *tsdn, batcher_t *batcher) { + size_t nelems_guess = atomic_load_zu(&batcher->nelems, ATOMIC_RELAXED); + assert(nelems_guess <= batcher->nelems_max); + if (nelems_guess == 0) { + return BATCHER_NO_IDX; + } + malloc_mutex_lock(tsdn, &batcher->mtx); + size_t nelems = atomic_load_zu(&batcher->nelems, ATOMIC_RELAXED); + assert(nelems <= batcher->nelems_max); + if (nelems == 0) { + malloc_mutex_unlock(tsdn, &batcher->mtx); + return BATCHER_NO_IDX; + } + atomic_store_zu(&batcher->nelems, 0, ATOMIC_RELAXED); + return nelems; +} + +void batcher_pop_end(tsdn_t *tsdn, batcher_t *batcher) { + assert(atomic_load_zu(&batcher->nelems, ATOMIC_RELAXED) == 0); + malloc_mutex_unlock(tsdn, &batcher->mtx); +} + +void +batcher_prefork(tsdn_t *tsdn, batcher_t *batcher) { + malloc_mutex_prefork(tsdn, &batcher->mtx); +} + +void +batcher_postfork_parent(tsdn_t *tsdn, batcher_t *batcher) { + malloc_mutex_postfork_parent(tsdn, &batcher->mtx); +} + +void +batcher_postfork_child(tsdn_t *tsdn, batcher_t *batcher) { + malloc_mutex_postfork_child(tsdn, &batcher->mtx); +} diff --git a/test/unit/batcher.c b/test/unit/batcher.c new file mode 100644 index 00000000..df9d3e5b --- /dev/null +++ b/test/unit/batcher.c @@ -0,0 +1,246 @@ +#include "test/jemalloc_test.h" + +#include "jemalloc/internal/batcher.h" + +TEST_BEGIN(test_simple) { + enum { NELEMS_MAX = 10, DATA_BASE_VAL = 100, NRUNS = 5 }; + batcher_t batcher; + size_t data[NELEMS_MAX]; + for (size_t nelems = 0; nelems < NELEMS_MAX; nelems++) { + batcher_init(&batcher, nelems); + for (int run = 0; run < NRUNS; run++) { + for (int i = 0; i < NELEMS_MAX; i++) { + data[i] = (size_t)-1; + } + for 
(size_t i = 0; i < nelems; i++) { + size_t idx = batcher_push_begin(TSDN_NULL, + &batcher, 1); + assert_zu_eq(i, idx, "Wrong index"); + assert_zu_eq((size_t)-1, data[idx], + "Expected uninitialized slot"); + data[idx] = DATA_BASE_VAL + i; + batcher_push_end(TSDN_NULL, &batcher); + } + if (nelems > 0) { + size_t idx = batcher_push_begin(TSDN_NULL, + &batcher, 1); + assert_zu_eq(BATCHER_NO_IDX, idx, + "Shouldn't be able to push into a full " + "batcher"); + } + + size_t npop = batcher_pop_begin(TSDN_NULL, &batcher); + if (nelems == 0) { + assert_zu_eq(npop, BATCHER_NO_IDX, + "Shouldn't get any items out of an empty " + "batcher"); + } else { + assert_zu_eq(npop, nelems, + "Wrong number of elements popped"); + } + for (size_t i = 0; i < nelems; i++) { + assert_zu_eq(data[i], DATA_BASE_VAL + i, + "Item popped out of order!"); + } + if (nelems != 0) { + batcher_pop_end(TSDN_NULL, &batcher); + } + } + } +} +TEST_END + +TEST_BEGIN(test_multi_push) { + size_t idx, nelems; + batcher_t batcher; + batcher_init(&batcher, 11); + /* Push two at a time, 5 times, for 10 total. */ + for (int i = 0; i < 5; i++) { + idx = batcher_push_begin(TSDN_NULL, &batcher, 2); + assert_zu_eq(2 * i, idx, "Should push in order"); + batcher_push_end(TSDN_NULL, &batcher); + } + /* Pushing two more should fail -- would put us at 12 elems. 
*/ + idx = batcher_push_begin(TSDN_NULL, &batcher, 2); + assert_zu_eq(BATCHER_NO_IDX, idx, "Should be out of space"); + /* But one more should work */ + idx = batcher_push_begin(TSDN_NULL, &batcher, 1); + assert_zu_eq(10, idx, "Should be out of space"); + batcher_push_end(TSDN_NULL, &batcher); + nelems = batcher_pop_begin(TSDN_NULL, &batcher); + batcher_pop_end(TSDN_NULL, &batcher); + assert_zu_eq(11, nelems, "Should have popped everything"); +} +TEST_END + +enum { + STRESS_TEST_ELEMS = 10, + STRESS_TEST_THREADS = 4, + STRESS_TEST_OPS = 1000 * 1000, + STRESS_TEST_PUSH_TO_POP_RATIO = 5, +}; + +typedef struct stress_test_data_s stress_test_data_t; +struct stress_test_data_s { + batcher_t batcher; + mtx_t pop_mtx; + atomic_u32_t thread_id; + + uint32_t elems_data[STRESS_TEST_ELEMS]; + size_t push_count[STRESS_TEST_ELEMS]; + size_t pop_count[STRESS_TEST_ELEMS]; + atomic_zu_t atomic_push_count[STRESS_TEST_ELEMS]; + atomic_zu_t atomic_pop_count[STRESS_TEST_ELEMS]; +}; + +/* + * Note: 0-indexed. If one element is set and you want to find it, you call + * get_nth_set(elems, 0). + */ +static size_t +get_nth_set(bool elems_owned[STRESS_TEST_ELEMS], size_t n) { + size_t ntrue = 0; + for (size_t i = 0; i < STRESS_TEST_ELEMS; i++) { + if (elems_owned[i]) { + ntrue++; + } + if (ntrue > n) { + return i; + } + } + assert_not_reached("Asked for the %zu'th set element when < %zu are " + "set", + n, n); + /* Just to silence a compiler warning. 
*/ + return 0; +} + +static void * +stress_test_thd(void *arg) { + stress_test_data_t *data = arg; + size_t prng = atomic_fetch_add_u32(&data->thread_id, 1, + ATOMIC_RELAXED); + + size_t nelems_owned = 0; + bool elems_owned[STRESS_TEST_ELEMS] = {0}; + size_t local_push_count[STRESS_TEST_ELEMS] = {0}; + size_t local_pop_count[STRESS_TEST_ELEMS] = {0}; + + for (int i = 0; i < STRESS_TEST_OPS; i++) { + size_t rnd = prng_range_zu(&prng, + STRESS_TEST_PUSH_TO_POP_RATIO); + if (rnd == 0 || nelems_owned == 0) { + size_t nelems = batcher_pop_begin(TSDN_NULL, + &data->batcher); + if (nelems == BATCHER_NO_IDX) { + continue; + } + for (size_t i = 0; i < nelems; i++) { + uint32_t elem = data->elems_data[i]; + assert_false(elems_owned[elem], + "Shouldn't already own what we just " + "popped"); + elems_owned[elem] = true; + nelems_owned++; + local_pop_count[elem]++; + data->pop_count[elem]++; + } + batcher_pop_end(TSDN_NULL, &data->batcher); + } else { + size_t elem_to_push_idx = prng_range_zu(&prng, + nelems_owned); + size_t elem = get_nth_set(elems_owned, + elem_to_push_idx); + assert_true( + elems_owned[elem], + "Should own element we're about to pop"); + elems_owned[elem] = false; + local_push_count[elem]++; + data->push_count[elem]++; + nelems_owned--; + size_t idx = batcher_push_begin(TSDN_NULL, + &data->batcher, 1); + assert_zu_ne(idx, BATCHER_NO_IDX, + "Batcher can't be full -- we have one of its " + "elems!"); + data->elems_data[idx] = (uint32_t)elem; + batcher_push_end(TSDN_NULL, &data->batcher); + } + } + + /* Push all local elems back, flush local counts to the shared ones. 
*/ + size_t push_idx = 0; + if (nelems_owned != 0) { + push_idx = batcher_push_begin(TSDN_NULL, &data->batcher, + nelems_owned); + assert_zu_ne(BATCHER_NO_IDX, push_idx, + "Should be space to push"); + } + for (size_t i = 0; i < STRESS_TEST_ELEMS; i++) { + if (elems_owned[i]) { + data->elems_data[push_idx] = (uint32_t)i; + push_idx++; + local_push_count[i]++; + data->push_count[i]++; + } + atomic_fetch_add_zu( + &data->atomic_push_count[i], local_push_count[i], + ATOMIC_RELAXED); + atomic_fetch_add_zu( + &data->atomic_pop_count[i], local_pop_count[i], + ATOMIC_RELAXED); + } + if (nelems_owned != 0) { + batcher_push_end(TSDN_NULL, &data->batcher); + } + + return NULL; +} + +TEST_BEGIN(test_stress) { + stress_test_data_t data; + batcher_init(&data.batcher, STRESS_TEST_ELEMS); + bool err = mtx_init(&data.pop_mtx); + assert_false(err, "mtx_init failure"); + atomic_store_u32(&data.thread_id, 0, ATOMIC_RELAXED); + for (int i = 0; i < STRESS_TEST_ELEMS; i++) { + data.push_count[i] = 0; + data.pop_count[i] = 0; + atomic_store_zu(&data.atomic_push_count[i], 0, ATOMIC_RELAXED); + atomic_store_zu(&data.atomic_pop_count[i], 0, ATOMIC_RELAXED); + + size_t idx = batcher_push_begin(TSDN_NULL, &data.batcher, 1); + assert_zu_eq(i, idx, "Should push in order"); + data.elems_data[idx] = i; + batcher_push_end(TSDN_NULL, &data.batcher); + } + + thd_t threads[STRESS_TEST_THREADS]; + for (int i = 0; i < STRESS_TEST_THREADS; i++) { + thd_create(&threads[i], stress_test_thd, &data); + } + for (int i = 0; i < STRESS_TEST_THREADS; i++) { + thd_join(threads[i], NULL); + } + for (int i = 0; i < STRESS_TEST_ELEMS; i++) { + assert_zu_ne(0, data.push_count[i], + "Should have done something!"); + assert_zu_eq(data.push_count[i], data.pop_count[i], + "every element should be pushed and popped an equal number " + "of times"); + assert_zu_eq(data.push_count[i], + atomic_load_zu(&data.atomic_push_count[i], ATOMIC_RELAXED), + "atomic and non-atomic count should be equal given proper " + 
"synchronization"); + assert_zu_eq(data.pop_count[i], + atomic_load_zu(&data.atomic_pop_count[i], ATOMIC_RELAXED), + "atomic and non-atomic count should be equal given proper " + "synchronization"); + } +} +TEST_END + +int +main(void) { + return test_no_reentrancy(test_simple, test_multi_push, test_stress); +}