2 #ifndef __L2_ATOMIC_QUEUE__
3 #define __L2_ATOMIC_QUEUE__
9 #include "spi/include/l2/atomic.h"
10 #include "spi/include/l1p/flush.h"
// DEFAULT_SIZE is not referenced anywhere in the visible part of this file.
// NOTE(review): presumably a default queue capacity - confirm against callers.
13 #define DEFAULT_SIZE 1024
// Sentinel compared against the value returned by
// L2_AtomicLoadIncrementBounded() in L2AtomicEnqueue(); when the returned
// index equals this value the element is not stored in the slot array.
14 #define L2_ATOMIC_FULL 0x8000000000000000UL
// Same bit pattern as L2_ATOMIC_FULL; not referenced in the visible code.
15 #define L2_ATOMIC_EMPTY 0x8000000000000000UL
// Element type held by the queue: an untyped pointer. The slot array is
// allocated and zeroed in units of sizeof(L2AtomicQueueElement).
21 typedef void* L2AtomicQueueElement
;
// Counter block for one queue. L2AtomicQueueInit() points the queue's
// _l2state member at caller-supplied memory (l2mem) that is interpreted as
// this structure.
23 typedef struct _l2atomicstate
{
// Count of elements dequeued so far; L2AtomicDequeue() reads and writes it
// with plain loads/stores and masks it with (_qsize - 1) to find a slot.
24 volatile uint64_t Consumer
; // not used atomically
// Advanced by L2AtomicEnqueue() through L2_AtomicLoadIncrementBounded().
25 volatile uint64_t Producer
;
// Set to qsize at init and to (head + _qsize) by L2AtomicDequeue().
// NOTE(review): presumably the limit the bounded increment on Producer is
// checked against - confirm with the L2 atomic SPI documentation.
26 volatile uint64_t UpperBound
;
// Never accessed in the visible code.
27 volatile uint64_t Flush
; // contents not used
// Queue handle: a pointer to the counter block, the slot array, and an
// overflow queue protected by a mutex.
// NOTE(review): the functions in this file also use members _useOverflowQ,
// _qsize and _overflowQ, whose declarations are not visible in this listing.
30 typedef struct _l2atomicq
{
// Counters (Consumer/Producer/UpperBound); set from l2mem in
// L2AtomicQueueInit().
31 L2AtomicState
* _l2state
;
// Slot array allocated with posix_memalign() in L2AtomicQueueInit();
// indexed with (counter & (_qsize - 1)). A NULL slot is what the dequeue
// path writes back after taking an element.
32 volatile void * volatile * _array
;
// Held around PCQueuePush()/PCQueuePop() on the overflow queue.
36 pthread_mutex_t _overflowMutex
;
// Initialise a queue: bind it to the caller-provided counter memory, create
// the overflow queue and its mutex, reset the counters and allocate a zeroed
// slot array.
//
// l2mem  memory used as the L2AtomicState counter block; its alignment and
//        size are checked with assert().
// queue  queue handle to initialise.
//
// NOTE(review): l2memsize, use_overflow, qsize and rc are used below but
// their declarations are not visible in this listing.
// NOTE(review): all checks here are assert()s, so they vanish when NDEBUG is
// defined.
39 void L2AtomicQueueInit (void * l2mem
,
41 L2AtomicQueue
* queue
,
47 //Verify the counter memory alignment. NOTE(review): the 0x1F mask checks 32-byte alignment, not the 64-byte alignment this comment originally claimed (that would need 0x3F) - confirm which is intended
48 assert( (((uintptr_t) l2mem
) & (0x1F)) == 0 );
// The supplied memory must be large enough to hold the counter block.
49 assert (sizeof(L2AtomicState
) <= l2memsize
);
51 queue
->_useOverflowQ
= use_overflow
;
// NOTE(review): enqueue/dequeue index the array with (counter & (_qsize - 1)),
// which only covers every slot when qsize is a power of two; how qsize is
// derived is not visible here.
56 queue
->_qsize
= qsize
;
58 queue
->_l2state
= (L2AtomicState
*)l2mem
;
// The overflow queue and mutex are created unconditionally, regardless of
// use_overflow.
59 pthread_mutex_init(&queue
->_overflowMutex
, NULL
);
60 queue
->_overflowQ
= PCQueueCreate();
// Empty queue: both counters start at 0 and UpperBound starts at qsize.
61 L2_AtomicStore(&queue
->_l2state
->Consumer
, 0);
62 L2_AtomicStore(&queue
->_l2state
->Producer
, 0);
63 L2_AtomicStore(&queue
->_l2state
->UpperBound
, qsize
);
// Allocate the slot array on a 64-byte boundary.
65 rc
= posix_memalign ((void **)&queue
->_array
,
66 64, /*L1 line size for BG/Q */
67 sizeof(L2AtomicQueueElement
) * qsize
);
// NOTE(review): posix_memalign() reports success as 0; this comparison
// assumes PAMI_SUCCESS is also 0 - confirm.
69 assert(rc
== PAMI_SUCCESS
);
// Every slot starts out NULL.
70 memset((void*)queue
->_array
, 0, sizeof(L2AtomicQueueElement
)*qsize
);
// Insert an element into the queue.
//
// A slot index is obtained by a bounded atomic increment of Producer. If the
// result is not L2_ATOMIC_FULL the element is stored in the slot array.
// Otherwise the call either returns L2A_EAGAIN (overflow queue disabled) or
// pushes the element onto the mutex-protected overflow queue.
//
// NOTE(review): the declaration of the `element` parameter and the return
// statements for the other two paths are not visible in this listing.
73 int L2AtomicEnqueue (L2AtomicQueue
* queue
,
76 //fprintf(stderr,"Insert message %p\n", element);
// Mask used to wrap the ever-increasing counter onto the slot array.
78 register int qsize_1
= queue
->_qsize
- 1;
79 uint64_t index
= L2_AtomicLoadIncrementBounded(&queue
->_l2state
->Producer
);
// A slot was reserved: publish the element in it.
81 if (index
!= L2_ATOMIC_FULL
) {
82 queue
->_array
[index
& qsize_1
] = element
;
86 //We don't want to use the overflow queue
87 if (!queue
->_useOverflowQ
)
88 return L2A_EAGAIN
; //Q is full, try later
90 //No ordering is guaranteed if there is overflow
91 pthread_mutex_lock(&queue
->_overflowMutex
);
92 PCQueuePush(queue
->_overflowQ
, element
);
93 pthread_mutex_unlock(&queue
->_overflowMutex
);
// Remove one element from the queue.
//
// The visible code reads the slot at (Consumer & (_qsize - 1)), clears it to
// NULL, stores the new Consumer value, and raises UpperBound to
// head + _qsize. It also contains a path that pops from the overflow queue
// under the overflow mutex.
//
// NOTE(review): the declarations of head/tail, the branch conditions, the
// increment of head and every return statement are not visible in this
// listing, so the exact control flow between these statements cannot be
// confirmed from here.
// NOTE(review): Consumer is updated with a plain store, which presumably
// relies on there being a single consumer thread - confirm.
98 void * L2AtomicDequeue (L2AtomicQueue
*queue
)
// Snapshot both counters.
101 tail
= queue
->_l2state
->Producer
;
102 head
= queue
->_l2state
->Consumer
;
// Mask used to wrap the counter onto the slot array.
103 register int qsize_1
= queue
->_qsize
-1;
105 volatile void *e
= NULL
;
// The slot is read twice. NOTE(review): the second read is presumably the
// body of a retry loop that waits for a producer that has reserved the slot
// but not yet written it - the loop header is not visible.
107 e
= queue
->_array
[head
& qsize_1
];
109 e
= queue
->_array
[head
& qsize_1
];
111 //fprintf(stderr,"Found message %p\n", e);
// Clear the slot after taking its element.
113 queue
->_array
[head
& qsize_1
] = NULL
;
117 queue
->_l2state
->Consumer
= head
;
119 //Charm++ does not require message ordering
120 //So we don't acquire overflow mutex here
// Move the producers' upper bound to head + _qsize.
121 uint64_t n
= head
+ queue
->_qsize
;
122 // is atomic-store needed?
123 L2_AtomicStore(&queue
->_l2state
->UpperBound
, n
);
127 //We don't have an overflowQ
128 if (!queue
->_useOverflowQ
)
131 /* head == tail (head cannot be greater than tail) */
// The length is tested before the mutex is taken; only the pop itself runs
// under the lock.
132 if (PCQueueLength(queue
->_overflowQ
) > 0) {
133 pthread_mutex_lock(&queue
->_overflowMutex
);
134 e
= PCQueuePop (queue
->_overflowQ
);
135 pthread_mutex_unlock(&queue
->_overflowMutex
);
// Report whether the queue currently holds no elements.
//
// Returns non-zero when the overflow queue length is 0 and the Producer and
// Consumer counters are equal; returns 0 otherwise.
// The overflow queue is consulted regardless of _useOverflowQ, and no lock
// is taken for either check.
143 int L2AtomicQueueEmpty (L2AtomicQueue
*queue
) {
144 return ( (PCQueueLength(queue
->_overflowQ
) == 0) &&
145 (queue
->_l2state
->Producer
== queue
->_l2state
->Consumer
) );
148 //Spin on the L2 atomic queue until a message shows up. Give up and
149 //return after n iterations
//
// Returns 0 immediately if L2AtomicQueueEmpty() reports the queue as
// non-empty; otherwise compares the Consumer and Producer counters in a loop
// that ends when they differ or when the counter i reaches 0.
// Both visible return statements return 0, so as far as this listing shows
// the result does not tell the caller whether a message arrived.
// The loop condition looks only at Producer/Consumer, not at the overflow
// queue.
// NOTE(review): the second parameter, the declarations of head/tail/i, the
// loop opening and the update of i are not visible in this listing.
150 int L2AtomicQueueSpinWait (L2AtomicQueue
* queue
,
153 if (!L2AtomicQueueEmpty(queue
))
154 return 0; //queue is not empty so return
157 head
= queue
->_l2state
->Consumer
;
161 tail
= queue
->_l2state
->Producer
;
164 //Keep spinning while head == tail (no new element seen) and i has not reached 0
165 while (head
== tail
&& i
!= 0);
167 return 0; //fail queue is empty
170 //Spin on two L2 atomic queues until either one has a message. Give up and
171 //return after n iterations
172 int L2AtomicQueue2QSpinWait (L2AtomicQueue
* queue0
,
173 L2AtomicQueue
* queue1
,
176 if (!L2AtomicQueueEmpty(queue0
))
177 return 0; //queue0 is not empty so return
179 if (!L2AtomicQueueEmpty(queue1
))
180 return 0; //queue is not empty so return
182 uint64_t head0
, tail0
;
183 uint64_t head1
, tail1
;
185 head0
= queue0
->_l2state
->Consumer
;
186 head1
= queue1
->_l2state
->Consumer
;
190 tail0
= queue0
->_l2state
->Producer
;
191 tail1
= queue1
->_l2state
->Producer
;
193 } while (head0
==tail0
&& head1
==tail1
&& i
!=0);