bump rx to official 2.1 release.
[mono-project.git] / mono / utils / lock-free-array-queue.c
blob6f692f976e8f169f54c51ca5057fb2aeec2044f9
/*
 * lock-free-array-queue.c: A lock-free somewhat-queue that doesn't
 * require hazard pointers.
 *
 * (C) Copyright 2011 Xamarin Inc.
 */
/*
 * The queue is a linked list of arrays (chunks).  Chunks are never
 * removed from the list, only added to the end, in a lock-free manner.
 *
 * Adding or removing an entry in the queue is only possible at the
 * end.  To do so, the thread first has to increment or decrement
 * q->num_used_entries.  The entry thus added or removed now "belongs"
 * to that thread.  It first CASes the state to BUSY, writes/reads the
 * entry data, and then sets the state to USED or FREE.
 */
#include <string.h>

#include <mono/io-layer/io-layer.h>
#include <mono/utils/mono-membar.h>
#include <mono/utils/mono-mmap.h>

#include <mono/utils/lock-free-array-queue.h>
25 struct _MonoLockFreeArrayChunk {
26 MonoLockFreeArrayChunk *next;
27 gint32 num_entries;
28 char entries [MONO_ZERO_LEN_ARRAY];
31 typedef MonoLockFreeArrayChunk Chunk;
33 #define CHUNK_NTH(arr,chunk,index) ((chunk)->entries + (index) * (arr)->entry_size)
35 static Chunk*
36 alloc_chunk (MonoLockFreeArray *arr)
38 int size = mono_pagesize ();
39 int num_entries = (size - (sizeof (Chunk) - arr->entry_size * MONO_ZERO_LEN_ARRAY)) / arr->entry_size;
40 Chunk *chunk = mono_valloc (0, size, MONO_MMAP_READ | MONO_MMAP_WRITE);
41 g_assert (chunk);
42 chunk->num_entries = num_entries;
43 return chunk;
46 static void
47 free_chunk (Chunk *chunk)
49 mono_vfree (chunk, mono_pagesize ());
52 gpointer
53 mono_lock_free_array_nth (MonoLockFreeArray *arr, int index)
55 Chunk *chunk;
57 g_assert (index >= 0);
59 if (!arr->chunk_list) {
60 chunk = alloc_chunk (arr);
61 mono_memory_write_barrier ();
62 if (InterlockedCompareExchangePointer ((volatile gpointer *)&arr->chunk_list, chunk, NULL) != NULL)
63 free_chunk (chunk);
66 chunk = arr->chunk_list;
67 g_assert (chunk);
69 while (index >= chunk->num_entries) {
70 Chunk *next = chunk->next;
71 if (!next) {
72 next = alloc_chunk (arr);
73 mono_memory_write_barrier ();
74 if (InterlockedCompareExchangePointer ((volatile gpointer *) &chunk->next, next, NULL) != NULL) {
75 free_chunk (next);
76 next = chunk->next;
77 g_assert (next);
80 index -= chunk->num_entries;
81 chunk = next;
84 return CHUNK_NTH (arr, chunk, index);
87 gpointer
88 mono_lock_free_array_iterate (MonoLockFreeArray *arr, MonoLockFreeArrayIterateFunc func, gpointer user_data)
90 Chunk *chunk;
91 for (chunk = arr->chunk_list; chunk; chunk = chunk->next) {
92 int i;
93 for (i = 0; i < chunk->num_entries; ++i) {
94 gpointer result = func (i, CHUNK_NTH (arr, chunk, i), user_data);
95 if (result)
96 return result;
99 return NULL;
102 void
103 mono_lock_free_array_cleanup (MonoLockFreeArray *arr)
105 Chunk *chunk;
107 chunk = arr->chunk_list;
108 arr->chunk_list = NULL;
109 while (chunk) {
110 Chunk *next = chunk->next;
111 free_chunk (chunk);
112 chunk = next;
116 enum {
117 STATE_FREE,
118 STATE_USED,
119 STATE_BUSY
122 typedef struct {
123 gint32 state;
124 gpointer data [MONO_ZERO_LEN_ARRAY];
125 } Entry;
127 typedef MonoLockFreeArrayQueue Queue;
129 /* The queue's entry size, calculated from the array's. */
130 #define ENTRY_SIZE(q) ((q)->array.entry_size - sizeof (gpointer))
132 void
133 mono_lock_free_array_queue_push (MonoLockFreeArrayQueue *q, gpointer entry_data_ptr)
135 int index, num_used;
136 Entry *entry;
138 do {
139 index = InterlockedIncrement (&q->num_used_entries) - 1;
140 entry = mono_lock_free_array_nth (&q->array, index);
141 } while (InterlockedCompareExchange (&entry->state, STATE_BUSY, STATE_FREE) != STATE_FREE);
143 mono_memory_write_barrier ();
145 memcpy (entry->data, entry_data_ptr, ENTRY_SIZE (q));
147 mono_memory_write_barrier ();
149 entry->state = STATE_USED;
151 mono_memory_barrier ();
153 do {
154 num_used = q->num_used_entries;
155 if (num_used > index)
156 break;
157 } while (InterlockedCompareExchange (&q->num_used_entries, index + 1, num_used) != num_used);
159 mono_memory_write_barrier ();
162 gboolean
163 mono_lock_free_array_queue_pop (MonoLockFreeArrayQueue *q, gpointer entry_data_ptr)
165 int index;
166 Entry *entry;
168 do {
169 do {
170 index = q->num_used_entries;
171 if (index == 0)
172 return FALSE;
173 } while (InterlockedCompareExchange (&q->num_used_entries, index - 1, index) != index);
175 entry = mono_lock_free_array_nth (&q->array, index - 1);
176 } while (InterlockedCompareExchange (&entry->state, STATE_BUSY, STATE_USED) != STATE_USED);
178 /* Reading the item must happen before CASing the state. */
179 mono_memory_barrier ();
181 memcpy (entry_data_ptr, entry->data, ENTRY_SIZE (q));
183 mono_memory_barrier ();
185 entry->state = STATE_FREE;
187 mono_memory_write_barrier ();
189 return TRUE;
192 void
193 mono_lock_free_array_queue_cleanup (MonoLockFreeArrayQueue *q)
195 mono_lock_free_array_cleanup (&q->array);
196 q->num_used_entries = 0;