Fix parallel build of examples/charm++/user-driven-interop
[charm.git] / src / conv-core / conv-conds.c
blob61fde7660ff797521ed58675e0e5a58eaf5477f9
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <math.h>
5 #include "converse.h"
7 /**
8 * Structure to hold the requisites for a callback
9 */
10 typedef struct _ccd_callback {
11 CcdVoidFn fn;
12 void *arg;
13 int pe; /* the pe that sets the callback */
14 } ccd_callback;
18 /**
19 * An element (a single callback) in a list of callbacks
21 typedef struct _ccd_cblist_elem {
22 ccd_callback cb;
23 short int next;
24 short int prev;
25 } ccd_cblist_elem;
29 /**
30 * A list of callbacks stored as an array and handled like a list
32 typedef struct _ccd_cblist {
33 unsigned short int maxlen;
34 unsigned short int len;
35 short int first, last;
36 short int first_free;
37 unsigned char flag;
38 ccd_cblist_elem *elems;
39 } ccd_cblist;
43 /** Initialize a list of callbacks. Alloc memory, set counters etc. */
44 static void init_cblist(ccd_cblist *l, unsigned int ml)
46 int i;
47 l->elems = (ccd_cblist_elem*) malloc(ml*sizeof(ccd_cblist_elem));
48 _MEMCHECK(l->elems);
49 for(i=0;i<ml;i++) {
50 l->elems[i].next = i+1;
51 l->elems[i].prev = i-1;
53 l->elems[ml-1].next = -1;
54 l->len = 0;
55 l->maxlen = ml;
56 l->first = l->last = -1;
57 l->first_free = 0;
58 l->flag = 0;
63 /** Expand the callback list to a max length of ml */
64 static void expand_cblist(ccd_cblist *l, unsigned int ml)
66 ccd_cblist_elem *old_elems = l->elems;
67 int i = 0;
68 l->elems = (ccd_cblist_elem*) malloc(ml*sizeof(ccd_cblist_elem));
69 _MEMCHECK(l->elems);
70 for(i=0;i<(l->len);i++)
71 l->elems[i] = old_elems[i];
72 free(old_elems);
73 for(i=l->len;i<ml;i++) {
74 l->elems[i].next = i+1;
75 l->elems[i].prev = i-1;
77 l->elems[ml-1].next = -1;
78 l->elems[l->len].prev = -1;
79 l->maxlen = ml;
80 l->first_free = l->len;
85 /** Remove element referred to by given list index idx. */
86 static void remove_elem(ccd_cblist *l, int idx)
88 ccd_cblist_elem *e = l->elems;
89 /* remove lidx from the busy list */
90 if(e[idx].next != (-1))
91 e[e[idx].next].prev = e[idx].prev;
92 if(e[idx].prev != (-1))
93 e[e[idx].prev].next = e[idx].next;
94 if(idx==(l->first))
95 l->first = e[idx].next;
96 if(idx==(l->last))
97 l->last = e[idx].prev;
98 /* put lidx in the free list */
99 e[idx].prev = -1;
100 e[idx].next = l->first_free;
101 if(e[idx].next != (-1))
102 e[e[idx].next].prev = idx;
103 l->first_free = idx;
104 l->len--;
109 /** Remove n elements from the beginning of the list. */
110 static void remove_n_elems(ccd_cblist *l, int n)
112 int i;
113 if(n==0 || (l->len < n))
114 return;
115 for(i=0;i<n;i++) {
116 remove_elem(l, l->first);
122 /** Append callback to the given cblist, and return the index. */
123 static int append_elem(ccd_cblist *l, CcdVoidFn fn, void *arg, int pe)
125 int idx;
126 ccd_cblist_elem *e;
127 if(l->len == l->maxlen)
128 expand_cblist(l, l->maxlen*2);
129 idx = l->first_free;
130 e = l->elems;
131 l->first_free = e[idx].next;
132 e[idx].next = -1;
133 e[idx].prev = l->last;
134 if(l->first == (-1))
135 l->first = idx;
136 if(l->last != (-1))
137 e[l->last].next = idx;
138 l->last = idx;
139 e[idx].cb.fn = fn;
140 e[idx].cb.arg = arg;
141 e[idx].cb.pe = pe;
142 l->len++;
143 return idx;
149 * Trigger the callbacks in the provided callback list and *retain* them
150 * after they are called.
152 * Callbacks that are added after this function is started (e.g. callbacks
153 * registered from other callbacks) are ignored.
154 * @note: It is illegal to cancel callbacks from within ccd callbacks.
156 static void call_cblist_keep(ccd_cblist *l,double curWallTime)
158 int i, len = l->len, idx;
159 for(i=0, idx=l->first;i<len;i++) {
160 int old = CmiSwitchToPE(l->elems[idx].cb.pe);
161 (*(l->elems[idx].cb.fn))(l->elems[idx].cb.arg,curWallTime);
162 int unused = CmiSwitchToPE(old);
163 idx = l->elems[idx].next;
170 * Trigger the callbacks in the provided callback list and *remove* them
171 * from the list after they are called.
173 * Callbacks that are added after this function is started (e.g. callbacks
174 * registered from other callbacks) are ignored.
175 * @note: It is illegal to cancel callbacks from within ccd callbacks.
177 static void call_cblist_remove(ccd_cblist *l,double curWallTime)
179 int i, len = l->len, idx;
180 /* reentrant */
181 if (l->flag) return;
182 l->flag = 1;
183 #if ! CMK_BIGSIM_CHARM
184 for(i=0, idx=l->first;i<len;i++) {
185 int old = CmiSwitchToPE(l->elems[idx].cb.pe);
186 (*(l->elems[idx].cb.fn))(l->elems[idx].cb.arg,curWallTime);
187 int unused = CmiSwitchToPE(old);
188 idx = l->elems[idx].next;
190 #else
191 for(i=0, idx=l->last;i<len;i++) {
192 int old = CmiSwitchToPE(l->elems[idx].cb.pe);
193 (*(l->elems[idx].cb.fn))(l->elems[idx].cb.arg,curWallTime);
194 int unused = CmiSwitchToPE(old);
195 idx = l->elems[idx].prev;
197 #endif
198 remove_n_elems(l,len);
199 l->flag = 0;
204 #define CBLIST_INIT_LEN 8
205 #define MAXNUMCONDS (CcdUSERMAX + 1)
208 * Lists of conditional callbacks that are maintained by the scheduler
210 typedef struct {
211 ccd_cblist condcb[MAXNUMCONDS];
212 ccd_cblist condcb_keep[MAXNUMCONDS];
213 } ccd_cond_callbacks;
215 /***/
216 CpvStaticDeclare(ccd_cond_callbacks, conds);
219 // Default resolution of .005 seconds aka 5 milliseconds
220 #define CCD_DEFAULT_RESOLUTION 5.0e-3
222 /*Make sure this matches the CcdPERIODIC_* list in converse.h*/
223 #define CCD_PERIODIC_MAX 13
224 const static double periodicCallInterval[CCD_PERIODIC_MAX]=
225 {0.001, 0.010, 0.100, 1.0, 5.0, 10.0, 60.0, 2*60.0, 5*60.0, 10*60.0, 3600.0, 12*3600.0, 24*3600.0};
228 * List of periodic callbacks maintained by the scheduler
230 typedef struct {
231 int nSkip;/*Number of opportunities to skip*/
232 double lastCheck;/*Time of last check*/
233 double resolution;
234 double nextCall[CCD_PERIODIC_MAX];
235 } ccd_periodic_callbacks;
237 /** */
238 CpvStaticDeclare(ccd_periodic_callbacks, pcb);
239 CpvDeclare(int, _ccd_numchecks);
243 #define MAXTIMERHEAPENTRIES 128
246 * Structure used to manage callbacks in a heap
248 typedef struct {
249 double time;
250 ccd_callback cb;
251 } ccd_heap_elem;
254 /* Note : The heap is only stored in elements ccd_heap[0] to
255 * ccd_heap[ccd_heaplen]
258 /** An array of time-scheduled callbacks managed as a heap */
259 CpvStaticDeclare(ccd_heap_elem*, ccd_heap);
260 /** The length of the callback heap */
261 CpvStaticDeclare(int, ccd_heaplen);
262 /** The max allowed length of the callback heap */
263 CpvStaticDeclare(int, ccd_heapmaxlen);
267 /** Swap two elements on the heap */
268 static void ccd_heap_swap(int index1, int index2)
270 ccd_heap_elem *h = CpvAccess(ccd_heap);
271 ccd_heap_elem temp;
273 temp = h[index1];
274 h[index1] = h[index2];
275 h[index2] = temp;
281 * Expand the ccd_heap to make more room.
283 * Double the heap size and copy everything over. Initial 128 is reasonably
284 * big, so expanding won't happen often.
286 * Had a bug previously due to late expansion, should work now - Gengbin 12/4/03
288 static void expand_ccd_heap(void)
290 int i;
291 int oldlen = CpvAccess(ccd_heapmaxlen);
292 int newlen = oldlen*2;
293 ccd_heap_elem *newheap;
295 CmiPrintf("[%d] Warning: ccd_heap expand from %d to %d\n", CmiMyPe(),oldlen, newlen);
297 newheap = (ccd_heap_elem*) malloc(sizeof(ccd_heap_elem)*2*(newlen+1));
298 _MEMCHECK(newheap);
299 /* need to copy the second half part ??? */
300 for (i=0; i<=oldlen; i++) {
301 newheap[i] = CpvAccess(ccd_heap)[i];
302 newheap[i+newlen] = CpvAccess(ccd_heap)[i+oldlen];
304 free(CpvAccess(ccd_heap));
305 CpvAccess(ccd_heap) = newheap;
306 CpvAccess(ccd_heapmaxlen) = newlen;
312 * Insert a new callback into the heap
314 static void ccd_heap_insert(double t, CcdVoidFn fnp, void *arg, int pe)
316 int child, parent;
317 ccd_heap_elem *h;
319 if(CpvAccess(ccd_heaplen) >= CpvAccess(ccd_heapmaxlen)) {
320 /* CmiAbort("Heap overflow (InsertInHeap), exiting...\n"); */
321 expand_ccd_heap();
324 h = CpvAccess(ccd_heap);
327 ccd_heap_elem *e = &(h[++CpvAccess(ccd_heaplen)]);
328 e->time = t;
329 e->cb.fn = fnp;
330 e->cb.arg = arg;
331 e->cb.pe = pe;
332 child = CpvAccess(ccd_heaplen);
333 parent = child / 2;
334 while((parent>0) && (h[child].time<h[parent].time)) {
335 ccd_heap_swap(child, parent);
336 child = parent;
337 parent = parent / 2;
345 * Remove the top of the heap
347 static void ccd_heap_remove(void)
349 int parent,child;
350 ccd_heap_elem *h = CpvAccess(ccd_heap);
352 parent = 1;
353 if(CpvAccess(ccd_heaplen)>0) {
354 /* put deleted value at end of heap */
355 ccd_heap_swap(1,CpvAccess(ccd_heaplen));
356 CpvAccess(ccd_heaplen)--;
357 if(CpvAccess(ccd_heaplen)) {
358 /* if any left, then bubble up values */
359 child = 2 * parent;
360 while(child <= CpvAccess(ccd_heaplen)) {
361 if(((child + 1) <= CpvAccess(ccd_heaplen)) &&
362 (h[child].time > h[child+1].time))
363 child++; /* use the smaller of the two */
364 if(h[parent].time <= h[child].time)
365 break;
366 ccd_heap_swap(parent,child);
367 parent = child; /* go down the tree one more step */
368 child = 2 * child;
377 * Identify any (over)due callbacks that were scheduled
378 * and trigger them.
380 static void ccd_heap_update(double curWallTime)
382 ccd_heap_elem *h = CpvAccess(ccd_heap);
383 ccd_heap_elem *e = h+CpvAccess(ccd_heapmaxlen);
384 int i,ne=0;
385 /* Pull out all expired heap entries */
386 while ((CpvAccess(ccd_heaplen)>0) && (h[1].time<curWallTime)) {
387 e[ne++]=h[1];
388 ccd_heap_remove();
390 /* Now execute those heap entries. This must be
391 separated from the removal phase because executing
392 an entry may change the heap.
394 for (i=0;i<ne;i++) {
396 ccd_heap_elem *h = CpvAccess(ccd_heap);
397 ccd_heap_elem *e = h+CpvAccess(ccd_heapmaxlen);
399 int old = CmiSwitchToPE(e[i].cb.pe);
400 (*(e[i].cb.fn))(e[i].cb.arg,curWallTime);
401 int unused = CmiSwitchToPE(old);
407 void CcdCallBacksReset(void *ignored,double curWallTime);
410 * Initialize the callback containers
412 void CcdModuleInit(char **ignored)
414 int i;
415 double curTime;
416 CpvInitialize(ccd_heap_elem*, ccd_heap);
417 CpvInitialize(ccd_cond_callbacks, conds);
418 CpvInitialize(ccd_periodic_callbacks, pcb);
419 CpvInitialize(int, ccd_heaplen);
420 CpvInitialize(int, ccd_heapmaxlen);
421 CpvInitialize(int, _ccd_numchecks);
423 CpvAccess(ccd_heaplen) = 0;
424 CpvAccess(ccd_heapmaxlen) = MAXTIMERHEAPENTRIES;
425 CpvAccess(ccd_heap) =
426 (ccd_heap_elem*) malloc(sizeof(ccd_heap_elem)*2*(MAXTIMERHEAPENTRIES + 1));
427 _MEMCHECK(CpvAccess(ccd_heap));
428 for(i=0;i<MAXNUMCONDS;i++) {
429 init_cblist(&(CpvAccess(conds).condcb[i]), CBLIST_INIT_LEN);
430 init_cblist(&(CpvAccess(conds).condcb_keep[i]), CBLIST_INIT_LEN);
432 CpvAccess(_ccd_numchecks) = 1;
433 CpvAccess(pcb).nSkip = 1;
434 curTime=CmiWallTimer();
435 CpvAccess(pcb).lastCheck = curTime;
436 for (i=0;i<CCD_PERIODIC_MAX;i++)
437 CpvAccess(pcb).nextCall[i]=curTime+periodicCallInterval[i];
438 CpvAccess(pcb).resolution = CCD_DEFAULT_RESOLUTION;
439 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,CcdCallBacksReset,0);
440 CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,CcdCallBacksReset,0);
446 * Register a callback function that will be triggered when the specified
447 * condition is raised the next time
449 int CcdCallOnCondition(int condnum, CcdVoidFn fnp, void *arg)
451 CmiAssert(condnum < MAXNUMCONDS);
452 return append_elem(&(CpvAccess(conds).condcb[condnum]), fnp, arg, CcdIGNOREPE);
455 /**
456 * Register a callback function that will be triggered on the specified PE
457 * when the specified condition is raised the next time
459 int CcdCallOnConditionOnPE(int condnum, CcdVoidFn fnp, void *arg, int pe)
461 CmiAssert(condnum < MAXNUMCONDS);
462 return append_elem(&(CpvAccess(conds).condcb[condnum]), fnp, arg, pe);
466 * Register a callback function that will be triggered *whenever* the specified
467 * condition is raised
469 int CcdCallOnConditionKeep(int condnum, CcdVoidFn fnp, void *arg)
471 CmiAssert(condnum < MAXNUMCONDS);
472 return append_elem(&(CpvAccess(conds).condcb_keep[condnum]), fnp, arg, CcdIGNOREPE);
476 * Register a callback function that will be triggered on the specified PE
477 * *whenever* the specified condition is raised
479 int CcdCallOnConditionKeepOnPE(int condnum, CcdVoidFn fnp, void *arg, int pe)
481 CmiAssert(condnum < MAXNUMCONDS);
482 return append_elem(&(CpvAccess(conds).condcb_keep[condnum]), fnp, arg, pe);
487 * Cancel a previously registered conditional callback
489 void CcdCancelCallOnCondition(int condnum, int idx)
491 CmiAssert(condnum < MAXNUMCONDS);
492 remove_elem(&(CpvAccess(conds).condcb[condnum]), idx);
497 * Cancel a previously registered conditional callback
499 void CcdCancelCallOnConditionKeep(int condnum, int idx)
501 CmiAssert(condnum < MAXNUMCONDS);
502 remove_elem(&(CpvAccess(conds).condcb_keep[condnum]), idx);
507 * Register a callback function that will be triggered on the specified PE
508 * after a minimum delay of deltaT
510 void CcdCallFnAfterOnPE(CcdVoidFn fnp, void *arg, double deltaT, int pe)
512 double ctime = CmiWallTimer();
513 double tcall = ctime + deltaT/1000.0;
514 ccd_heap_insert(tcall, fnp, arg, pe);
518 * Register a callback function that will be triggered after a minimum
519 * delay of deltaT
521 void CcdCallFnAfter(CcdVoidFn fnp, void *arg, double deltaT)
523 CcdCallFnAfterOnPE(fnp, arg, deltaT, CcdIGNOREPE);
528 * Raise a condition causing all registered callbacks corresponding to
529 * that condition to be triggered
531 void CcdRaiseCondition(int condnum)
533 CmiAssert(condnum < MAXNUMCONDS);
534 double curWallTime=CmiWallTimer();
535 call_cblist_remove(&(CpvAccess(conds).condcb[condnum]),curWallTime);
536 call_cblist_keep(&(CpvAccess(conds).condcb_keep[condnum]),curWallTime);
541 * Internal helper function that updates the polling resolution for time
542 * based callbacks to minimum of two arguments and ensures appropriate
543 * counters etc are reset
545 double CcdSetMinResolution(double newResolution, double minResolution) {
546 ccd_periodic_callbacks* o = &CpvAccess(pcb);
547 double oldResolution = o->resolution;
549 o->resolution = fmin(newResolution, minResolution);
551 // Ensure we don't miss the new quantum
552 if (o->resolution < oldResolution) {
553 CcdCallBacksReset(NULL, CmiWallTimer());
556 return oldResolution;
560 * Set the polling resolution for time based callbacks
562 double CcdSetResolution(double newResolution) {
563 return CcdSetMinResolution(newResolution, CCD_DEFAULT_RESOLUTION);
567 * Reset the polling resolution for time based callbacks to its default value
569 double CcdResetResolution() {
570 ccd_periodic_callbacks* o = &CpvAccess(pcb);
571 double oldResolution = o->resolution;
573 o->resolution = CCD_DEFAULT_RESOLUTION;
575 return oldResolution;
579 * "Safe" operation that only ever increases the polling resolution for time
580 * based callbacks
582 double CcdIncreaseResolution(double newResolution) {
583 return CcdSetMinResolution(newResolution, CpvAccess(pcb).resolution);
587 * Trigger callbacks periodically, and also the time-indexed
588 * functions if their time has arrived
590 void CcdCallBacks(void)
592 int i;
593 ccd_periodic_callbacks *o=&CpvAccess(pcb);
595 /* Figure out how many times to skip Ccd processing */
596 double curWallTime = CmiWallTimer();
598 unsigned int nSkip=o->nSkip;
599 #if 1
600 /* Dynamically adjust the number of messages to skip */
601 double elapsed = curWallTime - o->lastCheck;
602 // Adjust the number of skipped messages by a multiple between .5, if we
603 // skipped too many messages last time, and 2, if we skipped too few.
604 // Ideally elapsed = resolution and we keep nSkip the same i.e. multiply by 1
605 if (elapsed > 0.0) {
606 nSkip = (int)(nSkip * fmax(0.5, fmin(2.0, o->resolution / elapsed)));
609 /* Keep skipping within a sensible range */
610 #define minSkip 1u
611 #define maxSkip 20u
612 if (nSkip<minSkip) nSkip=minSkip;
613 else if (nSkip>maxSkip) nSkip=maxSkip;
614 #else
615 /* Always skip a fixed number of messages */
616 nSkip=1;
617 #endif
619 CpvAccess(_ccd_numchecks)=o->nSkip=nSkip;
620 o->lastCheck=curWallTime;
622 ccd_heap_update(curWallTime);
624 for (i=0;i<CCD_PERIODIC_MAX;i++)
625 if (o->nextCall[i]<=curWallTime) {
626 CcdRaiseCondition(CcdPERIODIC+i);
627 o->nextCall[i]=curWallTime+periodicCallInterval[i];
629 else
630 break; /*<- because intervals are multiples of one another*/
636 * Called when something drastic changes-- restart ccd_num_checks
638 void CcdCallBacksReset(void *ignored,double curWallTime)
640 ccd_periodic_callbacks *o=&CpvAccess(pcb);
641 CpvAccess(_ccd_numchecks)=o->nSkip=1;
642 o->lastCheck=curWallTime;