8 * Structure to hold the requisites for a callback
10 typedef struct _ccd_callback
{
13 int pe
; /* the pe that sets the callback */
19 * An element (a single callback) in a list of callbacks
21 typedef struct _ccd_cblist_elem
{
30 * A list of callbacks stored as an array and handled like a list
32 typedef struct _ccd_cblist
{
33 unsigned short int maxlen
;
34 unsigned short int len
;
35 short int first
, last
;
38 ccd_cblist_elem
*elems
;
43 /** Initialize a list of callbacks. Alloc memory, set counters etc. */
44 static void init_cblist(ccd_cblist
*l
, unsigned int ml
)
47 l
->elems
= (ccd_cblist_elem
*) malloc(ml
*sizeof(ccd_cblist_elem
));
50 l
->elems
[i
].next
= i
+1;
51 l
->elems
[i
].prev
= i
-1;
53 l
->elems
[ml
-1].next
= -1;
56 l
->first
= l
->last
= -1;
63 /** Expand the callback list to a max length of ml */
64 static void expand_cblist(ccd_cblist
*l
, unsigned int ml
)
66 ccd_cblist_elem
*old_elems
= l
->elems
;
68 l
->elems
= (ccd_cblist_elem
*) malloc(ml
*sizeof(ccd_cblist_elem
));
70 for(i
=0;i
<(l
->len
);i
++)
71 l
->elems
[i
] = old_elems
[i
];
73 for(i
=l
->len
;i
<ml
;i
++) {
74 l
->elems
[i
].next
= i
+1;
75 l
->elems
[i
].prev
= i
-1;
77 l
->elems
[ml
-1].next
= -1;
78 l
->elems
[l
->len
].prev
= -1;
80 l
->first_free
= l
->len
;
85 /** Remove element referred to by given list index idx. */
86 static void remove_elem(ccd_cblist
*l
, int idx
)
88 ccd_cblist_elem
*e
= l
->elems
;
89 /* remove lidx from the busy list */
90 if(e
[idx
].next
!= (-1))
91 e
[e
[idx
].next
].prev
= e
[idx
].prev
;
92 if(e
[idx
].prev
!= (-1))
93 e
[e
[idx
].prev
].next
= e
[idx
].next
;
95 l
->first
= e
[idx
].next
;
97 l
->last
= e
[idx
].prev
;
98 /* put lidx in the free list */
100 e
[idx
].next
= l
->first_free
;
101 if(e
[idx
].next
!= (-1))
102 e
[e
[idx
].next
].prev
= idx
;
109 /** Remove n elements from the beginning of the list. */
110 static void remove_n_elems(ccd_cblist
*l
, int n
)
113 if(n
==0 || (l
->len
< n
))
116 remove_elem(l
, l
->first
);
122 /** Append callback to the given cblist, and return the index. */
123 static int append_elem(ccd_cblist
*l
, CcdVoidFn fn
, void *arg
, int pe
)
127 if(l
->len
== l
->maxlen
)
128 expand_cblist(l
, l
->maxlen
*2);
131 l
->first_free
= e
[idx
].next
;
133 e
[idx
].prev
= l
->last
;
137 e
[l
->last
].next
= idx
;
149 * Trigger the callbacks in the provided callback list and *retain* them
150 * after they are called.
152 * Callbacks that are added after this function is started (e.g. callbacks
153 * registered from other callbacks) are ignored.
154 * @note: It is illegal to cancel callbacks from within ccd callbacks.
156 static void call_cblist_keep(ccd_cblist
*l
,double curWallTime
)
158 int i
, len
= l
->len
, idx
;
159 for(i
=0, idx
=l
->first
;i
<len
;i
++) {
160 int old
= CmiSwitchToPE(l
->elems
[idx
].cb
.pe
);
161 (*(l
->elems
[idx
].cb
.fn
))(l
->elems
[idx
].cb
.arg
,curWallTime
);
162 int unused
= CmiSwitchToPE(old
);
163 idx
= l
->elems
[idx
].next
;
170 * Trigger the callbacks in the provided callback list and *remove* them
171 * from the list after they are called.
173 * Callbacks that are added after this function is started (e.g. callbacks
174 * registered from other callbacks) are ignored.
175 * @note: It is illegal to cancel callbacks from within ccd callbacks.
177 static void call_cblist_remove(ccd_cblist
*l
,double curWallTime
)
179 int i
, len
= l
->len
, idx
;
183 #if ! CMK_BIGSIM_CHARM
184 for(i
=0, idx
=l
->first
;i
<len
;i
++) {
185 int old
= CmiSwitchToPE(l
->elems
[idx
].cb
.pe
);
186 (*(l
->elems
[idx
].cb
.fn
))(l
->elems
[idx
].cb
.arg
,curWallTime
);
187 int unused
= CmiSwitchToPE(old
);
188 idx
= l
->elems
[idx
].next
;
191 for(i
=0, idx
=l
->last
;i
<len
;i
++) {
192 int old
= CmiSwitchToPE(l
->elems
[idx
].cb
.pe
);
193 (*(l
->elems
[idx
].cb
.fn
))(l
->elems
[idx
].cb
.arg
,curWallTime
);
194 int unused
= CmiSwitchToPE(old
);
195 idx
= l
->elems
[idx
].prev
;
198 remove_n_elems(l
,len
);
204 #define CBLIST_INIT_LEN 8
205 #define MAXNUMCONDS (CcdUSERMAX + 1)
208 * Lists of conditional callbacks that are maintained by the scheduler
211 ccd_cblist condcb
[MAXNUMCONDS
];
212 ccd_cblist condcb_keep
[MAXNUMCONDS
];
213 } ccd_cond_callbacks
;
216 CpvStaticDeclare(ccd_cond_callbacks
, conds
);
219 // Default resolution of .005 seconds aka 5 milliseconds
220 #define CCD_DEFAULT_RESOLUTION 5.0e-3
222 /*Make sure this matches the CcdPERIODIC_* list in converse.h*/
223 #define CCD_PERIODIC_MAX 13
224 const static double periodicCallInterval
[CCD_PERIODIC_MAX
]=
225 {0.001, 0.010, 0.100, 1.0, 5.0, 10.0, 60.0, 2*60.0, 5*60.0, 10*60.0, 3600.0, 12*3600.0, 24*3600.0};
228 * List of periodic callbacks maintained by the scheduler
231 int nSkip
;/*Number of opportunities to skip*/
232 double lastCheck
;/*Time of last check*/
234 double nextCall
[CCD_PERIODIC_MAX
];
235 } ccd_periodic_callbacks
;
238 CpvStaticDeclare(ccd_periodic_callbacks
, pcb
);
239 CpvDeclare(int, _ccd_numchecks
);
243 #define MAXTIMERHEAPENTRIES 128
246 * Structure used to manage callbacks in a heap
/* Note: the heap proper occupies ccd_heap[1] .. ccd_heap[ccd_heaplen].
 * The insert, remove and update routines all treat index 1 as the root,
 * so slot 0 is not part of the heap. */
258 /** An array of time-scheduled callbacks managed as a heap */
259 CpvStaticDeclare(ccd_heap_elem
*, ccd_heap
);
260 /** The length of the callback heap */
261 CpvStaticDeclare(int, ccd_heaplen
);
262 /** The max allowed length of the callback heap */
263 CpvStaticDeclare(int, ccd_heapmaxlen
);
267 /** Swap two elements on the heap */
268 static void ccd_heap_swap(int index1
, int index2
)
270 ccd_heap_elem
*h
= CpvAccess(ccd_heap
);
274 h
[index1
] = h
[index2
];
281 * Expand the ccd_heap to make more room.
283 * Double the heap size and copy everything over. Initial 128 is reasonably
284 * big, so expanding won't happen often.
286 * Had a bug previously due to late expansion, should work now - Gengbin 12/4/03
288 static void expand_ccd_heap(void)
291 int oldlen
= CpvAccess(ccd_heapmaxlen
);
292 int newlen
= oldlen
*2;
293 ccd_heap_elem
*newheap
;
295 CmiPrintf("[%d] Warning: ccd_heap expand from %d to %d\n", CmiMyPe(),oldlen
, newlen
);
297 newheap
= (ccd_heap_elem
*) malloc(sizeof(ccd_heap_elem
)*2*(newlen
+1));
299 /* need to copy the second half part ??? */
300 for (i
=0; i
<=oldlen
; i
++) {
301 newheap
[i
] = CpvAccess(ccd_heap
)[i
];
302 newheap
[i
+newlen
] = CpvAccess(ccd_heap
)[i
+oldlen
];
304 free(CpvAccess(ccd_heap
));
305 CpvAccess(ccd_heap
) = newheap
;
306 CpvAccess(ccd_heapmaxlen
) = newlen
;
312 * Insert a new callback into the heap
314 static void ccd_heap_insert(double t
, CcdVoidFn fnp
, void *arg
, int pe
)
319 if(CpvAccess(ccd_heaplen
) >= CpvAccess(ccd_heapmaxlen
)) {
320 /* CmiAbort("Heap overflow (InsertInHeap), exiting...\n"); */
324 h
= CpvAccess(ccd_heap
);
327 ccd_heap_elem
*e
= &(h
[++CpvAccess(ccd_heaplen
)]);
332 child
= CpvAccess(ccd_heaplen
);
334 while((parent
>0) && (h
[child
].time
<h
[parent
].time
)) {
335 ccd_heap_swap(child
, parent
);
345 * Remove the top of the heap
347 static void ccd_heap_remove(void)
350 ccd_heap_elem
*h
= CpvAccess(ccd_heap
);
353 if(CpvAccess(ccd_heaplen
)>0) {
354 /* put deleted value at end of heap */
355 ccd_heap_swap(1,CpvAccess(ccd_heaplen
));
356 CpvAccess(ccd_heaplen
)--;
357 if(CpvAccess(ccd_heaplen
)) {
358 /* if any left, then bubble up values */
360 while(child
<= CpvAccess(ccd_heaplen
)) {
361 if(((child
+ 1) <= CpvAccess(ccd_heaplen
)) &&
362 (h
[child
].time
> h
[child
+1].time
))
363 child
++; /* use the smaller of the two */
364 if(h
[parent
].time
<= h
[child
].time
)
366 ccd_heap_swap(parent
,child
);
367 parent
= child
; /* go down the tree one more step */
377 * Identify any (over)due callbacks that were scheduled
380 static void ccd_heap_update(double curWallTime
)
382 ccd_heap_elem
*h
= CpvAccess(ccd_heap
);
383 ccd_heap_elem
*e
= h
+CpvAccess(ccd_heapmaxlen
);
385 /* Pull out all expired heap entries */
386 while ((CpvAccess(ccd_heaplen
)>0) && (h
[1].time
<curWallTime
)) {
390 /* Now execute those heap entries. This must be
391 separated from the removal phase because executing
392 an entry may change the heap.
396 ccd_heap_elem *h = CpvAccess(ccd_heap);
397 ccd_heap_elem *e = h+CpvAccess(ccd_heapmaxlen);
399 int old
= CmiSwitchToPE(e
[i
].cb
.pe
);
400 (*(e
[i
].cb
.fn
))(e
[i
].cb
.arg
,curWallTime
);
401 int unused
= CmiSwitchToPE(old
);
407 void CcdCallBacksReset(void *ignored
,double curWallTime
);
410 * Initialize the callback containers
412 void CcdModuleInit(char **ignored
)
416 CpvInitialize(ccd_heap_elem
*, ccd_heap
);
417 CpvInitialize(ccd_cond_callbacks
, conds
);
418 CpvInitialize(ccd_periodic_callbacks
, pcb
);
419 CpvInitialize(int, ccd_heaplen
);
420 CpvInitialize(int, ccd_heapmaxlen
);
421 CpvInitialize(int, _ccd_numchecks
);
423 CpvAccess(ccd_heaplen
) = 0;
424 CpvAccess(ccd_heapmaxlen
) = MAXTIMERHEAPENTRIES
;
425 CpvAccess(ccd_heap
) =
426 (ccd_heap_elem
*) malloc(sizeof(ccd_heap_elem
)*2*(MAXTIMERHEAPENTRIES
+ 1));
427 _MEMCHECK(CpvAccess(ccd_heap
));
428 for(i
=0;i
<MAXNUMCONDS
;i
++) {
429 init_cblist(&(CpvAccess(conds
).condcb
[i
]), CBLIST_INIT_LEN
);
430 init_cblist(&(CpvAccess(conds
).condcb_keep
[i
]), CBLIST_INIT_LEN
);
432 CpvAccess(_ccd_numchecks
) = 1;
433 CpvAccess(pcb
).nSkip
= 1;
434 curTime
=CmiWallTimer();
435 CpvAccess(pcb
).lastCheck
= curTime
;
436 for (i
=0;i
<CCD_PERIODIC_MAX
;i
++)
437 CpvAccess(pcb
).nextCall
[i
]=curTime
+periodicCallInterval
[i
];
438 CpvAccess(pcb
).resolution
= CCD_DEFAULT_RESOLUTION
;
439 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE
,CcdCallBacksReset
,0);
440 CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE
,CcdCallBacksReset
,0);
446 * Register a callback function that will be triggered when the specified
447 * condition is raised the next time
449 int CcdCallOnCondition(int condnum
, CcdVoidFn fnp
, void *arg
)
451 CmiAssert(condnum
< MAXNUMCONDS
);
452 return append_elem(&(CpvAccess(conds
).condcb
[condnum
]), fnp
, arg
, CcdIGNOREPE
);
456 * Register a callback function that will be triggered on the specified PE
457 * when the specified condition is raised the next time
459 int CcdCallOnConditionOnPE(int condnum
, CcdVoidFn fnp
, void *arg
, int pe
)
461 CmiAssert(condnum
< MAXNUMCONDS
);
462 return append_elem(&(CpvAccess(conds
).condcb
[condnum
]), fnp
, arg
, pe
);
466 * Register a callback function that will be triggered *whenever* the specified
467 * condition is raised
469 int CcdCallOnConditionKeep(int condnum
, CcdVoidFn fnp
, void *arg
)
471 CmiAssert(condnum
< MAXNUMCONDS
);
472 return append_elem(&(CpvAccess(conds
).condcb_keep
[condnum
]), fnp
, arg
, CcdIGNOREPE
);
476 * Register a callback function that will be triggered on the specified PE
477 * *whenever* the specified condition is raised
479 int CcdCallOnConditionKeepOnPE(int condnum
, CcdVoidFn fnp
, void *arg
, int pe
)
481 CmiAssert(condnum
< MAXNUMCONDS
);
482 return append_elem(&(CpvAccess(conds
).condcb_keep
[condnum
]), fnp
, arg
, pe
);
487 * Cancel a previously registered conditional callback
489 void CcdCancelCallOnCondition(int condnum
, int idx
)
491 CmiAssert(condnum
< MAXNUMCONDS
);
492 remove_elem(&(CpvAccess(conds
).condcb
[condnum
]), idx
);
497 * Cancel a previously registered conditional callback
499 void CcdCancelCallOnConditionKeep(int condnum
, int idx
)
501 CmiAssert(condnum
< MAXNUMCONDS
);
502 remove_elem(&(CpvAccess(conds
).condcb_keep
[condnum
]), idx
);
507 * Register a callback function that will be triggered on the specified PE
508 * after a minimum delay of deltaT
510 void CcdCallFnAfterOnPE(CcdVoidFn fnp
, void *arg
, double deltaT
, int pe
)
512 double ctime
= CmiWallTimer();
513 double tcall
= ctime
+ deltaT
/1000.0;
514 ccd_heap_insert(tcall
, fnp
, arg
, pe
);
518 * Register a callback function that will be triggered after a minimum
521 void CcdCallFnAfter(CcdVoidFn fnp
, void *arg
, double deltaT
)
523 CcdCallFnAfterOnPE(fnp
, arg
, deltaT
, CcdIGNOREPE
);
528 * Raise a condition causing all registered callbacks corresponding to
529 * that condition to be triggered
531 void CcdRaiseCondition(int condnum
)
533 CmiAssert(condnum
< MAXNUMCONDS
);
534 double curWallTime
=CmiWallTimer();
535 call_cblist_remove(&(CpvAccess(conds
).condcb
[condnum
]),curWallTime
);
536 call_cblist_keep(&(CpvAccess(conds
).condcb_keep
[condnum
]),curWallTime
);
541 * Internal helper function that updates the polling resolution for time
542 * based callbacks to minimum of two arguments and ensures appropriate
543 * counters etc are reset
545 double CcdSetMinResolution(double newResolution
, double minResolution
) {
546 ccd_periodic_callbacks
* o
= &CpvAccess(pcb
);
547 double oldResolution
= o
->resolution
;
549 o
->resolution
= fmin(newResolution
, minResolution
);
551 // Ensure we don't miss the new quantum
552 if (o
->resolution
< oldResolution
) {
553 CcdCallBacksReset(NULL
, CmiWallTimer());
556 return oldResolution
;
560 * Set the polling resolution for time based callbacks
562 double CcdSetResolution(double newResolution
) {
563 return CcdSetMinResolution(newResolution
, CCD_DEFAULT_RESOLUTION
);
567 * Reset the polling resolution for time based callbacks to its default value
569 double CcdResetResolution() {
570 ccd_periodic_callbacks
* o
= &CpvAccess(pcb
);
571 double oldResolution
= o
->resolution
;
573 o
->resolution
= CCD_DEFAULT_RESOLUTION
;
575 return oldResolution
;
579 * "Safe" operation that only ever increases the polling resolution for time
582 double CcdIncreaseResolution(double newResolution
) {
583 return CcdSetMinResolution(newResolution
, CpvAccess(pcb
).resolution
);
587 * Trigger callbacks periodically, and also the time-indexed
588 * functions if their time has arrived
590 void CcdCallBacks(void)
593 ccd_periodic_callbacks
*o
=&CpvAccess(pcb
);
595 /* Figure out how many times to skip Ccd processing */
596 double curWallTime
= CmiWallTimer();
598 unsigned int nSkip
=o
->nSkip
;
600 /* Dynamically adjust the number of messages to skip */
601 double elapsed
= curWallTime
- o
->lastCheck
;
602 // Adjust the number of skipped messages by a multiple between .5, if we
603 // skipped too many messages last time, and 2, if we skipped too few.
604 // Ideally elapsed = resolution and we keep nSkip the same i.e. multiply by 1
606 nSkip
= (int)(nSkip
* fmax(0.5, fmin(2.0, o
->resolution
/ elapsed
)));
609 /* Keep skipping within a sensible range */
612 if (nSkip
<minSkip
) nSkip
=minSkip
;
613 else if (nSkip
>maxSkip
) nSkip
=maxSkip
;
615 /* Always skip a fixed number of messages */
619 CpvAccess(_ccd_numchecks
)=o
->nSkip
=nSkip
;
620 o
->lastCheck
=curWallTime
;
622 ccd_heap_update(curWallTime
);
624 for (i
=0;i
<CCD_PERIODIC_MAX
;i
++)
625 if (o
->nextCall
[i
]<=curWallTime
) {
626 CcdRaiseCondition(CcdPERIODIC
+i
);
627 o
->nextCall
[i
]=curWallTime
+periodicCallInterval
[i
];
630 break; /*<- because intervals are multiples of one another*/
636 * Called when something drastic changes-- restart ccd_num_checks
638 void CcdCallBacksReset(void *ignored
,double curWallTime
)
640 ccd_periodic_callbacks
*o
=&CpvAccess(pcb
);
641 CpvAccess(_ccd_numchecks
)=o
->nSkip
=1;
642 o
->lastCheck
=curWallTime
;