2 Charm++ File: Reduction Library
3 added 3/27/2000 by Orion Sky Lawlor, olawlor@acm.org
4 modified 02/21/2003 by Sayantan Chakravorty
7 A reduction takes some sort of inputs (contributions)
8 from some set of objects scattered across all PE's,
9 and combines (reduces) all the contributions onto one
10 PE. This library provides several different kinds of
11 combination routines (reducers), and various utilities
14 The calls needed to use the reduction manager are:
15 -Create with CProxy_CkReduction::ckNew.
18 #ifndef _CKREDUCTION_H
19 #define _CKREDUCTION_H
21 #include "CkReduction.decl.h"
23 #ifdef _PIPELINED_ALLREDUCE_
24 #define FRAG_SIZE 131072
25 #define FRAG_THRESHOLD 131072
28 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
29 #define MAX_INT 5000000
30 #define _MLOG_REDUCE_P2P_ 0
33 //This message is sent between group objects on a single PE
34 // to let each know the other has been created.
35 class CkGroupCallbackMsg
:public CMessage_CkGroupCallbackMsg
{
37 typedef void (*callbackType
)(void *param
);
38 CkGroupCallbackMsg(callbackType Ncallback
,void *Nparam
)
39 {callback
=Ncallback
;param
=Nparam
;}
//Invoke the stored callback function, handing it the stored parameter.
40 void call(void) {(*callback
)(param
);}
42 callbackType callback
;
46 class CkGroupInitCallback
: public IrrGroup
{
48 CkGroupInitCallback(void);
49 CkGroupInitCallback(CkMigrateMessage
*m
):IrrGroup(m
) {}
50 void callMeBack(CkGroupCallbackMsg
*m
);
51 void pup(PUP::er
& p
){ IrrGroup::pup(p
); }
55 class CkGroupReadyCallback
: public IrrGroup
{
58 CkQ
<CkGroupCallbackMsg
*> _msgs
;
59 void callBuffered(void);
61 CkGroupReadyCallback(void);
62 CkGroupReadyCallback(CkMigrateMessage
*m
):IrrGroup(m
) {}
63 void callMeBack(CkGroupCallbackMsg
*m
);
//Report the current value of the _isReady flag.
64 bool isReady(void) { return _isReady
; }
//Set the _isReady flag, then run callBuffered().
// NOTE(review): callBuffered() is defined elsewhere; presumably it delivers
// the callbacks queued in _msgs -- confirm against the implementation.
66 void setReady(void) {_isReady
= true; callBuffered(); }
//Clear the _isReady flag. Unlike setReady(), nothing else is triggered.
67 void setNotReady(void) {_isReady
= false; }
70 class CkReductionNumberMsg
:public CMessage_CkReductionNumberMsg
{
73 CkReductionNumberMsg(int n
) {num
=n
;}
77 class CkReductionInactiveMsg
:public CMessage_CkReductionInactiveMsg
{
80 CkReductionInactiveMsg(int i
, int r
) {id
=i
; redno
= r
;}
84 /**some data classes used by both ckreductionmgr and cknodereductionmgr**/
85 class contributorInfo
{
87 int redNo
;//Current reduction number
88 contributorInfo() {redNo
=0;}
89 //Migration utilities:
93 class countAdjustment
{
95 int gcount
;//Adjustment to global count (applied at reduction end)
96 int lcount
;//Adjustment to local count (applied continually)
//Construct with both adjustments zeroed. The int argument is accepted but
// never used (the cast to void only silences unused-parameter warnings).
// NOTE(review): presumably the parameter exists so the object can be built
// from an integer literal by container code -- confirm.
97 countAdjustment(int ignored
=0) {(void)ignored
; gcount
=0; lcount
=0;}
//Pass both counters through the given PUP::er: gcount first, then lcount.
98 void pup(PUP::er
& p
){ p
|gcount
; p
|lcount
; }
101 /** @todo: Fwd decl for a temporary class. Remove after
102 * delegated cross-array reductions are implemented more optimally
104 namespace ck
{ namespace impl
{ class XArraySectionReducer
; } }
106 //CkReduction is just a "namespace class" for the user-visible
107 // parts of the reduction system.
110 /*These are the reducers you can use,
111 in addition to any user-defined reducers.*/
113 /* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
114 !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
116 remember to update CkReduction::reducerTable
118 !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
119 !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! */
122 //A placeholder invalid reduction type
125 //Compute the sum of the numbers passed by each element.
126 sum_char
,sum_short
,sum_int
,sum_long
,sum_long_long
,
127 sum_uchar
,sum_ushort
,sum_uint
,sum_ulong
,
128 sum_ulong_long
,sum_float
,sum_double
,
130 //Compute the product of the numbers passed by each element.
131 product_char
,product_short
,product_int
,product_long
,product_long_long
,
132 product_uchar
,product_ushort
,product_uint
,product_ulong
,
133 product_ulong_long
,product_float
,product_double
,
135 //Compute the largest number passed by any element.
136 max_char
,max_short
,max_int
,max_long
,max_long_long
,
137 max_uchar
,max_ushort
,max_uint
,max_ulong
,
138 max_ulong_long
,max_float
,max_double
,
140 //Compute the smallest number passed by any element.
141 min_char
,min_short
,min_int
,min_long
,min_long_long
,
142 min_uchar
,min_ushort
,min_uint
,min_ulong
,
143 min_ulong_long
,min_float
,min_double
,
145 //Compute the logical AND of the values passed by each element.
146 // The resulting value will be zero if any source value is zero.
147 logical_and
, // Deprecated: same as logical_and_int
148 logical_and_int
,logical_and_bool
,
150 //Compute the logical OR of the values passed by each element.
151 // The resulting value will be 1 if any source value is nonzero.
152 logical_or
, // Deprecated: same as logical_or_int
153 logical_or_int
,logical_or_bool
,
155 //Compute the logical XOR of the values passed by each element.
156 // The resulting value will be 1 if an odd number of source values are nonzero.
157 logical_xor_int
,logical_xor_bool
,
159 // Compute the logical bitvector AND of the values passed by each element.
160 bitvec_and
, // Deprecated: same as bitvec_and_int
161 bitvec_and_int
,bitvec_and_bool
,
163 // Compute the logical bitvector OR of the values passed by each element.
164 bitvec_or
, // Deprecated: same as bitvec_or_int
165 bitvec_or_int
,bitvec_or_bool
,
167 // Compute the logical bitvector XOR of the values passed by each element.
168 bitvec_xor
, // Deprecated: same as bitvec_xor_int
169 bitvec_xor_int
,bitvec_xor_bool
,
171 // Select one message at random to pass on
174 //Concatenate the (arbitrary) data passed by each element
177 //Combine the data passed by each element into a list of setElements.
178 // Each element may contribute arbitrary data (with arbitrary length).
181 // Calculate the count, mean, and variance / standard deviation of the data
184 // Combine multiple data/reducer pairs into one reduction
188 //This structure is used with the set reducer above,
189 // and contains the data from one contribution.
192 int dataSize
;//The allocated length of the `data' array, in bytes
193 char data
[1];//The beginning of the array of data
194 //Utility routine: get the next setElement,
195 // or return NULL if there are none.
196 setElement
*next(void);
199 // Structure containing the payload of a statistics reduction
200 struct statisticsElement
{
204 statisticsElement(double initialValue
);
//Variance of the accumulated data: m2 divided by (count - 1).
// Returns 0.0 when count is 1 or less, which also avoids a division by zero.
// NOTE(review): assumes m2 holds the running sum of squared deviations from
// the mean -- confirm against the statistics reducer implementation.
205 double variance() const { return count
> 1 ? m2
/ (double(count
) - 1.0) : 0.0; }
//Standard deviation: the square root of variance() (so 0.0 when count <= 1).
206 double stddev() const { return sqrt(variance()); }
209 struct tupleElement
{
212 CkReduction::reducerType reducer
;
215 tupleElement(size_t dataSize
, void* data
, CkReduction::reducerType reducer
);
216 tupleElement(CkReduction::tupleElement
&& rhs_move
);
217 tupleElement
& operator=(CkReduction::tupleElement
&& rhs_move
);
220 inline void* getData(void) { return data
; }
221 void pup(PUP::er
&p
);
224 //Support for adding new reducerTypes:
225 //A reducerFunction is used to combine several contributions
226 //into a single summed contribution:
227 // nMsg gives the number of messages to reduce.
228 // msgs[i] contains a contribution or summed contribution.
229 typedef CkReductionMsg
*(*reducerFn
)(int nMsg
,CkReductionMsg
**msgs
);
231 struct reducerStruct
{
//Store the reducer function in fn and its flag in streamable.
// With no arguments the entry has no function (NULL) and is not streamable.
234 reducerStruct(reducerFn f
=NULL
, bool s
=false) : fn(f
), streamable(s
) {}
237 //Add the given reducer to the list. Returns the new reducer's
238 // reducerType. Must be called in the same order on every node.
239 static reducerType
addReducer(reducerFn fn
, bool streamable
=false);
242 friend class CkReductionMgr
;
243 friend class CkNodeReductionMgr
;
244 friend class CkMulticastMgr
;
245 friend class ck::impl::XArraySectionReducer
;
246 //System-level interface
248 //Reducer table: maps reducerTypes to reducerFns.
249 static std::vector
<reducerStruct
>& reducerTable();
250 static std::vector
<reducerStruct
> initReducerTable();
252 // tupleReduction needs access to the reducerTable that lives in this namespace
253 // so it is not a standalone function in ckreduction.C like other reduction implementations
254 static CkReductionMsg
* tupleReduction(int nMsgs
, CkReductionMsg
** msgs
);
256 //Don't instantiate a CkReduction object-- it's just a namespace.
259 PUPbytes(CkReduction::reducerType
)
261 //A CkReductionMsg is sent up the reduction tree-- it
262 // carries a contribution, or several reduced contributions.
263 class CkReductionMsg
: public CMessage_CkReductionMsg
265 friend class CkReduction
;
266 friend class CkReductionMgr
;
267 friend class CkNodeReductionMgr
;
268 friend class CkMulticastMgr
;
269 #ifdef _PIPELINED_ALLREDUCE_
270 friend class ArrayElement
;
271 friend class AllreduceMgr
;
273 friend class ck::impl::XArraySectionReducer
;
276 //Publicly-accessible fields:
277 //"Constructor"-- builds and returns a new CkReductionMsg.
278 // the "srcData" array you specify will be copied into this object (unless NULL).
279 static CkReductionMsg
*buildNew(int NdataSize
,const void *srcData
,
280 CkReduction::reducerType reducer
=CkReduction::invalid
,
281 CkReductionMsg
*buf
= NULL
);
//Length of the reduction data, in bytes (the dataSize field).
// Returns the same value as getSize().
283 inline int getLength(void) const {return dataSize
;}
//Size of the reduction data, in bytes (the dataSize field).
// Returns the same value as getLength().
284 inline int getSize(void) const {return dataSize
;}
285 inline void *getData(void) {return data
;}
286 inline const void *getData(void) const {return data
;}
288 inline int getGcount(void){return gcount
;}
289 inline CkReduction::reducerType
getReducer(void){return reducer
;}
290 inline int getRedNo(void){return redNo
;}
292 inline CMK_REFNUM_TYPE
getUserFlag(void) const {return userFlag
;}
293 inline void setUserFlag(CMK_REFNUM_TYPE f
) { userFlag
=f
;}
295 inline void setCallback(const CkCallback
&cb
) { callback
=cb
; }
297 //Return true if this message came straight from a contribute call--
298 // if it didn't come from a previous reduction function.
299 inline bool isFromUser(void) const {return sourceFlag
==-1;}
301 inline bool isMigratableContributor(void) const {return migratableContributor
;}
302 inline void setMigratableContributor(bool _mig
){ migratableContributor
= _mig
;}
305 static CkReductionMsg
* buildFromTuple(CkReduction::tupleElement
* reductions
, int num_reductions
);
306 void toTuple(CkReduction::tupleElement
** out_reductions
, int* num_reductions
);
310 //Implementation-only fields (don't access these directly!)
311 //Msg runtime support
312 static void *alloc(int msgnum
, size_t size
, int *reqSize
, int priobits
);
313 static void *pack(CkReductionMsg
*);
314 static CkReductionMsg
*unpack(void *in
);
317 /* AMPI reductions use bare CkReductionMsg's instead of AmpiMsg's */
318 void *event
; // the event point that corresponds to this message
319 int eventPe
; // the PE that the event is located on
323 int dataSize
;//Length of array below, in bytes
324 void *data
;//Reduction data
325 CMK_REFNUM_TYPE userFlag
; //Some sort of identifying flag, for client use
326 CkCallback callback
; //What to do when done
327 bool migratableContributor
; // are the contributors migratable
329 int sourceFlag
;/*Flag:
330 0 indicates this is a placeholder message (meaning: nothing to report)
331 -1 indicates this is a single (non-reduced) contribution.
332 >0 indicates this is a reduced contribution.
//Magnitude of sourceFlag: a negative flag is negated, a non-negative flag
// is returned unchanged. A single direct contribution (flag -1) gives 1 and
// a placeholder message (flag 0) gives 0.
334 int nSources(void) {return sourceFlag
<0?-sourceFlag
:sourceFlag
;}
335 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_ )
336 int sourceProcessorCount
;
343 CkReduction::reducerType reducer
;
344 //contributorInfo *ci;//Source contributor, or NULL if none
345 int redNo
;//The serial number of this reduction
346 int gcount
;//Contribution to the global contributor count
347 // for section multicast/reduction library
348 CkSectionInfo sid
; // section cookie for multicast
349 char rebuilt
; // indicates whether the multicast tree needs to be rebuilt
351 int fragNo
; // fragment of a reduction msg (when pipelined)
352 // value = 0 to nFrags-1
353 double dataStorage
;//Start of data array (so it's double-aligned)
355 //Default constructor is private so you must use "buildNew", above
//Declares the five contribute() overloads, for use inside a class body:
//  1. raw data (size in bytes, pointer, reducer type) plus an optional user flag;
//  2. the same raw data plus a callback and an optional user flag;
//  3. an already-built CkReductionMsg;
//  4. a callback and optional user flag, with no data;
//  5. an optional user flag only.
// Where present, userFlag defaults to (CMK_REFNUM_TYPE)-1.
// CK_REDUCTION_CONTRIBUTE_METHODS_DEF supplies the matching definitions.
360 #define CK_REDUCTION_CONTRIBUTE_METHODS_DECL \
361 void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
362 CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1); \
363 void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
364 const CkCallback &cb,CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1); \
365 void contribute(CkReductionMsg *msg); \
366 void contribute(const CkCallback &cb,CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1);\
367 void contribute(CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1);
369 #define CK_BARRIER_CONTRIBUTE_METHODS_DECL \
370 void barrier(const CkCallback &cb);\
373 * One CkReductionMgr runs a non-overlapping set of reductions.
374 * It collects messages from all local contributors, then sends
375 * the reduced message up the reduction tree to node zero, where
376 * they're passed to the user's client function.
378 class CkNodeReductionMgr
: public IrrGroup
{
380 CProxy_CkNodeReductionMgr thisProxy
;
382 CkNodeReductionMgr(void);
383 CkNodeReductionMgr(CkMigrateMessage
*m
) : IrrGroup(m
) {
384 storedCallback
= NULL
;
386 ~CkNodeReductionMgr();
388 typedef CkReductionClientFn clientFn
;
391 * Add the given client function. Overwrites any previous client.
392 * This manager will dispose of the callback when replaced or done.
394 void ckSetReductionClient(CkCallback
*cb
);
396 //Contribute-- the given msg can contain any data. The reducerType
397 // field of the message must be valid.
398 // Each contributor must contribute exactly once to each reduction.
399 void contribute(contributorInfo
*ci
,CkReductionMsg
*msg
);
400 void contributeWithCounter(contributorInfo
*ci
,CkReductionMsg
*m
,int count
);
401 //Communication (library-private)
402 //Sent up the reduction tree with reduced data
403 void RecvMsg(CkReductionMsg
*m
);
404 void doRecvMsg(CkReductionMsg
*m
);
405 void LateMigrantMsg(CkReductionMsg
*m
);
407 virtual void flushStates(); // flush state variables
409 virtual int getTotalGCount(){return 0;};
413 //Stored callback function (may be NULL if none has been set)
414 CkCallback
*storedCallback
;
416 int redNo
;//Number of current reduction (incremented at end)
417 bool inProgress
;//Is a reduction started, but not complete?
418 bool creating
;//Are elements still being created?
419 bool startRequested
;//Should we start the next reduction when creation finished?
421 int lcount
;//Number of local contributors
423 //Current local and remote contributions
424 int nContrib
,nRemote
;
425 //Contributions queued for the current reduction
426 CkMsgQ
<CkReductionMsg
> msgs
;
427 //Contributions queued for future reductions (sent to us too early)
428 CkMsgQ
<CkReductionMsg
> futureMsgs
;
429 //Remote messages queued for future reductions (sent to us too early)
430 CkMsgQ
<CkReductionMsg
> futureRemoteMsgs
;
431 //Late migrant messages queued for future reductions
432 CkMsgQ
<CkReductionMsg
> futureLateMigrantMsgs
;
435 CmiNodeLock lockEverything
;
437 bool interrupt
; /* flag for use in non-smp: false means interrupt can occur, true means not (also acts as a lock) */
439 /*vector storing the children of this node*/
443 void startReduction(int number
,int srcPE
);
444 void doAddContribution(CkReductionMsg
*m
);
445 void finishReduction(void);
447 void addContribution(CkReductionMsg
*m
);
451 //Reduction tree utilities
452 /* for binomial trees*/
458 void init_BinomialTree();
461 void init_BinaryTree();
463 int treeRoot(void);//Root PE
464 bool hasParent(void);
465 int treeParent(void);//My parent PE
466 int firstKid(void);//My first child PE
467 int treeKids(void);//Number of children in tree
469 //Combine (& free) the current message vector.
470 CkReductionMsg
*reduceMessages(void);
472 //Map reduction number to a time
//True when num is smaller than the current reduction number (redNo).
473 bool isPast(int num
) const {return (bool)(num
<redNo
);}
//True when num equals the current reduction number (redNo).
474 bool isPresent(int num
) const {return (bool)(num
==redNo
);}
//True when num is larger than the current reduction number (redNo).
475 bool isFuture(int num
) const {return (bool)(num
>redNo
);}
481 int additionalGCount
,newAdditionalGCount
; //gcount that gets passed to you from the node you replace
483 CkMsgQ
<CkReductionMsg
> bufferedMsgs
;
484 CkMsgQ
<CkReductionMsg
> bufferedRemoteMsgs
;
485 enum {OLDPARENT
,OLDCHILDREN
,NEWPARENT
,LEAFPARENT
};
486 int numModificationReplies
;
487 int maxModificationRedNo
;
488 int tempModificationRedNo
;
492 //Checkpointing utilities
494 virtual void pup(PUP::er
&p
);
496 virtual void evacuate();
497 virtual void doneEvacuate();
498 void DeleteChild(int deletedChild
);
499 void DeleteNewChild(int deletedChild
);
500 void collectMaxRedNo(int maxRedNo
);
501 void unblockNode(int maxRedNo
);
502 void modifyTree(int code
,int size
,int *data
);
506 void clearBlockedMsgs();
511 //A NodeGroup that contribute to reductions
512 class NodeGroup
: public CkNodeReductionMgr
{
514 contributorInfo reductionInfo
;//My reduction information
516 CmiNodeLock __nodelock
;
519 NodeGroup(CkMigrateMessage
* m
):CkNodeReductionMgr(m
),thisIndex(CkMyNode()) { __nodelock
=CmiCreateLock(); }
522 inline const CkGroupID
&ckGetGroupID(void) const {return thisgroup
;}
523 inline CkGroupID
CkGetNodeGroupID(void) const {return thisgroup
;}
524 virtual bool isNodeGroup() { return true; }
526 virtual void pup(PUP::er
&p
);
527 virtual void flushStates() {
528 CkNodeReductionMgr::flushStates();
529 reductionInfo
.redNo
= 0;
532 CK_REDUCTION_CONTRIBUTE_METHODS_DECL
533 void contributeWithCounter(CkReductionMsg
*msg
,int count
);
537 class CkReductionMgr
: public CkGroupInitCallback
{
539 CProxy_CkReductionMgr thisProxy
;
543 CkReductionMgr(CkMigrateMessage
*m
);
546 typedef CkReductionClientFn clientFn
;
549 * Add the given client function. Overwrites any previous client.
550 * This manager will dispose of the callback when replaced or done.
552 void ckSetReductionClient(CkCallback
*cb
);
554 //Contributors keep a copy of this structure:
557 //Contributor list maintenance:
558 //These just set and clear the "creating" flag to prevent
559 // reductions from finishing early because not all elements
560 // have been created.
561 void creatingContributors(void);
562 void doneCreatingContributors(void);
563 //Initializes a new contributor
564 void contributorStamped(contributorInfo
*ci
);//Increment global number
565 void contributorCreated(contributorInfo
*ci
);//Increment local number
566 void contributorDied(contributorInfo
*ci
);//Don't expect more contributions
568 void contributorLeaving(contributorInfo
*ci
);
570 void contributorArriving(contributorInfo
*ci
);
572 //Contribute-- the given msg can contain any data. The reducerType
573 // field of the message must be valid.
574 // Each contributor must contribute exactly once to each reduction.
575 void contribute(contributorInfo
*ci
,CkReductionMsg
*msg
);
577 //Communication (library-private)
578 //Sent down the reduction tree (used by barren PEs)
579 void ReductionStarting(CkReductionNumberMsg
*m
);
580 //Sent to root of the reduction tree with late migrant data
581 void LateMigrantMsg(CkReductionMsg
*m
);
582 //A late migrating contributor will never contribute
583 void MigrantDied(CkReductionNumberMsg
*m
);
585 void RecvMsg(CkReductionMsg
*m
);
586 void AddToInactiveList(CkReductionInactiveMsg
*m
);
588 // simple barrier for FT
589 void barrier(CkReductionMsg
* msg
);
590 void Barrier_RecvMsg(CkReductionMsg
*m
);
591 void addBarrier(CkReductionMsg
*m
);
592 void finishBarrier(void);
594 virtual bool isReductionMgr(void){ return true; }
595 virtual void flushStates();
596 /*FAULT_EVAC: used to get the gcount on a processor when
598 TODO: It needs to be fixed as it should return the gcount
599 and the adjustment information for objects that might have
600 contributed and died.
601 The current implementation lets us get by in the case
602 when there are no gcount
604 int getGCount(){return gcount
;};
605 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
606 void decGCount(){gcount
--;}
607 void incNumImmigrantRecObjs(){
608 numImmigrantRecObjs
++;
610 void decNumImmigrantRecObjs(){
611 numImmigrantRecObjs
--;
613 void incNumEmigrantRecObjs(){
614 numEmigrantRecObjs
++;
616 void decNumEmigrantRecObjs(){
617 numEmigrantRecObjs
--;
624 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
625 int numImmigrantRecObjs
;
626 int numEmigrantRecObjs
;
630 //Stored callback function (may be NULL if none has been set)
631 CkCallback storedCallback
;
633 int redNo
;//Number of current reduction (incremented at end) to be deposited with NodeGroups
634 int completedRedNo
;//Number of reduction completed, i.e. received callback from NodeGroups
635 bool inProgress
;//Is a reduction started, but not complete?
636 bool creating
;//Are elements still being created?
637 bool startRequested
;//Should we start the next reduction when creation finished?
638 int gcount
;//=el't created here - el't deleted here
639 int lcount
;//Number of local contributors
640 int maxStartRequest
; // the highest future ReductionStarting message received
642 //Current local and remote contributions
643 int nContrib
,nRemote
;
648 CkCallback barrier_storedCallback
;
651 int barrier_nContrib
,barrier_nRemote
;
653 //Contributions queued for the current reduction
654 CkMsgQ
<CkReductionMsg
> msgs
;
656 //Contributions queued for future reductions (sent to us too early)
657 CkMsgQ
<CkReductionMsg
> futureMsgs
;
658 //Remote messages queued for future reductions (sent to us too early)
659 CkMsgQ
<CkReductionMsg
> futureRemoteMsgs
;
661 CkMsgQ
<CkReductionMsg
> finalMsgs
;
662 std::map
<int, int> inactiveList
;
665 void startReduction(int number
,int srcPE
);
666 void addContribution(CkReductionMsg
*m
);
667 void finishReduction(void);
668 void checkIsActive();
669 void informParentInactive();
670 void checkAndAddToInactiveList(int id
, int red_no
);
671 void checkAndRemoveFromInactiveList(int id
, int red_no
);
672 void sendReductionStartingToKids(int red_no
);
674 //Reduction tree utilities
679 /*vector storing the children of this node*/
682 void init_BinomialTree();
684 void init_BinaryTree();
686 int treeRoot(void);//Root PE
687 bool hasParent(void);
688 int treeParent(void);//My parent PE
689 int firstKid(void);//My first child PE
690 int treeKids(void);//Number of children in tree
692 //Combine (& free) the current message vector.
693 CkReductionMsg
*reduceMessages(void);
695 //Map reduction number to a time
//True when num is smaller than the current reduction number (redNo).
696 bool isPast(int num
) const {return (bool)(num
<redNo
);}
//True when num equals the current reduction number (redNo).
697 bool isPresent(int num
) const {return (bool)(num
==redNo
);}
//True when num is larger than the current reduction number (redNo).
698 bool isFuture(int num
) const {return (bool)(num
>redNo
);}
701 //This vector of adjustments is indexed by redNo,
702 // starting from the current redNo.
703 CkVec
<countAdjustment
> adjVec
;
704 //Return the countAdjustment struct for the given redNo:
705 countAdjustment
&adj(int number
);
706 //Shift the list of countAdjustments down
707 void shiftAdjVec(void);
710 //whether to notify children that reduction starts
711 bool disableNotifyChildrenStart
;
//Zero both contributor counters, gcount and lcount.
712 void resetCountersWhenFlushingStates() { gcount
= lcount
= 0; }
715 //Checkpointing utilities
717 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
718 int *perProcessorCounts
;
721 int numberReductionMessages(){
729 virtual void pup(PUP::er
&p
);
730 static bool isIrreducible(){ return false;}
731 void contributeViaMessage(CkReductionMsg
*m
);
734 //Define methods used to contribute to the given reduction type.
735 // Data is copied, not deleted.
736 /*#define CK_REDUCTION_CONTRIBUTE_METHODS_DECL \
737 void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
738 CMK_REFNUM_TYPE userFlag=-1); \
739 void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
740 const CkCallback &cb,CMK_REFNUM_TYPE userFlag=-1); \
741 void contribute(CkReductionMsg *msg);\*/
//Token-pastes its arguments into the call
//  CkIndex_<me>::redn_wrapper_<method>(NULL).
// NOTE(review): presumably this yields the entry-method index of the
// generated reduction wrapper for `method` on class `me` -- confirm against
// the generated .decl.h.
743 #define CkReductionTarget(me, method) \
744 CkIndex_##me::redn_wrapper_##method(NULL)
746 #define CK_REDUCTION_CONTRIBUTE_METHODS_DEF(me,myRednMgr,myRednInfo,migratable) \
747 void me::contribute(int dataSize,const void *data,CkReduction::reducerType type,\
748 CMK_REFNUM_TYPE userFlag)\
750 CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);\
751 msg->setUserFlag(userFlag);\
752 msg->setMigratableContributor(migratable);\
753 myRednMgr->contribute(&myRednInfo,msg);\
755 void me::contribute(int dataSize,const void *data,CkReduction::reducerType type,\
756 const CkCallback &cb,CMK_REFNUM_TYPE userFlag)\
758 CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);\
759 msg->setUserFlag(userFlag);\
760 msg->setCallback(cb);\
761 msg->setMigratableContributor(migratable);\
762 myRednMgr->contribute(&myRednInfo,msg);\
764 void me::contribute(CkReductionMsg *msg) \
766 msg->setMigratableContributor(migratable);\
767 myRednMgr->contribute(&myRednInfo,msg);\
769 void me::contribute(const CkCallback &cb,CMK_REFNUM_TYPE userFlag)\
771 CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::nop);\
772 msg->setUserFlag(userFlag);\
773 msg->setCallback(cb);\
774 msg->setMigratableContributor(migratable);\
775 myRednMgr->contribute(&myRednInfo,msg);\
777 void me::contribute(CMK_REFNUM_TYPE userFlag)\
779 CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::nop);\
780 msg->setUserFlag(userFlag);\
781 msg->setMigratableContributor(migratable);\
782 myRednMgr->contribute(&myRednInfo,msg);\
785 #define CK_BARRIER_CONTRIBUTE_METHODS_DEF(me,myRednMgr,myRednInfo,migratable) \
786 void me::barrier(const CkCallback &cb)\
788 CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::nop);\
789 msg->setCallback(cb);\
790 msg->setMigratableContributor(migratable);\
791 myRednMgr->barrier(msg);\
795 //A group that can contribute to reductions
796 class Group
: public CkReductionMgr
798 contributorInfo reductionInfo
;//My reduction information
802 Group(CkMigrateMessage
*msg
);
803 virtual bool isNodeGroup() { return false; }
804 virtual void pup(PUP::er
&p
);
805 virtual void flushStates() {
806 CkReductionMgr::flushStates();
807 reductionInfo
.redNo
= 0;
809 virtual void CkAddThreadListeners(CthThread tid
, void *msg
);
811 CK_REDUCTION_CONTRIBUTE_METHODS_DECL
812 CK_BARRIER_CONTRIBUTE_METHODS_DECL
815 #ifdef _PIPELINED_ALLREDUCE_
819 AllreduceMgr() { fragsRecieved
=0; size
=0; }
820 friend class ArrayElement
;
821 // receive an allreduce message
822 void allreduce_recieve(CkReductionMsg
* msg
)
824 // allred_msgs.enq(msg);
828 data
= new char[FRAG_SIZE
*msg
->nFrags
];
830 memcpy(data
+msg
->fragNo
*FRAG_SIZE
, msg
->data
, msg
->dataSize
);
831 size
+= msg
->dataSize
;
833 if(fragsRecieved
==msg
->nFrags
) {
834 CkReductionMsg
* ret
= CkReductionMsg::buildNew(size
, data
);
836 fragsRecieved
=0; size
=0;
841 // TODO: check for same reduction
846 // CkMsgQ<CkReductionMsg> allred_msgs;
848 #endif // _PIPELINED_ALLREDUCE_
850 #endif //_CKREDUCTION_H