Enable topology (or physical node) aware spanning tree for array reductions
[charm.git] / src / ck-core / ckreduction.h
blobc912b9ec5adcf670c2aa37997ceef0f9081f9092
1 /*
2 Charm++ File: Reduction Library
3 added 3/27/2000 by Orion Sky Lawlor, olawlor@acm.org
4 modified 02/21/2003 by Sayantan Chakravorty
7 A reduction takes some sort of inputs (contributions)
8 from some set of objects scattered across all PE's,
9 and combines (reduces) all the contributions onto one
10 PE. This library provides several different kinds of
11 combination routines (reducers), and various utilities
12 for supporting them.
14 The calls needed to use the reduction manager are:
15 -Create with CProxy_CkReduction::ckNew.
18 #ifndef _CKREDUCTION_H
19 #define _CKREDUCTION_H
21 #include "CkReduction.decl.h"
// Constants for the pipelined allreduce path: FRAG_SIZE and FRAG_THRESHOLD
// are both 131072 and exist only when _PIPELINED_ALLREDUCE_ is defined.
#if defined(_PIPELINED_ALLREDUCE_)
#  define FRAG_SIZE      131072
#  define FRAG_THRESHOLD 131072
#endif

// Constants that exist only in the message-logging / causal
// fault-tolerance builds.
#if defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)
#  define MAX_INT           5000000
#  define _MLOG_REDUCE_P2P_ 0
#endif
33 //This message is sent between group objects on a single PE
34 // to let each know the other has been created.
35 class CkGroupCallbackMsg:public CMessage_CkGroupCallbackMsg {
36 public:
37 typedef void (*callbackType)(void *param);
38 CkGroupCallbackMsg(callbackType Ncallback,void *Nparam)
39 {callback=Ncallback;param=Nparam;}
40 void call(void) {(*callback)(param);}
41 private:
42 callbackType callback;
43 void *param;
46 class CkGroupInitCallback : public IrrGroup {
47 public:
48 CkGroupInitCallback(void);
49 CkGroupInitCallback(CkMigrateMessage *m):IrrGroup(m) {}
50 void callMeBack(CkGroupCallbackMsg *m);
51 void pup(PUP::er& p){ IrrGroup::pup(p); }
55 class CkGroupReadyCallback : public IrrGroup {
56 private:
57 bool _isReady;
58 CkQ<CkGroupCallbackMsg *> _msgs;
59 void callBuffered(void);
60 public:
61 CkGroupReadyCallback(void);
62 CkGroupReadyCallback(CkMigrateMessage *m):IrrGroup(m) {}
63 void callMeBack(CkGroupCallbackMsg *m);
64 bool isReady(void) { return _isReady; }
65 protected:
66 void setReady(void) {_isReady = true; callBuffered(); }
67 void setNotReady(void) {_isReady = false; }
70 class CkReductionNumberMsg:public CMessage_CkReductionNumberMsg {
71 public:
72 int num;
73 CkReductionNumberMsg(int n) {num=n;}
77 class CkReductionInactiveMsg:public CMessage_CkReductionInactiveMsg {
78 public:
79 int id, redno;
80 CkReductionInactiveMsg(int i, int r) {id=i; redno = r;}
84 /**some data classes used by both ckreductionmgr and cknodereductionmgr**/
85 class contributorInfo {
86 public:
87 int redNo;//Current reduction number
88 contributorInfo() {redNo=0;}
89 //Migration utilities:
90 void pup(PUP::er &p);
93 class countAdjustment {
94 public:
95 int gcount;//Adjustment to global count (applied at reduction end)
96 int lcount;//Adjustment to local count (applied continually)
97 countAdjustment(int ignored=0) {(void)ignored; gcount=0; lcount=0;}
98 void pup(PUP::er& p){ p|gcount; p|lcount; }
/** @todo: Forward declaration of a temporary class. Remove once
 *  delegated cross-array reductions are implemented more optimally.
 */
namespace ck {
namespace impl {
class XArraySectionReducer;
}
}
106 //CkReduction is just a "namespace class" for the user-visible
107 // parts of the reduction system.
108 class CkReduction {
109 public:
110 /*These are the reducers you can use,
111 in addition to any user-defined reducers.*/
113 /* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
114 !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
116 remember to update CkReduction::reducerTable
118 !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
119 !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! */
121 typedef enum {
122 //A placeholder invalid reduction type
123 invalid=0,
124 nop,
125 //Compute the sum the numbers passed by each element.
126 sum_char,sum_short,sum_int,sum_long,sum_long_long,
127 sum_uchar,sum_ushort,sum_uint,sum_ulong,
128 sum_ulong_long,sum_float,sum_double,
130 //Compute the product the numbers passed by each element.
131 product_char,product_short,product_int,product_long,product_long_long,
132 product_uchar,product_ushort,product_uint,product_ulong,
133 product_ulong_long,product_float,product_double,
135 //Compute the largest number passed by any element.
136 max_char,max_short,max_int,max_long,max_long_long,
137 max_uchar,max_ushort,max_uint,max_ulong,
138 max_ulong_long,max_float,max_double,
140 //Compute the smallest number passed by any element.
141 min_char,min_short,min_int,min_long,min_long_long,
142 min_uchar,min_ushort,min_uint,min_ulong,
143 min_ulong_long,min_float,min_double,
145 //Compute the logical AND of the values passed by each element.
146 // The resulting value will be zero if any source value is zero.
147 logical_and, // Deprecated: same as logical_and_int
148 logical_and_int,logical_and_bool,
150 //Compute the logical OR of the values passed by each element.
151 // The resulting value will be 1 if any source value is nonzero.
152 logical_or, // Deprecated: same as logical_or_int
153 logical_or_int,logical_or_bool,
155 //Compute the logical XOR of the values passed by each element.
156 // The resulting value will be 1 if an odd number of source value is nonzero.
157 logical_xor_int,logical_xor_bool,
159 // Compute the logical bitvector AND of the values passed by each element.
160 bitvec_and, // Deprecated: same as bitvec_and_int
161 bitvec_and_int,bitvec_and_bool,
163 // Compute the logical bitvector OR of the values passed by each element.
164 bitvec_or, // Deprecated: same as bitvec_or_int
165 bitvec_or_int,bitvec_or_bool,
167 // Compute the logical bitvector XOR of the values passed by each element.
168 bitvec_xor, // Deprecated: same as bitvec_xor_int
169 bitvec_xor_int,bitvec_xor_bool,
171 // Select one message at random to pass on
172 random,
174 //Concatenate the (arbitrary) data passed by each element
175 concat,
177 //Combine the data passed by each element into an list of setElements.
178 // Each element may contribute arbitrary data (with arbitrary length).
179 set,
181 // Calculate the count, mean, and variance / standard deviation of the data
182 statistics,
184 // Combine multiple data/reducer pairs into one reduction
185 tuple
186 } reducerType;
188 //This structure is used with the set reducer above,
189 // and contains the data from one contribution.
190 class setElement {
191 public:
192 int dataSize;//The allocated length of the `data' array, in bytes
193 char data[1];//The beginning of the array of data
194 //Utility routine: get the next setElement,
195 // or return NULL if there are none.
196 setElement *next(void);
199 // Structure containing the payload of a statistics reduction
200 struct statisticsElement {
201 int count;
202 double mean;
203 double m2;
204 statisticsElement(double initialValue);
205 double variance() const { return count > 1 ? m2 / (double(count) - 1.0) : 0.0; }
206 double stddev() const { return sqrt(variance()); }
209 struct tupleElement {
210 size_t dataSize;
211 char* data;
212 CkReduction::reducerType reducer;
213 bool owns_data;
214 tupleElement();
215 tupleElement(size_t dataSize, void* data, CkReduction::reducerType reducer);
216 tupleElement(CkReduction::tupleElement&& rhs_move);
217 tupleElement& operator=(CkReduction::tupleElement&& rhs_move);
218 ~tupleElement();
220 inline void* getData(void) { return data; }
221 void pup(PUP::er &p);
224 //Support for adding new reducerTypes:
225 //A reducerFunction is used to combine several contributions
226 //into a single summed contribution:
227 // nMsg gives the number of messages to reduce.
228 // msgs[i] contains a contribution or summed contribution.
229 typedef CkReductionMsg *(*reducerFn)(int nMsg,CkReductionMsg **msgs);
231 struct reducerStruct {
232 reducerFn fn;
233 bool streamable;
234 reducerStruct(reducerFn f=NULL, bool s=false) : fn(f), streamable(s) {}
237 //Add the given reducer to the list. Returns the new reducer's
238 // reducerType. Must be called in the same order on every node.
239 static reducerType addReducer(reducerFn fn, bool streamable=false);
241 private:
242 friend class CkReductionMgr;
243 friend class CkNodeReductionMgr;
244 friend class CkMulticastMgr;
245 friend class ck::impl::XArraySectionReducer;
246 //System-level interface
248 //Reducer table: maps reducerTypes to reducerFns.
249 static std::vector<reducerStruct>& reducerTable();
250 static std::vector<reducerStruct> initReducerTable();
252 // tupleReduction needs access to the reducerTable that lives in this namespace
253 // so it is not a standalone function in ckreduction.C like other reduction implementations
254 static CkReductionMsg* tupleReduction(int nMsgs, CkReductionMsg** msgs);
256 //Don't instantiate a CkReduction object-- it's just a namespace.
257 CkReduction();
259 PUPbytes(CkReduction::reducerType)
261 //A CkReductionMsg is sent up the reduction tree-- it
262 // carries a contribution, or several reduced contributions.
263 class CkReductionMsg : public CMessage_CkReductionMsg
265 friend class CkReduction;
266 friend class CkReductionMgr;
267 friend class CkNodeReductionMgr;
268 friend class CkMulticastMgr;
269 #ifdef _PIPELINED_ALLREDUCE_
270 friend class ArrayElement;
271 friend class AllreduceMgr;
272 #endif
273 friend class ck::impl::XArraySectionReducer;
274 public:
276 //Publically-accessible fields:
277 //"Constructor"-- builds and returns a new CkReductionMsg.
278 // the "srcData" array you specify will be copied into this object (unless NULL).
279 static CkReductionMsg *buildNew(int NdataSize,const void *srcData,
280 CkReduction::reducerType reducer=CkReduction::invalid,
281 CkReductionMsg *buf = NULL);
283 inline int getLength(void) const {return dataSize;}
284 inline int getSize(void) const {return dataSize;}
285 inline void *getData(void) {return data;}
286 inline const void *getData(void) const {return data;}
288 inline int getGcount(void){return gcount;}
289 inline CkReduction::reducerType getReducer(void){return reducer;}
290 inline int getRedNo(void){return redNo;}
292 inline CMK_REFNUM_TYPE getUserFlag(void) const {return userFlag;}
293 inline void setUserFlag(CMK_REFNUM_TYPE f) { userFlag=f;}
295 inline void setCallback(const CkCallback &cb) { callback=cb; }
297 //Return true if this message came straight from a contribute call--
298 // if it didn't come from a previous reduction function.
299 inline bool isFromUser(void) const {return sourceFlag==-1;}
301 inline bool isMigratableContributor(void) const {return migratableContributor;}
302 inline void setMigratableContributor(bool _mig){ migratableContributor = _mig;}
304 // Tuple reduction
305 static CkReductionMsg* buildFromTuple(CkReduction::tupleElement* reductions, int num_reductions);
306 void toTuple(CkReduction::tupleElement** out_reductions, int* num_reductions);
308 ~CkReductionMsg();
310 //Implementation-only fields (don't access these directly!)
311 //Msg runtime support
312 static void *alloc(int msgnum, size_t size, int *reqSize, int priobits);
313 static void *pack(CkReductionMsg *);
314 static CkReductionMsg *unpack(void *in);
316 #if CMK_BIGSIM_CHARM
317 /* AMPI reductions use bare CkReductionMsg's instead of AmpiMsg's */
318 void *event; // the event point that corresponds to this message
319 int eventPe; // the PE that the event is located on
320 #endif
322 private:
323 int dataSize;//Length of array below, in bytes
324 void *data;//Reduction data
325 CMK_REFNUM_TYPE userFlag; //Some sort of identifying flag, for client use
326 CkCallback callback; //What to do when done
327 bool migratableContributor; // are the contributors migratable
329 int sourceFlag;/*Flag:
330 0 indicates this is a placeholder message (meaning: nothing to report)
331 -1 indicates this is a single (non-reduced) contribution.
332 >0 indicates this is a reduced contribution.
334 int nSources(void) {return sourceFlag<0?-sourceFlag:sourceFlag;}
335 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_ )
336 int sourceProcessorCount;
337 #endif
338 int fromPE;
339 private:
340 #if CMK_BIGSIM_CHARM
341 void *log;
342 #endif
343 CkReduction::reducerType reducer;
344 //contributorInfo *ci;//Source contributor, or NULL if none
345 int redNo;//The serial number of this reduction
346 int gcount;//Contribution to the global contributor count
347 // for section multicast/reduction library
348 CkSectionInfo sid; // section cookie for multicast
349 char rebuilt; // indicate if the multicast tree needs rebuilt
350 int nFrags;
351 int fragNo; // fragment of a reduction msg (when pipelined)
352 // value = 0 to nFrags-1
353 double dataStorage;//Start of data array (so it's double-aligned)
355 //Default constructor is private so you must use "buildNew", above
356 CkReductionMsg();
// Declares the five contribute() overloads inside a class body: data with
// an optional user flag, data with a callback, a ready-made CkReductionMsg,
// a callback only, and a user flag only.  userFlag defaults to
// (CMK_REFNUM_TYPE)-1 wherever it appears.
#define CK_REDUCTION_CONTRIBUTE_METHODS_DECL \
  void contribute(int dataSize, const void *data, CkReduction::reducerType type, \
                  CMK_REFNUM_TYPE userFlag = (CMK_REFNUM_TYPE)-1); \
  void contribute(int dataSize, const void *data, CkReduction::reducerType type, \
                  const CkCallback &cb, CMK_REFNUM_TYPE userFlag = (CMK_REFNUM_TYPE)-1); \
  void contribute(CkReductionMsg *msg); \
  void contribute(const CkCallback &cb, CMK_REFNUM_TYPE userFlag = (CMK_REFNUM_TYPE)-1); \
  void contribute(CMK_REFNUM_TYPE userFlag = (CMK_REFNUM_TYPE)-1);
// Declares barrier(const CkCallback&) inside a class body; the matching
// definition is generated by CK_BARRIER_CONTRIBUTE_METHODS_DEF.
// The last line deliberately has no trailing backslash, so the macro can
// never splice in whatever line happens to follow it.
#define CK_BARRIER_CONTRIBUTE_METHODS_DECL \
  void barrier(const CkCallback &cb);
373 * One CkReductionMgr runs a non-overlapping set of reductions.
374 * It collects messages from all local contributors, then sends
375 * the reduced message up the reduction tree to node zero, where
376 * they're passed to the user's client function.
378 class CkNodeReductionMgr : public IrrGroup {
379 public:
380 CProxy_CkNodeReductionMgr thisProxy;
381 public:
382 CkNodeReductionMgr(void);
383 CkNodeReductionMgr(CkMigrateMessage *m) : IrrGroup(m) {
384 storedCallback = NULL;
386 ~CkNodeReductionMgr();
388 typedef CkReductionClientFn clientFn;
391 * Add the given client function. Overwrites any previous client.
392 * This manager will dispose of the callback when replaced or done.
394 void ckSetReductionClient(CkCallback *cb);
396 //Contribute-- the given msg can contain any data. The reducerType
397 // field of the message must be valid.
398 // Each contributor must contribute exactly once to each reduction.
399 void contribute(contributorInfo *ci,CkReductionMsg *msg);
400 void contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count);
401 //Communication (library-private)
402 //Sent up the reduction tree with reduced data
403 void RecvMsg(CkReductionMsg *m);
404 void doRecvMsg(CkReductionMsg *m);
405 void LateMigrantMsg(CkReductionMsg *m);
407 virtual void flushStates(); // flush state varaibles
409 virtual int getTotalGCount(){return 0;};
411 private:
412 //Data members
413 //Stored callback function (may be NULL if none has been set)
414 CkCallback *storedCallback;
416 int redNo;//Number of current reduction (incremented at end)
417 bool inProgress;//Is a reduction started, but not complete?
418 bool creating;//Are elements still being created?
419 bool startRequested;//Should we start the next reduction when creation finished?
420 int gcount;
421 int lcount;//Number of local contributors
423 //Current local and remote contributions
424 int nContrib,nRemote;
425 //Contributions queued for the current reduction
426 CkMsgQ<CkReductionMsg> msgs;
427 //Contributions queued for future reductions (sent to us too early)
428 CkMsgQ<CkReductionMsg> futureMsgs;
429 //Remote messages queued for future reductions (sent to us too early)
430 CkMsgQ<CkReductionMsg> futureRemoteMsgs;
431 //Late migrant messages queued for future reductions
432 CkMsgQ<CkReductionMsg> futureLateMigrantMsgs;
434 //My Big LOCK
435 CmiNodeLock lockEverything;
437 bool interrupt; /* flag for use in non-smp: false means interrupt can occur, true means not (also acts as a lock) */
439 /*vector storing the children of this node*/
440 CkVec<int> kids;
442 //State:
443 void startReduction(int number,int srcPE);
444 void doAddContribution(CkReductionMsg *m);
445 void finishReduction(void);
446 protected:
447 void addContribution(CkReductionMsg *m);
449 private:
451 //Reduction tree utilities
452 /* for binomial trees*/
453 unsigned upperSize;
454 unsigned label;
455 int parent;
456 int numKids;
457 // int *kids;
458 void init_BinomialTree();
460 void init_TopoTree();
461 void init_BinaryTree();
462 enum {TREE_WID=2};
463 int treeRoot(void);//Root PE
464 bool hasParent(void);
465 int treeParent(void);//My parent PE
466 int firstKid(void);//My first child PE
467 int treeKids(void);//Number of children in tree
469 //Combine (& free) the current message vector.
470 CkReductionMsg *reduceMessages(void);
472 //Map reduction number to a time
473 bool isPast(int num) const {return (bool)(num<redNo);}
474 bool isPresent(int num) const {return (bool)(num==redNo);}
475 bool isFuture(int num) const {return (bool)(num>redNo);}
477 /*FAULT_EVAC*/
478 bool oldleaf;
479 bool blocked;
480 int newParent;
481 int additionalGCount,newAdditionalGCount; //gcount that gets passed to u from the node u replace
482 CkVec<int> newKids;
483 CkMsgQ<CkReductionMsg> bufferedMsgs;
484 CkMsgQ<CkReductionMsg> bufferedRemoteMsgs;
485 enum {OLDPARENT,OLDCHILDREN,NEWPARENT,LEAFPARENT};
486 int numModificationReplies;
487 int maxModificationRedNo;
488 int tempModificationRedNo;
489 bool readyDeletion;
490 bool killed;
492 //Checkpointing utilities
493 public:
494 virtual void pup(PUP::er &p);
495 /*FAULT_EVAC*/
496 virtual void evacuate();
497 virtual void doneEvacuate();
498 void DeleteChild(int deletedChild);
499 void DeleteNewChild(int deletedChild);
500 void collectMaxRedNo(int maxRedNo);
501 void unblockNode(int maxRedNo);
502 void modifyTree(int code,int size,int *data);
503 private:
504 int findMaxRedNo();
505 void updateTree();
506 void clearBlockedMsgs();
511 //A NodeGroup that contribute to reductions
512 class NodeGroup : public CkNodeReductionMgr {
513 protected:
514 contributorInfo reductionInfo;//My reduction information
515 public:
516 CmiNodeLock __nodelock;
517 const int thisIndex;
518 NodeGroup();
519 NodeGroup(CkMigrateMessage* m):CkNodeReductionMgr(m),thisIndex(CkMyNode()) { __nodelock=CmiCreateLock(); }
521 ~NodeGroup();
522 inline const CkGroupID &ckGetGroupID(void) const {return thisgroup;}
523 inline CkGroupID CkGetNodeGroupID(void) const {return thisgroup;}
524 virtual bool isNodeGroup() { return true; }
526 virtual void pup(PUP::er &p);
527 virtual void flushStates() {
528 CkNodeReductionMgr::flushStates();
529 reductionInfo.redNo = 0;
532 CK_REDUCTION_CONTRIBUTE_METHODS_DECL
533 void contributeWithCounter(CkReductionMsg *msg,int count);
537 class CkReductionMgr : public CkGroupInitCallback {
538 public:
539 CProxy_CkReductionMgr thisProxy;
541 public:
542 CkReductionMgr();
543 CkReductionMgr(CkMigrateMessage *m);
544 ~CkReductionMgr();
546 typedef CkReductionClientFn clientFn;
549 * Add the given client function. Overwrites any previous client.
550 * This manager will dispose of the callback when replaced or done.
552 void ckSetReductionClient(CkCallback *cb);
554 //Contributors keep a copy of this structure:
557 //Contributor list maintainance:
558 //These just set and clear the "creating" flag to prevent
559 // reductions from finishing early because not all elements
560 // have been created.
561 void creatingContributors(void);
562 void doneCreatingContributors(void);
563 //Initializes a new contributor
564 void contributorStamped(contributorInfo *ci);//Increment global number
565 void contributorCreated(contributorInfo *ci);//Increment local number
566 void contributorDied(contributorInfo *ci);//Don't expect more contributions
567 //Migrating away
568 void contributorLeaving(contributorInfo *ci);
569 //Migrating in
570 void contributorArriving(contributorInfo *ci);
572 //Contribute-- the given msg can contain any data. The reducerType
573 // field of the message must be valid.
574 // Each contributor must contribute exactly once to each reduction.
575 void contribute(contributorInfo *ci,CkReductionMsg *msg);
577 //Communication (library-private)
578 //Sent down the reduction tree (used by barren PEs)
579 void ReductionStarting(CkReductionNumberMsg *m);
580 //Sent to root of the reduction tree with late migrant data
581 void LateMigrantMsg(CkReductionMsg *m);
582 //A late migrating contributor will never contribute
583 void MigrantDied(CkReductionNumberMsg *m);
585 void RecvMsg(CkReductionMsg *m);
586 void AddToInactiveList(CkReductionInactiveMsg *m);
588 // simple barrier for FT
589 void barrier(CkReductionMsg * msg);
590 void Barrier_RecvMsg(CkReductionMsg *m);
591 void addBarrier(CkReductionMsg *m);
592 void finishBarrier(void);
594 virtual bool isReductionMgr(void){ return true; }
595 virtual void flushStates();
596 /*FAULT_EVAC: used to get the gcount on a processor when
597 it is evacuated.
598 TODO: It needs to be fixed as it should return the gcount
599 and the adjustment information for objects that might have
600 contributed and died.
601 The current implementation lets us get by in the case
602 when there are no gcount
604 int getGCount(){return gcount;};
605 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
606 void decGCount(){gcount--;}
607 void incNumImmigrantRecObjs(){
608 numImmigrantRecObjs++;
610 void decNumImmigrantRecObjs(){
611 numImmigrantRecObjs--;
613 void incNumEmigrantRecObjs(){
614 numEmigrantRecObjs++;
616 void decNumEmigrantRecObjs(){
617 numEmigrantRecObjs--;
620 #endif
622 private:
624 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
625 int numImmigrantRecObjs;
626 int numEmigrantRecObjs;
627 #endif
629 //Data members
630 //Stored callback function (may be NULL if none has been set)
631 CkCallback storedCallback;
633 int redNo;//Number of current reduction (incremented at end) to be deposited with NodeGroups
634 int completedRedNo;//Number of reduction Completed ie recieved callback from NodeGroups
635 bool inProgress;//Is a reduction started, but not complete?
636 bool creating;//Are elements still being created?
637 bool startRequested;//Should we start the next reduction when creation finished?
638 int gcount;//=el't created here - el't deleted here
639 int lcount;//Number of local contributors
640 int maxStartRequest; // the highest future ReductionStarting message received
642 //Current local and remote contributions
643 int nContrib,nRemote;
644 // Is it inactive
645 bool is_inactive;
647 // simple barrier
648 CkCallback barrier_storedCallback;
649 int barrier_gCount;
650 int barrier_nSource;
651 int barrier_nContrib,barrier_nRemote;
653 //Contributions queued for the current reduction
654 CkMsgQ<CkReductionMsg> msgs;
656 //Contributions queued for future reductions (sent to us too early)
657 CkMsgQ<CkReductionMsg> futureMsgs;
658 //Remote messages queued for future reductions (sent to us too early)
659 CkMsgQ<CkReductionMsg> futureRemoteMsgs;
661 CkMsgQ<CkReductionMsg> finalMsgs;
662 std::map<int, int> inactiveList;
664 //State:
665 void startReduction(int number,int srcPE);
666 void addContribution(CkReductionMsg *m);
667 void finishReduction(void);
668 void checkIsActive();
669 void informParentInactive();
670 void checkAndAddToInactiveList(int id, int red_no);
671 void checkAndRemoveFromInactiveList(int id, int red_no);
672 void sendReductionStartingToKids(int red_no);
674 //Reduction tree utilities
675 unsigned upperSize;
676 unsigned label;
677 int parent;
678 int numKids;
679 /*vector storing the children of this node*/
680 CkVec<int> newKids;
681 CkVec<int> kids;
682 void init_BinomialTree();
684 void init_TopoTree();
685 void init_BinaryTree();
686 enum {TREE_WID=2};
687 int treeRoot(void);//Root PE
689 //Combine (& free) the current message vector.
690 CkReductionMsg *reduceMessages(void);
692 //Map reduction number to a time
693 bool isPast(int num) const {return (bool)(num<redNo);}
694 bool isPresent(int num) const {return (bool)(num==redNo);}
695 bool isFuture(int num) const {return (bool)(num>redNo);}
698 //This vector of adjustments is indexed by redNo,
699 // starting from the current redNo.
700 CkVec<countAdjustment> adjVec;
701 //Return the countAdjustment struct for the given redNo:
702 countAdjustment &adj(int number);
703 //Shift the list of countAdjustments down
704 void shiftAdjVec(void);
706 protected:
707 bool hasParent(void);
708 int treeParent(void);//My parent PE
709 int firstKid(void);//My first child PE
710 int treeKids(void);//Number of children in tree
712 //whether to notify children that reduction starts
713 bool disableNotifyChildrenStart;
714 void resetCountersWhenFlushingStates() { gcount = lcount = 0; }
715 bool isDestroying;
717 //Checkpointing utilities
718 public:
719 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
720 int *perProcessorCounts;
721 int processorCount;
722 int totalCount;
723 int numberReductionMessages(){
724 if(totalCount != 0){
725 return totalCount;
726 }else{
727 return MAX_INT;
730 #endif
731 virtual void pup(PUP::er &p);
732 static bool isIrreducible(){ return false;}
733 void contributeViaMessage(CkReductionMsg *m);
736 //Define methods used to contribute to the given reduction type.
737 // Data is copied, not deleted.
738 /*#define CK_REDUCTION_CONTRIBUTE_METHODS_DECL \
739 void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
740 CMK_REFNUM_TYPE userFlag=-1); \
741 void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
742 const CkCallback &cb,CMK_REFNUM_TYPE userFlag=-1); \
743 void contribute(CkReductionMsg *msg);\*/
// Expands to CkIndex_<me>::redn_wrapper_<method>(NULL).
#define CkReductionTarget(me, method) CkIndex_##me::redn_wrapper_##method(NULL)
// Defines, for class `me`, the five contribute() overloads declared by
// CK_REDUCTION_CONTRIBUTE_METHODS_DECL.  Each overload obtains a
// CkReductionMsg (built with CkReductionMsg::buildNew, or supplied by the
// caller), applies the user flag / callback it was given, records
// `migratable` via setMigratableContributor, and forwards the message with
// myRednMgr->contribute(&myRednInfo, msg).  The data-less overloads build an
// empty message with the CkReduction::nop reducer.
#define CK_REDUCTION_CONTRIBUTE_METHODS_DEF(me,myRednMgr,myRednInfo,migratable) \
void me::contribute(int dataSize, const void *data, CkReduction::reducerType type, \
                    CMK_REFNUM_TYPE userFlag) \
{ \
  CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, data, type); \
  msg->setUserFlag(userFlag); \
  msg->setMigratableContributor(migratable); \
  myRednMgr->contribute(&myRednInfo, msg); \
} \
void me::contribute(int dataSize, const void *data, CkReduction::reducerType type, \
                    const CkCallback &cb, CMK_REFNUM_TYPE userFlag) \
{ \
  CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, data, type); \
  msg->setUserFlag(userFlag); \
  msg->setCallback(cb); \
  msg->setMigratableContributor(migratable); \
  myRednMgr->contribute(&myRednInfo, msg); \
} \
void me::contribute(CkReductionMsg *msg) \
{ \
  msg->setMigratableContributor(migratable); \
  myRednMgr->contribute(&myRednInfo, msg); \
} \
void me::contribute(const CkCallback &cb, CMK_REFNUM_TYPE userFlag) \
{ \
  CkReductionMsg *msg = CkReductionMsg::buildNew(0, NULL, CkReduction::nop); \
  msg->setUserFlag(userFlag); \
  msg->setCallback(cb); \
  msg->setMigratableContributor(migratable); \
  myRednMgr->contribute(&myRednInfo, msg); \
} \
void me::contribute(CMK_REFNUM_TYPE userFlag) \
{ \
  CkReductionMsg *msg = CkReductionMsg::buildNew(0, NULL, CkReduction::nop); \
  msg->setUserFlag(userFlag); \
  msg->setMigratableContributor(migratable); \
  myRednMgr->contribute(&myRednInfo, msg); \
}
// Defines me::barrier(cb): builds an empty CkReduction::nop message, attaches
// the callback and the `migratable` flag, and hands it to
// myRednMgr->barrier().  The myRednInfo argument is not used by the body.
#define CK_BARRIER_CONTRIBUTE_METHODS_DEF(me,myRednMgr,myRednInfo,migratable) \
void me::barrier(const CkCallback &cb) \
{ \
  CkReductionMsg *msg = CkReductionMsg::buildNew(0, NULL, CkReduction::nop); \
  msg->setCallback(cb); \
  msg->setMigratableContributor(migratable); \
  myRednMgr->barrier(msg); \
}
797 //A group that can contribute to reductions
798 class Group : public CkReductionMgr
800 contributorInfo reductionInfo;//My reduction information
801 public:
802 const int thisIndex;
803 Group();
804 Group(CkMigrateMessage *msg);
805 virtual bool isNodeGroup() { return false; }
806 virtual void pup(PUP::er &p);
807 virtual void flushStates() {
808 CkReductionMgr::flushStates();
809 reductionInfo.redNo = 0;
811 virtual void CkAddThreadListeners(CthThread tid, void *msg);
813 CK_REDUCTION_CONTRIBUTE_METHODS_DECL
814 CK_BARRIER_CONTRIBUTE_METHODS_DECL
#ifdef _PIPELINED_ALLREDUCE_
// Reassembles the fragments of a pipelined allreduce into one buffer and,
// once all fragments have arrived, sends the combined data to `cb`.
class AllreduceMgr
{
public:
  // Start idle: nothing received, no buffer allocated.  `data` is set to
  // NULL explicitly so it never holds an indeterminate pointer.
  AllreduceMgr() : size(0), data(NULL), fragsRecieved(0) {}
  friend class ArrayElement;

  // Receive one fragment of an allreduce message.
  //  - On the first fragment, allocate room for nFrags fragments of
  //    FRAG_SIZE bytes each.
  //  - Copy the fragment's payload to offset fragNo*FRAG_SIZE.
  //  - When the number of fragments received equals msg->nFrags, build a
  //    CkReductionMsg from the first `size` bytes, send it to `cb`, and
  //    reset for the next allreduce.
  // NOTE(review): `msg` is not freed here; ownership presumably stays
  // with the caller -- confirm.
  void allreduce_recieve(CkReductionMsg* msg)
  {
    // allred_msgs.enq(msg);
    fragsRecieved++;
    if(fragsRecieved==1)
    {
      // Sizes are computed in size_t so large fragment counts cannot
      // overflow the int product FRAG_SIZE*nFrags.
      data = new char[(size_t)FRAG_SIZE * (size_t)msg->nFrags];
    }
    memcpy(data + (size_t)msg->fragNo * (size_t)FRAG_SIZE, msg->data, msg->dataSize);
    size += msg->dataSize;

    if(fragsRecieved==msg->nFrags) {
      CkReductionMsg* ret = CkReductionMsg::buildNew(size, data);
      cb.send(ret);
      fragsRecieved=0; size=0;
      delete [] data;
      data = NULL;  // do not keep a dangling pointer between allreduces
    }
  }

  // NOTE(review): there is no destructor, so a buffer from a partially
  // received allreduce is not released when this object goes away.
  // TODO: check for same reduction
  CkCallback cb;
  int size;
  char* data;
  int fragsRecieved;
  // CkMsgQ<CkReductionMsg> allred_msgs;
};
#endif // _PIPELINED_ALLREDUCE_
852 #endif //_CKREDUCTION_H