spsc_queue: Relax memory ordering
[libnbds.git] / src / spsc_queue.c
blob8e9294163f2bfc08a3124bafb715e761e054e252
1 /*
2 libnbds
3 Copyright (C) 2013 Paweł Dziepak
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include <errno.h>
20 #include <stdint.h>
21 #include <stdlib.h>
22 #include <string.h>
24 #include "spsc_queue.h"
26 #include "barriers.h"
/*
 * Number of object slots stored behind every block header.
 *
 * Slot indices in this file are derived with "index & (BLOCK_SIZE - 1)",
 * so the value has to stay a power of two.
 */
#define BLOCK_SIZE 64

/*
 * Header of a single queue block.
 *
 * The header is immediately followed in memory by BLOCK_SIZE "void*" slots
 * (see the allocation size computed in new_block()); the slots are reached
 * through "&block[1]".
 *
 * next     - block appended after this one, NULL while there is none
 * consumer - running count of objects taken out of this block
 * producer - running count of objects stored into this block
 *
 * Both counters only ever grow; "producer - consumer" is the number of
 * objects currently held by the block.
 */
struct spsc_queue_block {
	struct spsc_queue_block* next;
	uint32_t consumer;
	uint32_t producer;
};
36 static struct spsc_queue_block* new_block()
38 struct spsc_queue_block* block;
39 size_t block_size
40 = sizeof(struct spsc_queue_block) + BLOCK_SIZE * sizeof(void*);
41 int retval;
43 retval = posix_memalign((void**)&block, sizeof(void*), block_size);
44 if (retval)
45 return NULL;
47 block->consumer = 0;
48 block->producer = 0;
49 block->next = NULL;
51 return block;
54 int spsc_queue_init(struct spsc_queue* queue)
56 struct spsc_queue_block* block = new_block();
57 if (!block) {
58 errno = ENOMEM;
59 return -1;
62 queue->consumer = block;
63 queue->producer = block;
65 return 0;
68 void spsc_queue_destroy(struct spsc_queue* queue)
70 struct spsc_queue_block* block = queue->consumer;
71 struct spsc_queue_block* next;
73 while (block) {
74 next = block->next;
75 free(block);
76 block = next;
79 queue->consumer = NULL;
80 queue->producer = NULL;
83 int spsc_queue_enqueue(struct spsc_queue* queue, void* object)
85 struct spsc_queue_block* block;
86 void** array;
87 uint32_t consumer;
89 block = queue->producer;
90 consumer = block->consumer;
92 if (block->producer - consumer >= BLOCK_SIZE) {
93 block = new_block();
94 if (!block) {
95 errno = ENOMEM;
96 return -1;
100 array = (void**)&block[1];
102 array[block->producer & (BLOCK_SIZE - 1)] = object;
103 write_barrier();
104 block->producer++;
106 if (block != queue->producer) {
107 write_barrier();
108 queue->producer->next = block;
109 queue->producer = block;
112 return 0;
115 void* spsc_queue_dequeue(struct spsc_queue* queue)
117 struct spsc_queue_block* block;
118 struct spsc_queue_block* next;
119 void** array;
120 void* object;
121 uint32_t producer;
123 block = queue->consumer;
125 next = block->next;
126 read_barrier();
127 producer = block->producer;
129 if (producer == block->consumer && !next)
130 return NULL;
132 if (producer == block->consumer) {
133 queue->consumer = next;
134 free(block);
135 block = queue->consumer;
138 array = (void**)&block[1];
139 object = array[block->consumer & (BLOCK_SIZE - 1)];
140 block->consumer++;
142 return object;