2 * mono-cq.c: concurrent queue
5 * Gonzalo Paniagua Javier (gonzalo@novell.com)
7 * Copyright (c) 2011 Novell, Inc (http://www.novell.com)
8 * Copyright 2011 Xamarin Inc
11 #include <mono/metadata/object.h>
12 #include <mono/metadata/mono-cq.h>
13 #include <mono/metadata/mono-mlist.h>
14 #include <mono/utils/mono-memory-model.h>
15 #include <mono/utils/atomic.h>
18 //#define CQ_DEBUG(...) g_message(__VA_ARGS__)
23 volatile gint32 count
;
26 /* matches the System.MonoListItem object */
33 /* matches the System.MonoCQItem object */
36 MonoArray
*array
; // MonoObjects
37 MonoArray
*array_state
; // byte array
38 volatile gint32 first
;
42 typedef struct _MonoCQItem MonoCQItem
;
43 #define CQ_ARRAY_SIZE 64
45 static MonoVTable
*monocq_item_vtable
= NULL
;
48 mono_cqitem_alloc (void)
51 MonoDomain
*domain
= mono_get_root_domain ();
53 if (!monocq_item_vtable
) {
54 MonoClass
*klass
= mono_class_from_name (mono_defaults
.corlib
, "System", "MonoCQItem");
55 monocq_item_vtable
= mono_class_vtable (domain
, klass
);
56 g_assert (monocq_item_vtable
);
58 queue
= (MonoCQItem
*) mono_object_new_fast (monocq_item_vtable
);
59 MONO_OBJECT_SETREF (queue
, array
, mono_array_new (domain
, mono_defaults
.object_class
, CQ_ARRAY_SIZE
));
60 MONO_OBJECT_SETREF (queue
, array_state
, mono_array_new (domain
, mono_defaults
.byte_class
, CQ_ARRAY_SIZE
));
69 cq
= g_new0 (MonoCQ
, 1);
70 MONO_GC_REGISTER_ROOT_SINGLE (cq
->head
);
71 MONO_GC_REGISTER_ROOT_SINGLE (cq
->tail
);
72 cq
->head
= mono_mlist_alloc ((MonoObject
*) mono_cqitem_alloc ());
74 CQ_DEBUG ("Created %p", cq
);
79 mono_cq_destroy (MonoCQ
*cq
)
81 CQ_DEBUG ("Destroy %p", cq
);
85 mono_gc_bzero_aligned (cq
, sizeof (MonoCQ
));
86 MONO_GC_UNREGISTER_ROOT (cq
->tail
);
87 MONO_GC_UNREGISTER_ROOT (cq
->head
);
92 mono_cq_count (MonoCQ
*cq
)
97 CQ_DEBUG ("Count %d", cq
->count
);
102 mono_cq_add_node (MonoCQ
*cq
)
105 MonoMList
*prev_tail
;
107 CQ_DEBUG ("Adding node");
108 n
= mono_mlist_alloc ((MonoObject
*) mono_cqitem_alloc ());
109 prev_tail
= cq
->tail
;
110 MONO_OBJECT_SETREF (prev_tail
, next
, n
);
112 /* prev_tail->next must be visible before the new tail is */
119 mono_cqitem_try_enqueue (MonoCQ
*cq
, MonoObject
*obj
)
126 queue
= (MonoCQItem
*) tail
->data
;
129 if (pos
>= CQ_ARRAY_SIZE
) {
130 CQ_DEBUG ("enqueue(): pos >= CQ_ARRAY_SIZE, %d >= %d", pos
, CQ_ARRAY_SIZE
);
134 if (InterlockedCompareExchange (&queue
->last
, pos
+ 1, pos
) == pos
) {
135 mono_array_setref_fast (queue
->array
, pos
, obj
);
137 mono_array_set_fast (queue
->array_state
, char, pos
, TRUE
);
138 if ((pos
+ 1) == CQ_ARRAY_SIZE
) {
139 CQ_DEBUG ("enqueue(): pos + 1 == CQ_ARRAY_SIZE, %d. Adding node.", CQ_ARRAY_SIZE
);
140 mono_cq_add_node (cq
);
145 g_assert_not_reached ();
149 mono_cq_enqueue (MonoCQ
*cq
, MonoObject
*obj
)
151 if (cq
== NULL
|| obj
== NULL
)
155 if (mono_cqitem_try_enqueue (cq
, obj
)) {
156 CQ_DEBUG ("Queued one");
157 InterlockedIncrement (&cq
->count
);
165 mono_cq_remove_node (MonoCQ
*cq
)
169 CQ_DEBUG ("Removing node");
171 /* Not needed now that array_state is GC memory
175 queue = (MonoCQItem *) old_head->data;
178 for (i = 0; i < CQ_ARRAY_SIZE; i++) {
179 if (mono_array_get (queue->array_state, char, i) == TRUE) {
188 while (old_head
->next
== NULL
)
190 cq
->head
= old_head
->next
;
192 MONO_OBJECT_SETREF (old_head
, next
, NULL
);
197 mono_cqitem_try_dequeue (MonoCQ
*cq
, MonoObject
**obj
)
204 queue
= (MonoCQItem
*) head
->data
;
207 if (pos
>= queue
->last
|| pos
>= CQ_ARRAY_SIZE
)
210 if (InterlockedCompareExchange (&queue
->first
, pos
+ 1, pos
) == pos
) {
211 while (mono_array_get (queue
->array_state
, char, pos
) == FALSE
) {
215 *obj
= mono_array_get (queue
->array
, MonoObject
*, pos
);
218 Here don't need to fence since the only spot that reads it is the one above.
219 Additionally, the first store is superfluous, so it can happen OOO with the second.
221 mono_array_set (queue
->array
, MonoObject
*, pos
, NULL
);
222 mono_array_set (queue
->array_state
, char, pos
, FALSE
);
225 We should do a STORE_LOAD fence here to make sure subsequent loads see new state instead
226 of the above stores. We can safely ignore this as the only issue of seeing a stale value
227 is the thread yielding. Given how unfrequent this will be in practice, we better avoid the
228 very expensive STORE_LOAD fence.
231 if ((pos
+ 1) == CQ_ARRAY_SIZE
) {
232 mono_cq_remove_node (cq
);
237 g_assert_not_reached ();
241 mono_cq_dequeue (MonoCQ
*cq
, MonoObject
**result
)
243 while (cq
->count
> 0) {
244 if (mono_cqitem_try_dequeue (cq
, result
)) {
245 CQ_DEBUG ("Dequeued one");
246 InterlockedDecrement (&cq
->count
);