redis/include/jemalloc/internal/mpsc_queue.h

135 lines
4.9 KiB
C

#ifndef JEMALLOC_INTERNAL_MPSC_QUEUE_H
#define JEMALLOC_INTERNAL_MPSC_QUEUE_H
#include "jemalloc/internal/atomic.h"
/*
* A concurrent implementation of a multi-producer, single-consumer queue. It
* supports three concurrent operations:
* - Push
* - Push batch
* - Pop batch
*
* These operations are all lock-free.
*
* The implementation is the simple two-stack queue built on a Treiber stack.
* It's not terribly efficient, but this isn't expected to go into anywhere with
* hot code. In fact, we don't really even need queue semantics in any
* anticipated use cases; we could get away with just the stack. But this way
* lets us frame the API in terms of the existing list types, which is a nice
* convenience. We can save on cache misses by introducing our own (parallel)
* single-linked list type here, and dropping FIFO semantics, if we need this to
* get faster. Since we're currently providing queue semantics though, we use
* the prev field in the link rather than the next field for Treiber-stack
* linkage, so that we can preserve order for bash-pushed lists (recall that the
* two-stack tricks reverses orders in the lock-free first stack).
*/
#define mpsc_queue(a_type) \
struct { \
atomic_p_t tail; \
}
#define mpsc_queue_proto(a_attr, a_prefix, a_queue_type, a_type, \
a_list_type) \
/* Initialize a queue. */ \
a_attr void \
a_prefix##new(a_queue_type *queue); \
/* Insert all items in src into the queue, clearing src. */ \
a_attr void \
a_prefix##push_batch(a_queue_type *queue, a_list_type *src); \
/* Insert node into the queue. */ \
a_attr void \
a_prefix##push(a_queue_type *queue, a_type *node); \
/* \
* Pop all items in the queue into the list at dst. dst should already \
* be initialized (and may contain existing items, which then remain \
* in dst). \
*/ \
a_attr void \
a_prefix##pop_batch(a_queue_type *queue, a_list_type *dst);
#define mpsc_queue_gen(a_attr, a_prefix, a_queue_type, a_type, \
a_list_type, a_link) \
a_attr void \
a_prefix##new(a_queue_type *queue) { \
atomic_store_p(&queue->tail, NULL, ATOMIC_RELAXED); \
} \
a_attr void \
a_prefix##push_batch(a_queue_type *queue, a_list_type *src) { \
/* \
* Reuse the ql list next field as the Treiber stack next \
* field. \
*/ \
a_type *first = ql_first(src); \
a_type *last = ql_last(src, a_link); \
void* cur_tail = atomic_load_p(&queue->tail, ATOMIC_RELAXED); \
do { \
/* \
* Note that this breaks the queue ring structure; \
* it's not a ring any more! \
*/ \
first->a_link.qre_prev = cur_tail; \
/* \
* Note: the upcoming CAS doesn't need an atomic; every \
* push only needs to synchronize with the next pop, \
* which we get from the release sequence rules. \
*/ \
} while (!atomic_compare_exchange_weak_p(&queue->tail, \
&cur_tail, last, ATOMIC_RELEASE, ATOMIC_RELAXED)); \
ql_new(src); \
} \
a_attr void \
a_prefix##push(a_queue_type *queue, a_type *node) { \
ql_elm_new(node, a_link); \
a_list_type list; \
ql_new(&list); \
ql_head_insert(&list, node, a_link); \
a_prefix##push_batch(queue, &list); \
} \
a_attr void \
a_prefix##pop_batch(a_queue_type *queue, a_list_type *dst) { \
a_type *tail = atomic_load_p(&queue->tail, ATOMIC_RELAXED); \
if (tail == NULL) { \
/* \
* In the common special case where there are no \
* pending elements, bail early without a costly RMW. \
*/ \
return; \
} \
tail = atomic_exchange_p(&queue->tail, NULL, ATOMIC_ACQUIRE); \
/* \
* It's a single-consumer queue, so if cur started non-NULL, \
* it'd better stay non-NULL. \
*/ \
assert(tail != NULL); \
/* \
* We iterate through the stack and both fix up the link \
* structure (stack insertion broke the list requirement that \
* the list be circularly linked). It's just as efficient at \
* this point to make the queue a "real" queue, so do that as \
* well. \
* If this ever gets to be a hot spot, we can omit this fixup \
* and make the queue a bag (i.e. not necessarily ordered), but \
* that would mean jettisoning the existing list API as the \
* batch pushing/popping interface. \
*/ \
a_list_type reversed; \
ql_new(&reversed); \
while (tail != NULL) { \
/* \
* Pop an item off the stack, prepend it onto the list \
* (reversing the order). Recall that we use the \
* list prev field as the Treiber stack next field to \
* preserve order of batch-pushed items when reversed. \
*/ \
a_type *next = tail->a_link.qre_prev; \
ql_elm_new(tail, a_link); \
ql_head_insert(&reversed, tail, a_link); \
tail = next; \
} \
ql_concat(dst, &reversed, a_link); \
}
#endif /* JEMALLOC_INTERNAL_MPSC_QUEUE_H */