Merge of pami-bgq-commthr.
[charm.git] / src / arch / pami-bluegeneq / L2AtomicQueue.h
blobd6e7f8ec0d25fd4452e206f75b7cbb7079a6e6e8
2 #ifndef __L2_ATOMIC_QUEUE__
3 #define __L2_ATOMIC_QUEUE__
#include <assert.h>
#include <limits.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "spi/include/l2/atomic.h"
#include "spi/include/l1p/flush.h"
#include "pcqueue.h"
/* Default queue capacity in elements (not used by the functions in this header). */
#define DEFAULT_SIZE 1024

/* Value compared against the result of L2_AtomicLoadIncrementBounded() in
 * L2AtomicEnqueue to detect that the ring is full. */
#define L2_ATOMIC_FULL 0x8000000000000000UL
/* Same bit pattern; presumably the "empty" result of a bounded decrement.
 * Not referenced by the functions in this header. */
#define L2_ATOMIC_EMPTY 0x8000000000000000UL

/* Status codes. L2AtomicEnqueue returns L2A_SUCCESS or L2A_EAGAIN.
 * Negative values are parenthesized so each macro expands as a single
 * primary expression wherever it is used. */
#define L2A_SUCCESS 0
#define L2A_EAGAIN  (-1)
#define L2A_FAIL    (-2)
/* One ring slot: an opaque element pointer. */
typedef void *L2AtomicQueueElement;

/* Counter block placed at the caller-supplied l2mem by L2AtomicQueueInit.
 * NOTE(review): field order presumably matters to the SPI bounded-increment
 * primitive used on Producer; do not reorder or insert fields. */
typedef struct _l2atomicstate {
  uint64_t volatile Consumer;   /* next slot to dequeue; plain (non-atomic) stores */
  uint64_t volatile Producer;   /* next slot to fill; bumped by L2_AtomicLoadIncrementBounded */
  uint64_t volatile UpperBound; /* set to qsize at init, then Consumer + qsize on each dequeue */
  uint64_t volatile Flush;      /* contents are never used */
} L2AtomicState;
30 typedef struct _l2atomicq {
31 L2AtomicState * _l2state;
32 volatile void * volatile * _array;
33 int _useOverflowQ;
34 int _qsize;
35 PCQueue _overflowQ;
36 pthread_mutex_t _overflowMutex;
37 } L2AtomicQueue;
39 void L2AtomicQueueInit (void * l2mem,
40 size_t l2memsize,
41 L2AtomicQueue * queue,
42 int use_overflow,
43 int nelem)
45 pami_result_t rc;
47 //Verify counter array is 64-byte aligned
48 assert( (((uintptr_t) l2mem) & (0x1F)) == 0 );
49 assert (sizeof(L2AtomicState) <= l2memsize);
51 queue->_useOverflowQ = use_overflow;
53 int qsize = 2;
54 while (qsize < nelem)
55 qsize *= 2;
56 queue->_qsize = qsize;
58 queue->_l2state = (L2AtomicState *)l2mem;
59 pthread_mutex_init(&queue->_overflowMutex, NULL);
60 queue->_overflowQ = PCQueueCreate();
61 L2_AtomicStore(&queue->_l2state->Consumer, 0);
62 L2_AtomicStore(&queue->_l2state->Producer, 0);
63 L2_AtomicStore(&queue->_l2state->UpperBound, qsize);
65 rc = posix_memalign ((void **)&queue->_array,
66 64, /*L1 line size for BG/Q */
67 sizeof(L2AtomicQueueElement) * qsize);
69 assert(rc == PAMI_SUCCESS);
70 memset((void*)queue->_array, 0, sizeof(L2AtomicQueueElement)*qsize);
73 int L2AtomicEnqueue (L2AtomicQueue * queue,
74 void * element)
76 //fprintf(stderr,"Insert message %p\n", element);
78 register int qsize_1 = queue->_qsize - 1;
79 uint64_t index = L2_AtomicLoadIncrementBounded(&queue->_l2state->Producer);
80 L1P_FlushRequests();
81 if (index != L2_ATOMIC_FULL) {
82 queue->_array[index & qsize_1] = element;
83 return L2A_SUCCESS;
86 //We dont want to use the overflow queue
87 if (!queue->_useOverflowQ)
88 return L2A_EAGAIN; //Q is full, try later
90 //No ordering is guaranteed if there is overflow
91 pthread_mutex_lock(&queue->_overflowMutex);
92 PCQueuePush(queue->_overflowQ, element);
93 pthread_mutex_unlock(&queue->_overflowMutex);
95 return L2A_SUCCESS;
98 void * L2AtomicDequeue (L2AtomicQueue *queue)
100 uint64_t head, tail;
101 tail = queue->_l2state->Producer;
102 head = queue->_l2state->Consumer;
103 register int qsize_1 = queue->_qsize-1;
105 volatile void *e = NULL;
106 if (head < tail) {
107 e = queue->_array[head & qsize_1];
108 while (e == NULL)
109 e = queue->_array[head & qsize_1];
111 //fprintf(stderr,"Found message %p\n", e);
113 queue->_array[head & qsize_1] = NULL;
114 ppc_msync();
116 head ++;
117 queue->_l2state->Consumer = head;
119 //Charm++ does not require message ordering
120 //So we dont acquire overflow mutex here
121 uint64_t n = head + queue->_qsize;
122 // is atomic-store needed?
123 L2_AtomicStore(&queue->_l2state->UpperBound, n);
124 return (void*) e;
127 //We dont have an overflowQ
128 if (!queue->_useOverflowQ)
129 return NULL;
131 /* head == tail (head cannot be greater than tail) */
132 if (PCQueueLength(queue->_overflowQ) > 0) {
133 pthread_mutex_lock(&queue->_overflowMutex);
134 e = PCQueuePop (queue->_overflowQ);
135 pthread_mutex_unlock(&queue->_overflowMutex);
137 return (void *) e;
140 return (void *) e;
143 int L2AtomicQueueEmpty (L2AtomicQueue *queue) {
144 return ( (PCQueueLength(queue->_overflowQ) == 0) &&
145 (queue->_l2state->Producer == queue->_l2state->Consumer) );
148 //spin block in the L2 atomic queue till there is a message. fail and
149 //return after n iterations
150 int L2AtomicQueueSpinWait (L2AtomicQueue * queue,
151 int n)
153 if (!L2AtomicQueueEmpty(queue))
154 return 0; //queue is not empty so return
156 uint64_t head, tail;
157 head = queue->_l2state->Consumer;
159 size_t i = n;
160 do {
161 tail = queue->_l2state->Producer;
162 i--;
164 //While the queue is empty and i < n
165 while (head == tail && i != 0);
167 return 0; //fail queue is empty
170 //spin block in the L2 atomic queue till there is a message. fail and
171 //return after n iterations
172 int L2AtomicQueue2QSpinWait (L2AtomicQueue * queue0,
173 L2AtomicQueue * queue1,
174 int n)
176 if (!L2AtomicQueueEmpty(queue0))
177 return 0; //queue0 is not empty so return
179 if (!L2AtomicQueueEmpty(queue1))
180 return 0; //queue is not empty so return
182 uint64_t head0, tail0;
183 uint64_t head1, tail1;
185 head0 = queue0->_l2state->Consumer;
186 head1 = queue1->_l2state->Consumer;
188 size_t i = n;
189 do {
190 tail0 = queue0->_l2state->Producer;
191 tail1 = queue1->_l2state->Producer;
192 i --;
193 } while (head0==tail0 && head1==tail1 && i!=0);
195 return 0;
200 #endif