2 #ifndef __L2_ATOMIC_QUEUE__
3 #define __L2_ATOMIC_QUEUE__
9 #include "spi/include/l2/atomic.h"
10 #include "spi/include/l1p/flush.h"
// Queue capacity that LRTSQueueCreate passes to LRTSQueueInit.
13 #define DEFAULT_SIZE 1024
// Value LRTSQueuePush compares against the result of
// L2_AtomicLoadIncrementBounded; a match sends the push down the
// queue-full path.
14 #define L2_ATOMIC_FULL 0x8000000000000000UL
// Same bit pattern as L2_ATOMIC_FULL; not referenced in the code visible here.
15 #define L2_ATOMIC_EMPTY 0x8000000000000000UL
// Element type stored in the queue slots: an untyped pointer.
21 typedef void* LRTSQueueElement
;
// Base address of the L2 atomic state buffer. LRTSQueuePreInit fills it in and
// LRTSQueueCreate offsets into it in sizeof(L2AtomicState) steps.
// NOTE(review): this is a `static` object in a header, so every translation
// unit that includes this file gets its own copy - confirm that is intended.
22 static void *l2atomicbuf
;
// Counter words for one queue. LRTSQueueInit points a queue's _l2state at one
// of these; the push/pop code below reads and updates the fields.
24 typedef struct _l2atomicstate
{
// Index of the next slot to pop. LRTSQueuePop reads and writes it with plain
// (non-atomic) accesses.
25 volatile uint64_t Consumer
; // not used atomically
// Index handed out to pushers through L2_AtomicLoadIncrementBounded.
26 volatile uint64_t Producer
;
// Initialised to qsize by LRTSQueueInit and later set to head + _qsize by
// LRTSQueuePop. Presumably the bound that the bounded increment tests
// Producer against - confirm in the L2 atomic SPI documentation.
27 volatile uint64_t UpperBound
;
// Never accessed in the code visible here.
28 volatile uint64_t Flush
; // contents not used
// Per-queue handle. Only some members are visible in this view; the functions
// below also use _useOverflowQ, _qsize and _overflowQ.
31 typedef struct _l2atomicq
{
// Counter block of this queue; set from l2mem in LRTSQueueInit.
32 L2AtomicState
* _l2state
;
// Slot array allocated and zeroed in LRTSQueueInit. LRTSQueuePop writes NULL
// back into a slot after reading it.
33 volatile void * volatile * _array
;
// Held around PCQueuePush / PCQueuePop on the overflow queue.
37 pthread_mutex_t _overflowMutex
;
// Handle type taken by the LRTSQueue* functions: a pointer to an L2AtomicQueue.
40 typedef L2AtomicQueue
* LRTSQueue
;
// Sets up `queue` on top of the counter block at `l2mem`: records the overflow
// flag and the queue size, creates the overflow mutex and overflow queue,
// resets Consumer and Producer to 0 and UpperBound to qsize, then allocates
// and zeroes the slot array.
// Part of the parameter list and the computation of qsize are not visible in
// this view.
42 void LRTSQueueInit (void * l2mem
,
50 //Verify counter array is 64-byte aligned
// NOTE(review): the 0x1F mask below only checks 32-byte alignment; a 64-byte
// check would mask with 0x3F. Confirm which alignment is actually required.
51 assert( (((uintptr_t) l2mem
) & (0x1F)) == 0 );
// The supplied region must be large enough to hold one L2AtomicState.
52 assert (sizeof(L2AtomicState
) <= l2memsize
);
54 queue
->_useOverflowQ
= use_overflow
;
59 queue
->_qsize
= qsize
;
61 queue
->_l2state
= (L2AtomicState
*)l2mem
;
62 pthread_mutex_init(&queue
->_overflowMutex
, NULL
);
// In the lines visible here the overflow queue is created regardless of
// use_overflow.
63 queue
->_overflowQ
= PCQueueCreate();
64 L2_AtomicStore(&queue
->_l2state
->Consumer
, 0);
65 L2_AtomicStore(&queue
->_l2state
->Producer
, 0);
66 L2_AtomicStore(&queue
->_l2state
->UpperBound
, qsize
);
// posix_memalign reports success as 0, so the cast and the comparison below
// rely on PAMI_SUCCESS being 0 - TODO confirm. The check is an assert, so it
// is compiled out when NDEBUG is defined and the memset would then run on an
// unchecked pointer.
68 rc
= (pami_result_t
)posix_memalign ((void **)&queue
->_array
,
69 64, /*L1 line size for BG/Q */
70 sizeof(LRTSQueueElement
) * qsize
);
72 assert(rc
== PAMI_SUCCESS
);
// Start with every slot NULL.
73 memset((void*)queue
->_array
, 0, sizeof(LRTSQueueElement
)*qsize
);
// Appends `element` to `queue`. A slot index is claimed with a bounded
// load-and-increment on Producer. When that yields L2_ATOMIC_FULL the call
// returns L2A_EAGAIN if the overflow queue is disabled; otherwise the element
// goes to the mutex-protected overflow queue.
// The remaining parameters and the other return statements are not visible in
// this view.
76 int LRTSQueuePush(LRTSQueue queue
,
79 //fprintf(stderr,"Insert message %p\n", element);
// Mask that wraps the claimed index into the slot array. It only acts as a
// modulo if _qsize is a power of two - presumably ensured where qsize is
// computed in LRTSQueueInit (not visible here); confirm.
80 int qsize_1
= queue
->_qsize
- 1;
81 uint64_t index
= L2_AtomicLoadIncrementBounded(&queue
->_l2state
->Producer
);
// Any value other than L2_ATOMIC_FULL is a claimed index: store the element
// in that slot.
83 if (index
!= L2_ATOMIC_FULL
) {
84 queue
->_array
[index
& qsize_1
] = element
;
88 //We dont want to use the overflow queue
89 if (!queue
->_useOverflowQ
)
90 return L2A_EAGAIN
; //Q is full, try later
92 //No ordering is guaranteed if there is overflow
93 pthread_mutex_lock(&queue
->_overflowMutex
);
94 PCQueuePush(queue
->_overflowQ
, (char *)element
);
95 pthread_mutex_unlock(&queue
->_overflowMutex
);
// Takes one element from `queue`. The visible statements read the slot at
// head & (_qsize - 1), clear it, publish the new Consumer value and raise
// UpperBound to head + _qsize. Further down, an element is popped from the
// overflow queue when that queue is non-empty.
// The branches, loops and return statements that connect these steps are not
// visible in this view.
100 void * LRTSQueuePop(LRTSQueue queue
)
// Snapshot of the producer and consumer indices.
103 tail
= queue
->_l2state
->Producer
;
104 head
= queue
->_l2state
->Consumer
;
105 int qsize_1
= queue
->_qsize
-1;
107 volatile void *e
= NULL
;
// The same slot is read twice in the visible lines; the control flow between
// the two reads is not visible (possibly a wait for the producer's store to
// land - confirm against the full file).
109 e
= queue
->_array
[head
& qsize_1
];
111 e
= queue
->_array
[head
& qsize_1
];
113 //fprintf(stderr,"Found message %p\n", e);
// Clear the slot after it has been read.
115 queue
->_array
[head
& qsize_1
] = NULL
;
// Plain store: Consumer is marked "not used atomically" in L2AtomicState.
119 queue
->_l2state
->Consumer
= head
;
121 //Charm++ does not require message ordering
122 //So we dont acquire overflow mutex here
// New bound is the current head plus the queue size.
123 uint64_t n
= head
+ queue
->_qsize
;
124 // is atomic-store needed?
125 L2_AtomicStore(&queue
->_l2state
->UpperBound
, n
);
129 //We dont have an overflowQ
130 if (!queue
->_useOverflowQ
)
133 /* head == tail (head cannot be greater than tail) */
// NOTE(review): the length is tested before the mutex is taken, so another
// popper could drain the overflow queue between the test and the pop. Confirm
// that PCQueuePop tolerates an empty queue or that each queue has a single
// consumer.
134 if (PCQueueLength(queue
->_overflowQ
) > 0) {
135 pthread_mutex_lock(&queue
->_overflowMutex
);
136 e
= PCQueuePop (queue
->_overflowQ
);
137 pthread_mutex_unlock(&queue
->_overflowMutex
);
// Returns nonzero when the overflow queue has length 0 and Producer equals
// Consumer; returns 0 otherwise. No lock is taken in the visible code, so the
// result is only a snapshot of the three values read.
// _overflowQ is read even when _useOverflowQ is 0; the visible part of
// LRTSQueueInit creates it in either case.
145 int LRTSQueueEmpty (LRTSQueue queue
) {
146 return ( (PCQueueLength(queue
->_overflowQ
) == 0) &&
147 (queue
->_l2state
->Producer
== queue
->_l2state
->Consumer
) );
150 //spin block in the L2 atomic queue till there is a message. fail and
151 //return after n iterations
// NOTE(review): both return statements visible here return 0, so as far as
// this view shows a caller cannot tell "message arrived" from "gave up".
// The remaining parameter, the local declarations and the loop head are not
// visible in this view.
152 int LRTSQueueSpinWait (LRTSQueue queue
,
// Fast path: something is already queued (L2 slots or overflow queue).
155 if (!LRTSQueueEmpty(queue
))
156 return 0; //queue is not empty so return
159 head
= queue
->_l2state
->Consumer
;
163 tail
= queue
->_l2state
->Producer
;
// The visible loop condition only compares the Consumer and Producer values;
// it does not look at the overflow queue.
166 //While the queue is empty (head == tail) and i has not reached 0
167 while (head
== tail
&& i
!= 0);
169 return 0; //fail queue is empty
172 //spin on two L2 atomic queues till either one has a message. fail and
173 //return after n iterations
// The remaining parameters, the declaration of i and the loop head are not
// visible in this view.
174 int LRTSQueue2QSpinWait (LRTSQueue queue0
,
// Fast paths: return at once if either queue already holds something.
178 if (!LRTSQueueEmpty(queue0
))
179 return 0; //queue0 is not empty so return
181 if (!LRTSQueueEmpty(queue1
))
182 return 0; //queue1 is not empty so return
184 uint64_t head0
, tail0
;
185 uint64_t head1
, tail1
;
// Consumer indices of both queues.
187 head0
= queue0
->_l2state
->Consumer
;
188 head1
= queue1
->_l2state
->Consumer
;
// Producer indices of both queues.
192 tail0
= queue0
->_l2state
->Producer
;
193 tail1
= queue1
->_l2state
->Producer
;
// Keep spinning while both queues still look empty and i has not reached 0.
// As in LRTSQueueSpinWait, the overflow queues are not part of this condition.
195 } while (head0
==tail0
&& head1
==tail1
&& i
!=0);
// Signature of the "proc_memalign" symbol that LRTSQueuePreInit looks up in
// the EXT_bgq_l2atomic PAMI extension. It is called there as
// (&l2atomicbuf, 64, size, NULL); presumably (out pointer, alignment, size,
// name/key) - confirm against the PAMI extension documentation.
200 typedef pami_result_t (*pamix_proc_memalign_fn
) (void**, size_t, size_t, const char*);
// Allocates the shared L2 atomic state buffer (l2atomicbuf): opens the
// EXT_bgq_l2atomic PAMI extension, resolves its proc_memalign entry point and
// asks it for room for (QUEUE_NUMS + 2*actualNodeSize) L2AtomicState records.
// The declarations of rc and l2, and any checks on rc, are not visible in this
// view.
201 void LRTSQueuePreInit(void)
// 64 divided by the process count reported by the kernel; presumably the
// hardware threads available to each process on a 64-thread node - confirm.
204 int actualNodeSize
= 64/Kernel_ProcessCount();
206 pamix_proc_memalign_fn PAMIX_L2_proc_memalign
;
207 size_t size
= (QUEUE_NUMS
+ 2*actualNodeSize
) * sizeof(L2AtomicState
);
208 // each rank, node, immediate
209 //size_t size = (4*actualNodeSize+1) * sizeof(L2AtomicState);
210 rc
= PAMI_Extension_open(NULL
, "EXT_bgq_l2atomic", &l2
);
// Look the allocator up by name and cast it to the expected signature.
212 PAMIX_L2_proc_memalign
= (pamix_proc_memalign_fn
)PAMI_Extension_symbol(l2
, "proc_memalign");
// Request `size` bytes, passing 64 as the second (alignment) argument.
213 rc
= PAMIX_L2_proc_memalign(&l2atomicbuf
, 64, size
, NULL
);
217 LRTSQueue
LRTSQueueCreate(void)
219 static int position
=0;
226 Q
= (LRTSQueue
)calloc(1, sizeof( struct _l2atomicq
));
227 LRTSQueueInit ((char *) l2atomicbuf
+ sizeof(L2AtomicState
)*place
,
228 sizeof(L2AtomicState
),
231 DEFAULT_SIZE
/*1024 entries*/);
232 if(CmiMyRank() == 0) {
234 position
= CmiMyNodeSize();