From 40a1f59a5bd5fef91ac3eadce47589cb68a0f403 Mon Sep 17 00:00:00 2001 From: Pawel Dziepak Date: Fri, 14 Feb 2014 03:43:33 +0100 Subject: [PATCH] Add multiple producers, single consumer lock-free queue Signed-off-by: Pawel Dziepak --- .gitignore | 4 +- configure.ac | 17 ++++ src/Makefile.am | 14 ++- src/atomic.h | 30 +++++++ src/mpsc_queue.c | 239 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/mpsc_queue.h | 38 +++++++++ test/Makefile.am | 4 +- test/mpsc_queue_m.c | 101 ++++++++++++++++++++++ test/mpsc_queue_s.c | 85 +++++++++++++++++++ 9 files changed, 525 insertions(+), 7 deletions(-) create mode 100644 src/atomic.h create mode 100644 src/mpsc_queue.c create mode 100644 src/mpsc_queue.h create mode 100644 test/mpsc_queue_m.c create mode 100644 test/mpsc_queue_s.c diff --git a/.gitignore b/.gitignore index d32fa8e..e92f91e 100644 --- a/.gitignore +++ b/.gitignore @@ -9,8 +9,8 @@ *.log *.trs -/test/spsc_queue_s -/test/spsc_queue_m +/test/*_s +/test/*_m /aclocal.m4 /autom4te.cache/ diff --git a/configure.ac b/configure.ac index e3bd6d1..3f1d3be 100644 --- a/configure.ac +++ b/configure.ac @@ -49,6 +49,23 @@ if test "X${cf_cv_have_sync_synchronize}" != "Xyes"; then AC_MSG_ERROR([__sync_synchronize() not supported]) fi +AC_CACHE_CHECK([whether the target supports __sync_bool_compare_and_swap], + [cf_cv_have_sync_bool_compare_and_swap], + [ + AC_LINK_IFELSE([ + AC_LANG_PROGRAM([[ + ]],[[ + int val = 0; + __sync_bool_compare_and_swap(&val, 0, 1); + ]])], + [cf_cv_have_sync_bool_compare_and_swap=yes], + [cf_cv_have_sync_bool_compare_and_swap=no]) + ]) +if test "X${cf_cv_have_sync_bool_compare_and_swap}" != "Xyes"; then + AC_MSG_ERROR([__sync_bool_compare_and_swap() not supported]) +fi + + AC_CONFIG_FILES([ Makefile src/Makefile diff --git a/src/Makefile.am b/src/Makefile.am index 4483f25..3a3b459 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1,6 +1,6 @@ # libnbds # -# Copyright (C) 2013 Paweł Dziepak +# Copyright (C) 2013-2014 Paweł Dziepak AUTOMAKE_OPTIONS = 1.4 foreign @@ -10,11 +10,17 @@ LIBRARY_VERSION = 1:0:0 lib_LTLIBRARIES = libnbds.la -libnbds_la_SOURCES = spsc_queue.c +libnbds_la_SOURCES = \ + mpsc_queue.c \ + spsc_queue.c -noinst_HEADERS = barriers.h +noinst_HEADERS = \ + atomic.h \ + barriers.h -pkginclude_HEADERS = spsc_queue.h +pkginclude_HEADERS = \ + mpsc_queue.h \ + spsc_queue.h libnbds_la_LDFLAGS = -version-info $(LIBRARY_VERSION) libnbds_la_CFLAGS = diff --git a/src/atomic.h b/src/atomic.h new file mode 100644 index 0000000..c1cccaf --- /dev/null +++ b/src/atomic.h @@ -0,0 +1,30 @@ +/* + libnbds + Copyright (C) 2014 Paweł Dziepak + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +#ifndef ATOMIC_H +#define ATOMIC_H + +#define compare_and_swap(ptr, old, new) \ + __sync_bool_compare_and_swap(ptr, old, new) + +#define atomic_fetch_add(ptr, val) __sync_fetch_and_add(ptr, val) +#define atomic_fetch_clear(ptr) __sync_fetch_and_and(ptr, 0) + +#define atomic_or(ptr, val) __sync_fetch_and_or(ptr, val) + +#endif diff --git a/src/mpsc_queue.c b/src/mpsc_queue.c new file mode 100644 index 0000000..cfac95c --- /dev/null +++ b/src/mpsc_queue.c @@ -0,0 +1,239 @@ +/* + libnbds + Copyright (C) 2014 Paweł Dziepak + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +#include +#include +#include +#include +#include + +#include "mpsc_queue.h" + +#include "atomic.h" +#include "barriers.h" + +#define BLOCK_SIZE 512 +#define NEXT_BLOCK (1u << 31) + +struct mpsc_queue_block { + struct mpsc_queue_block* next; + uint32_t consumer; + uint32_t producer_producer; +}; + +static struct mpsc_queue_block* new_block() +{ + struct mpsc_queue_block* block; + size_t block_size + = sizeof(struct mpsc_queue_block) + BLOCK_SIZE * sizeof(void*); + int retval; + + retval = posix_memalign((void**)&block, sizeof(void*), block_size); + if (retval) + return NULL; + + memset(block, 0, block_size); + block->consumer = 0; + block->producer_producer = 0; + block->next = NULL; + + return block; +} + +static void lock_blocks(struct mpsc_queue* queue) +{ + atomic_fetch_add(&queue->reference_counter, 1); +} + +void unlock_blocks(struct mpsc_queue* queue) +{ + struct mpsc_queue_block* next; + struct mpsc_queue_block* current; + struct mpsc_queue_block* block = atomic_fetch_clear(&queue->delete_list); + + if (atomic_fetch_add(&queue->reference_counter, -1) != 1) { + if (block) { + current = block; + while (current->next) + current = current->next; + + do { + next = queue->delete_list; + current->next = next; + } while (!compare_and_swap(&queue->delete_list, next, block)); + } + + return; + } + + while (block) { + next = block->next; + free(block); + block = next; + } +} + +static void free_block(struct mpsc_queue* queue, struct mpsc_queue_block* block) +{ + struct mpsc_queue_block* next; + + do { + if (queue->reference_counter == 0) { + free(block); + return; + } + + next = queue->delete_list; + block->next = next; + } while (!compare_and_swap(&queue->delete_list, next, block)); +} + +int mpsc_queue_init(struct mpsc_queue* queue) +{ + struct mpsc_queue_block* block = new_block(); + if (!block) { + errno = ENOMEM; + return -1; + } + + queue->consumer = block; + queue->producer_producer = block; + queue->reference_counter = 0; + queue->delete_list = NULL; + + return 0; +} + +void mpsc_queue_destroy(struct mpsc_queue* queue) +{ + struct mpsc_queue_block* block; + struct mpsc_queue_block* next; + + assert(!queue->reference_counter); + + block = queue->consumer; + while (block) { + next = block->next; + free(block); + block = next; + } + + block = queue->delete_list; + while (block) { + next = block->next; + free(block); + block = next; + } + + queue->consumer = NULL; + queue->producer_producer = NULL; + queue->delete_list = NULL; +} + +int mpsc_queue_enqueue(struct mpsc_queue* queue, void* object) +{ + struct mpsc_queue_block* block; + struct mpsc_queue_block* next_block; + void** array; + uint32_t producer; + int retry; + + lock_blocks(queue); + + do { + retry = 0; + block = queue->producer_producer; + + do { + producer = block->producer_producer; + if (producer & NEXT_BLOCK) { + retry = 1; + break; + } + + if (producer - block->consumer >= BLOCK_SIZE) { + next_block = new_block(); + if (!next_block) { + errno = ENOMEM; + return -1; + } + + if (!compare_and_swap(&queue->producer_producer, block, + next_block)) { + free(next_block); + } else { + assert(!block->next); + block->next = next_block; + atomic_or(&block->producer_producer, NEXT_BLOCK); + } + retry = 1; + break; + } + } while (!compare_and_swap(&block->producer_producer, producer, + producer + 1)); + } while (retry); + + unlock_blocks(queue); + + array = (void**)&block[1]; + array[producer & (BLOCK_SIZE - 1)] = object; + + return 0; +} + +void* mpsc_queue_dequeue(struct mpsc_queue* queue) +{ + struct mpsc_queue_block* block; + struct mpsc_queue_block* next; + void** array; + void** ptr; + void* object; + uint32_t producer; + uint32_t next_block; + + block = queue->consumer; + + next = block->next; + read_barrier(); + producer = block->producer_producer; + + next_block = producer & NEXT_BLOCK; + producer &= ~NEXT_BLOCK; + + if (producer == block->consumer && !next) + return NULL; + + if (producer == block->consumer && next_block) { + queue->consumer = next; + free_block(queue, block); + + block = queue->consumer; + } + + array = (void**)&block[1]; + ptr = &array[block->consumer & (BLOCK_SIZE - 1)]; + object = *ptr; + + if (object) { + *ptr = NULL; + write_barrier(); + block->consumer++; + } + + return object; +} diff --git a/src/mpsc_queue.h b/src/mpsc_queue.h new file mode 100644 index 0000000..4c2a883 --- /dev/null +++ b/src/mpsc_queue.h @@ -0,0 +1,38 @@ +/* + libnbds + Copyright (C) 2014 Paweł Dziepak + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +#ifndef NBDS_MPSC_QUEUE_H +#define NBDS_MPSC_QUEUE_H + +struct mpsc_queue_block; + +struct mpsc_queue { + struct mpsc_queue_block* consumer; + struct mpsc_queue_block* producer_producer; + + uint32_t reference_counter; + struct mpsc_queue_block* delete_list; +}; + +int mpsc_queue_init(struct mpsc_queue* queue); +void mpsc_queue_destroy(struct mpsc_queue* queue); + +int mpsc_queue_enqueue(struct mpsc_queue* queue, void* object); +void* mpsc_queue_dequeue(struct mpsc_queue* queue); + +#endif diff --git a/test/Makefile.am b/test/Makefile.am index 2dca3a9..612ea6c 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -1,6 +1,6 @@ # libnbds # -# Copyright (C) 2013 Paweł Dziepak +# Copyright (C) 2013-2014 Paweł Dziepak AUTOMAKE_OPTIONS = 1.4 foreign @@ -12,6 +12,8 @@ AM_CFLAGS = -Wall -Wextra -Werror -pedantic --std=gnu99 -I../src LDADD = ../src/libnbds.la TESTS = \ + mpsc_queue_s \ + mpsc_queue_m \ spsc_queue_s \ spsc_queue_m diff --git a/test/mpsc_queue_m.c b/test/mpsc_queue_m.c new file mode 100644 index 0000000..1480e8b --- /dev/null +++ b/test/mpsc_queue_m.c @@ -0,0 +1,101 @@ +/* + libnbds + Copyright (C) 2014 Paweł Dziepak + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +#include +#include +#include +#include + +#include +#include + +#include "mpsc_queue.h" + +struct mpsc_queue queue; + +pthread_barrier_t start; + +#define THREADS 4u +#define ITERATIONS (1024 * 1024 * 2u) + +void* thread_func(void* ptr) +{ + uintptr_t offset = *(uintptr_t*)ptr; + uintptr_t i; + int error; + + pthread_barrier_wait(&start); + + for (i = 1; i < ITERATIONS; i++) { + error = mpsc_queue_enqueue(&queue, (void*)(i + offset)); + if (error) + exit(1); + } + + return NULL; +} + +int main(int argc, char** argv) +{ + uintptr_t i, val, idx; + int error; + pthread_t threads[THREADS]; + uintptr_t offsets[THREADS] + = { 0x00000000, 0x10000000, 0x20000000, 0x30000000 }; + uintptr_t last_val[THREADS]; + + (void)argc; + (void)argv; + + memcpy(last_val, offsets, sizeof(last_val)); + + error = mpsc_queue_init(&queue); + if (error) + return 1; + + if (mpsc_queue_dequeue(&queue)) + return 1; + + pthread_barrier_init(&start, NULL, THREADS + 1); + + for (i = 0; i < THREADS; i++) + pthread_create(&threads[i], NULL, thread_func, &offsets[i]); + + pthread_barrier_wait(&start); + + for (i = 1; i <= (ITERATIONS - 1) * THREADS; i++) { + do { + val = (uintptr_t)mpsc_queue_dequeue(&queue); + } while (!val); + + idx = val >> 28; + if (idx >= THREADS) + return 1; + if (last_val[idx] + 1 != val) + return 1; + last_val[idx]++; + } + + for (i = 0; i < THREADS; i++) + pthread_join(threads[i], NULL); + + if (mpsc_queue_dequeue(&queue)) + return 1; + + mpsc_queue_destroy(&queue); +} diff --git a/test/mpsc_queue_s.c b/test/mpsc_queue_s.c new file mode 100644 index 0000000..68baf36 --- /dev/null +++ b/test/mpsc_queue_s.c @@ -0,0 +1,85 @@ +/* + libnbds + Copyright (C) 2014 Paweł Dziepak + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +#include + +#include "mpsc_queue.h" + +struct mpsc_queue queue; + +#define SIZE 1024u + +int main(int argc, char** argv) +{ + uintptr_t i; + int error; + + (void)argc; + (void)argv; + + error = mpsc_queue_init(&queue); + if (error) + return 1; + + if (mpsc_queue_dequeue(&queue)) + return 1; + + for (i = 1; i < SIZE; i++) { + error = mpsc_queue_enqueue(&queue, (void*)i); + if (error) + return 1; + } + + for (i = 1; i < SIZE; i++) { + if ((uintptr_t)mpsc_queue_dequeue(&queue) != i) + return 1; + } + + if (mpsc_queue_dequeue(&queue)) + return 1; + + for (i = 1; i < SIZE; i++) { + mpsc_queue_enqueue(&queue, (void*)i); + if ((uintptr_t)mpsc_queue_dequeue(&queue) != i) + return 1; + } + + if (mpsc_queue_dequeue(&queue)) + return 1; + + mpsc_queue_destroy(&queue); + error = mpsc_queue_init(&queue); + if (error) + return 1; + + for (i = 1; i < SIZE; i++) { + error = mpsc_queue_enqueue(&queue, (void*)i); + if (error) + return 1; + } + + mpsc_queue_destroy(&queue); + error = mpsc_queue_init(&queue); + if (error) + return 1; + + if (mpsc_queue_dequeue(&queue)) + return 1; + + mpsc_queue_destroy(&queue); +} -- 2.11.4.GIT