#ifndef __L2_ATOMIC_QUEUE__
#define __L2_ATOMIC_QUEUE__

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>     /* posix_memalign */
#include <string.h>     /* memset */
#include <assert.h>
#include <stdint.h>
#include "spi/include/l2/atomic.h"
#include "spi/include/l1p/flush.h"
#include "pcqueue.h"
#define DEFAULT_SIZE     1024
#define L2_ATOMIC_FULL   0x8000000000000000UL
#define L2_ATOMIC_EMPTY  0x8000000000000000UL

#define L2A_SUCCESS  0
#define L2A_EAGAIN  -1
#define L2A_FAIL    -2

typedef void* LRTSQueueElement;
static void *l2atomicbuf;

typedef struct _l2atomicstate {
  volatile uint64_t Consumer;   // not used atomically
  volatile uint64_t Producer;
  volatile uint64_t UpperBound;
  volatile uint64_t Flush;      // contents not used
} L2AtomicState;

typedef struct _l2atomicq {
  L2AtomicState            *_l2state;
  volatile void * volatile *_array;
  int                       _useOverflowQ;
  int                       _qsize;
  PCQueue                   _overflowQ;
  pthread_mutex_t           _overflowMutex;
} L2AtomicQueue;

typedef L2AtomicQueue* LRTSQueue;
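
/*
 * Worked example of how the three counters bound the ring: the producer
 * reserves a slot with a bounded fetch-and-increment on Producer, which
 * fails (returns L2_ATOMIC_FULL) once Producer reaches UpperBound (the
 * adjacent word in L2AtomicState); the consumer advances Consumer and then
 * republishes UpperBound = Consumer + qsize.  For instance, with
 * qsize = 1024, Consumer = 100 and Producer = 612, the ring holds 512
 * elements, the next push lands in slot 612 & 1023 = 612, and pushes keep
 * succeeding until Producer reaches UpperBound = 1124, i.e. 1024
 * outstanding entries.
 */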
void LRTSQueueInit (void     *l2mem,
                    size_t    l2memsize,
                    LRTSQueue queue,
                    int       use_overflow,
                    int       nelem)
{
  pami_result_t rc;

  //Verify the counter structure is properly aligned (the 0x1F mask enforces
  //32-byte alignment) and fits in the memory handed to us
  assert( (((uintptr_t) l2mem) & (0x1F)) == 0 );
  assert (sizeof(L2AtomicState) <= l2memsize);

  queue->_useOverflowQ = use_overflow;

  //Round the queue size up to the next power of two
  int qsize = 2;
  while (qsize < nelem)
    qsize *= 2;
  queue->_qsize = qsize;

  queue->_l2state = (L2AtomicState *)l2mem;
  pthread_mutex_init(&queue->_overflowMutex, NULL);
  queue->_overflowQ = PCQueueCreate();
  L2_AtomicStore(&queue->_l2state->Consumer, 0);
  L2_AtomicStore(&queue->_l2state->Producer, 0);
  L2_AtomicStore(&queue->_l2state->UpperBound, qsize);

  rc = (pami_result_t)posix_memalign ((void **)&queue->_array,
                                      64, /* L1 line size on BG/Q */
                                      sizeof(LRTSQueueElement) * qsize);
  assert(rc == PAMI_SUCCESS);
  memset((void*)queue->_array, 0, sizeof(LRTSQueueElement)*qsize);
}
int LRTSQueuePush(LRTSQueue queue, void *element)
{
  //fprintf(stderr,"Insert message %p\n", element);
  int qsize_1 = queue->_qsize - 1;
  uint64_t index = L2_AtomicLoadIncrementBounded(&queue->_l2state->Producer);
  L1P_FlushRequests();
  if (index != L2_ATOMIC_FULL) {
    queue->_array[index & qsize_1] = element;
    return L2A_SUCCESS;
  }

  //We don't want to use the overflow queue
  if (!queue->_useOverflowQ)
    return L2A_EAGAIN; //Q is full, try later

  //No ordering is guaranteed if there is overflow
  pthread_mutex_lock(&queue->_overflowMutex);
  PCQueuePush(queue->_overflowQ, (char *)element);
  pthread_mutex_unlock(&queue->_overflowMutex);

  return L2A_SUCCESS;
}
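
/*
 * Rough usage sketch: how a sender might drive LRTSQueuePush when the queue
 * was created without an overflow queue, retrying on L2A_EAGAIN.  The
 * enqueue_with_retry name is hypothetical and the block is illustrative only.
 */
#if 0
static void enqueue_with_retry(LRTSQueue q, void *msg)
{
  while (LRTSQueuePush(q, msg) == L2A_EAGAIN)
    ;  /* queue full: spin until the consumer frees a slot */
}
#endif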
void * LRTSQueuePop(LRTSQueue queue)
{
  uint64_t head, tail;
  tail = queue->_l2state->Producer;
  head = queue->_l2state->Consumer;
  int qsize_1 = queue->_qsize - 1;

  volatile void *e = NULL;
  if (head < tail) {
    e = queue->_array[head & qsize_1];
    //The producer bumps Producer before storing the element, so spin until
    //the slot becomes visible
    while (e == NULL)
      e = queue->_array[head & qsize_1];

    //fprintf(stderr,"Found message %p\n", e);

    queue->_array[head & qsize_1] = NULL;
    ppc_msync();

    head++;
    queue->_l2state->Consumer = head;

    //Charm++ does not require message ordering,
    //so we don't acquire the overflow mutex here
    uint64_t n = head + queue->_qsize;
    // is atomic-store needed?
    L2_AtomicStore(&queue->_l2state->UpperBound, n);
    return (void*) e;
  }

  //We don't have an overflowQ
  if (!queue->_useOverflowQ)
    return NULL;

  /* head == tail (head cannot be greater than tail) */
  if (PCQueueLength(queue->_overflowQ) > 0) {
    pthread_mutex_lock(&queue->_overflowMutex);
    e = PCQueuePop (queue->_overflowQ);
    pthread_mutex_unlock(&queue->_overflowMutex);

    return (void *) e;
  }

  return (void *) e;
}
int LRTSQueueEmpty (LRTSQueue queue) {
  return ( (PCQueueLength(queue->_overflowQ) == 0) &&
           (queue->_l2state->Producer == queue->_l2state->Consumer) );
}
//Spin-block in the L2 atomic queue until there is a message; give up and
//return after n iterations
int LRTSQueueSpinWait (LRTSQueue queue, int n)
{
  if (!LRTSQueueEmpty(queue))
    return 0; //queue is not empty, so return

  uint64_t head, tail;
  head = queue->_l2state->Consumer;

  size_t i = n;
  do {
    tail = queue->_l2state->Producer;
    i--;
  }
  //while the queue is empty and fewer than n iterations have elapsed
  while (head == tail && i != 0);

  return 0; //may return with the queue still empty
}
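
/*
 * Rough usage sketch: a consumer loop that blocks with LRTSQueueSpinWait for
 * up to `spins` polls and then drains whatever is available with
 * LRTSQueuePop.  The drain_queue name and handle_message callback are
 * hypothetical; the block is illustrative only.
 */
#if 0
static void drain_queue(LRTSQueue q, int spins, void (*handle_message)(void *))
{
  LRTSQueueSpinWait(q, spins);          /* returns once a message arrives or after `spins` polls */
  void *msg;
  while ((msg = LRTSQueuePop(q)) != NULL)
    handle_message(msg);
}
#endif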
//Spin-block on two L2 atomic queues until either has a message; give up and
//return after n iterations
int LRTSQueue2QSpinWait (LRTSQueue queue0,
                         LRTSQueue queue1,
                         int n)
{
  if (!LRTSQueueEmpty(queue0))
    return 0; //queue0 is not empty, so return

  if (!LRTSQueueEmpty(queue1))
    return 0; //queue1 is not empty, so return

  uint64_t head0, tail0;
  uint64_t head1, tail1;

  head0 = queue0->_l2state->Consumer;
  head1 = queue1->_l2state->Consumer;

  size_t i = n;
  do {
    tail0 = queue0->_l2state->Producer;
    tail1 = queue1->_l2state->Producer;
    i--;
  } while (head0 == tail0 && head1 == tail1 && i != 0);

  return 0;
}
typedef pami_result_t (*pamix_proc_memalign_fn) (void**, size_t, size_t, const char*);

//Open the PAMI BG/Q L2-atomic extension and allocate the counter region
//that LRTSQueueCreate carves into per-rank L2AtomicState slots
void LRTSQueuePreInit(void)
{
  pami_result_t rc;
  int actualNodeSize = 64/Kernel_ProcessCount();
  pami_extension_t l2;
  pamix_proc_memalign_fn PAMIX_L2_proc_memalign;
  size_t size = (QUEUE_NUMS + 2*actualNodeSize) * sizeof(L2AtomicState);
  // each rank, node, immediate
  //size_t size = (4*actualNodeSize+1) * sizeof(L2AtomicState);
  rc = PAMI_Extension_open(NULL, "EXT_bgq_l2atomic", &l2);
  CmiAssert (rc == 0);
  PAMIX_L2_proc_memalign = (pamix_proc_memalign_fn)PAMI_Extension_symbol(l2, "proc_memalign");
  rc = PAMIX_L2_proc_memalign(&l2atomicbuf, 64, size, NULL);
  CmiAssert (rc == 0);
}
LRTSQueue LRTSQueueCreate(void)
{
  static int position = 0;
  int place;
  if(CmiMyRank() == 0)
    place = position;
  else
    place = CmiMyRank();

  LRTSQueue Q;
  Q = (LRTSQueue)calloc(1, sizeof( struct _l2atomicq ));
  LRTSQueueInit ((char *) l2atomicbuf + sizeof(L2AtomicState)*place,
                 sizeof(L2AtomicState),
                 Q,
                 1,            /*use overflow*/
                 DEFAULT_SIZE  /*1024 entries*/);
  if(CmiMyRank() == 0) {
    if(position == 0) {
      position = CmiMyNodeSize();
    } else {
      position++;
    }
  }
  return Q;
}
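
/*
 * Rough sketch of the expected call order, assuming LRTSQueuePreInit() runs
 * once per process before any rank creates its queue; the
 * setup_queues_example name is hypothetical and the block is illustrative
 * only.
 */
#if 0
static LRTSQueue my_queue;        /* hypothetical per-rank queue handle */

void setup_queues_example(void)
{
  LRTSQueuePreInit();             /* once per process: opens EXT_bgq_l2atomic, allocates l2atomicbuf */
  my_queue = LRTSQueueCreate();   /* per rank: binds one L2AtomicState slot from l2atomicbuf */
}
#endif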
#endif