LRTS Comm Thread Tracing in message receive
[charm.git] / src / ck-core / sdag.h
blob0f6f7544c03db8a3300088d6f7e503a907407aea
1 #ifndef _sdag_H_
2 #define _sdag_H_
4 #include "pup.h"
6 namespace SDAG {
7 struct Closure : public PUP::able {
8 virtual void pup(PUP::er& p) = 0;
9 PUPable_abstract(Closure);
10 int continuations;
11 // reference count and self-destruct when no continuations have a reference
12 void ref() { continuations++; }
13 void deref() { if (--continuations <= 0) delete this; }
14 // done this way to keep Closure abstract for PUP reasons
15 // these must be called by descendents of Closure
16 void packClosure(PUP::er& p) { p | continuations; }
17 void init() { continuations = 1; }
18 virtual ~Closure() { }
22 #include <vector>
23 #include <list>
24 #include <map>
25 #include <set>
27 #include <pup_stl.h>
28 #include <envelope.h>
29 #include <debug-charm.h>
31 class CkMessage;
33 namespace SDAG {
34 struct TransportableBigSimLog : public Closure {
35 void* log;
36 TransportableBigSimLog() : log(0) { init(); }
37 TransportableBigSimLog(CkMigrateMessage*) : log(0) { init(); }
39 TransportableBigSimLog(void* log)
40 : log(log) { init(); }
42 void pup(PUP::er& p) {
43 if (p.isUnpacking()) log = 0;
44 else if (log != 0)
45 CkAbort("BigSim logs stored by SDAG are not migratable\n");
46 packClosure(p);
48 PUPable_decl(TransportableBigSimLog);
51 struct ForallClosure : public Closure {
52 int val;
53 ForallClosure() : val(0) { init(); }
54 ForallClosure(CkMigrateMessage*) : val(0) { init(); }
55 ForallClosure(int val) : val(val) { init(); }
57 void pup(PUP::er& p) {
58 p | val;
59 packClosure(p);
61 PUPable_decl(ForallClosure);
62 int& getP0() { return val; }
65 struct MsgClosure : public Closure {
66 void* msg;
68 MsgClosure() : msg(0) { init(); continuations = 0; }
69 MsgClosure(CkMigrateMessage*) : msg(0) { init(); continuations = 0; }
71 MsgClosure(void* msg)
72 : msg(msg) {
73 init();
74 continuations = 0;
75 CmiReference(UsrToEnv(msg));
78 void pup(PUP::er& p) {
79 bool hasMsg = msg;
80 p | hasMsg;
81 if (hasMsg) CkPupMessage(p, (void**)&msg);
82 if (hasMsg && p.isUnpacking())
83 CmiReference(UsrToEnv(msg));
84 packClosure(p);
87 virtual ~MsgClosure() {
88 if (msg) CmiFree(UsrToEnv(msg));
91 PUPable_decl(MsgClosure);
94 class CCounter : public Closure {
95 private:
96 unsigned int count;
97 public:
98 CCounter() { init(); }
99 CCounter(CkMigrateMessage*) { init(); }
100 CCounter(int c) : count(c) { init(); }
101 CCounter(int first, int last, int stride) {
102 init();
103 count = ((last - first) / stride) + 1;
105 void decrement(void) { count--; }
106 int isDone(void) { return (count == 0); }
108 void pup(PUP::er& p) {
109 p | count;
110 packClosure(p);
112 PUPable_decl(CCounter);
115 struct CSpeculator : public Closure {
116 int speculationIndex;
118 CSpeculator() : speculationIndex(0) { init(); }
119 CSpeculator(CkMigrateMessage*) : speculationIndex(0) { init(); }
121 CSpeculator(int speculationIndex_)
122 : speculationIndex(speculationIndex_) { init(); }
124 void pup(PUP::er& p) {
125 p | speculationIndex;
126 packClosure(p);
128 PUPable_decl(CSpeculator);
131 struct Continuation : public PUP::able {
132 int whenID;
133 std::vector<Closure*> closure;
134 std::vector<CMK_REFNUM_TYPE> entries, refnums;
135 std::vector<int> anyEntries;
136 int speculationIndex;
138 Continuation() : speculationIndex(-1) { }
139 Continuation(CkMigrateMessage*) : speculationIndex(-1) { }
141 Continuation(int whenID)
142 : whenID(whenID)
143 , speculationIndex(-1) { }
145 void pup(PUP::er& p) {
146 p | whenID;
147 p | closure;
148 p | entries;
149 p | refnums;
150 p | anyEntries;
151 p | speculationIndex;
154 void addClosure(Closure* cl) {
155 if (cl) cl->ref();
156 closure.push_back(cl);
159 virtual ~Continuation() {
160 for (int i = 0; i < closure.size(); i++)
161 if (closure[i])
162 closure[i]->deref();
165 PUPable_decl(Continuation);
168 struct Buffer : public PUP::able {
169 int entry;
170 CMK_REFNUM_TYPE refnum;
171 Closure* cl;
173 #if CMK_BIGSIM_CHARM
174 void *bgLog1, *bgLog2;
175 #endif
177 Buffer(CkMigrateMessage*) { }
179 Buffer(int entry, Closure* cl, CMK_REFNUM_TYPE refnum)
180 : entry(entry)
181 , refnum(refnum)
182 , cl(cl)
183 #if CMK_BIGSIM_CHARM
184 , bgLog1(0)
185 , bgLog2(0)
186 #endif
188 if (cl) cl->ref();
191 void pup(PUP::er& p) {
192 p | entry;
193 p | refnum;
194 bool hasCl = cl;
195 p | hasCl;
196 if (hasCl)
197 p | cl;
198 else
199 cl = 0;
200 #if CMK_BIGSIM_CHARM
201 if (p.isUnpacking())
202 bgLog1 = bgLog2 = 0;
203 else if (bgLog1 != 0 && bgLog2 != 0)
204 CkAbort("BigSim logs stored by SDAG are not migratable\n");
205 #endif
208 virtual ~Buffer() {
209 if (cl) cl->deref();
212 PUPable_decl(Buffer);
215 struct Dependency {
216 std::vector<std::list<int> > entryToWhen;
217 std::vector<std::list<Continuation*> > whenToContinuation;
219 // entry -> lst of buffers
220 // @todo this will have sequential lookup time for specific reference
221 // numbers
222 std::vector<std::list<Buffer*> > buffer;
224 int curSpeculationIndex;
226 void pup(PUP::er& p) {
227 p | curSpeculationIndex;
228 p | entryToWhen;
229 p | buffer;
230 p | whenToContinuation;
233 Dependency(int numEntries, int numWhens)
234 : entryToWhen(numEntries)
235 , whenToContinuation(numWhens)
236 , buffer(numEntries)
237 , curSpeculationIndex(0)
240 // after a migration free the structures
241 ~Dependency() {
242 for (std::vector<std::list<Buffer*> >::iterator iter = buffer.begin();
243 iter != buffer.end(); ++iter) {
244 std::list<Buffer*> lst = *iter;
245 for (std::list<Buffer*>::iterator iter2 = lst.begin();
246 iter2 != lst.end(); ++iter2) {
247 delete *iter2;
251 for (int i = 0; i < whenToContinuation.size(); i++) {
252 for (std::list<Continuation*>::iterator iter2 = whenToContinuation[i].begin();
253 iter2 != whenToContinuation[i].end(); ++iter2) {
254 delete *iter2;
259 void addDepends(int whenID, int entry) {
260 entryToWhen[entry].push_back(whenID);
263 void reg(Continuation *c) {
264 //printf("registering new continuation %p, whenID = %d\n", c, c->whenID);
265 whenToContinuation[c->whenID].push_back(c);
268 void dereg(Continuation *c) {
269 CkAssert(c->whenID < whenToContinuation.size());
270 std::list<Continuation*>& lst = whenToContinuation[c->whenID];
271 lst.remove(c);
274 Buffer* pushBuffer(int entry, Closure *cl, CMK_REFNUM_TYPE refnum) {
275 Buffer* buf = new Buffer(entry, cl, refnum);
276 buffer[entry].push_back(buf);
277 return buf;
280 Continuation *tryFindContinuation(int entry) {
281 for (std::list<int>::iterator iter = entryToWhen[entry].begin();
282 iter != entryToWhen[entry].end();
283 ++iter) {
284 int whenID = *iter;
286 for (std::list<Continuation*>::iterator iter2 = whenToContinuation[whenID].begin();
287 iter2 != whenToContinuation[whenID].end();
288 iter2++) {
289 Continuation* c = *iter2;
290 if (searchBufferedMatching(c)) {
291 dereg(c);
292 return c;
296 //printf("no continuation found\n");
297 return 0;
300 bool searchBufferedMatching(Continuation* t) {
301 CkAssert(t->entries.size() == t->refnums.size());
302 for (int i = 0; i < t->entries.size(); i++) {
303 if (!tryFindMessage(t->entries[i], true, t->refnums[i], 0)) {
304 return false;
307 for (int i = 0; i < t->anyEntries.size(); i++) {
308 if (!tryFindMessage(t->anyEntries[i], false, 0, 0)) {
309 return false;
312 return true;
315 Buffer* tryFindMessage(int entry, bool hasRef, CMK_REFNUM_TYPE refnum, std::set<Buffer*>* ignore) {
316 // @todo sequential lookup for buffer with reference number or ignore set
317 for (std::list<Buffer*>::iterator iter = buffer[entry].begin();
318 iter != buffer[entry].end();
319 ++iter) {
320 if ((!hasRef || (*iter)->refnum == refnum) &&
321 (!ignore || ignore->find(*iter) == ignore->end()))
322 return *iter;
324 return 0;
327 Buffer* tryFindMessage(int entry) {
328 if (buffer[entry].size() == 0)
329 return 0;
330 else
331 return buffer[entry].front();
334 void removeMessage(Buffer *buf) {
335 buffer[buf->entry].remove(buf);
338 int getAndIncrementSpeculationIndex() {
339 return curSpeculationIndex++;
342 void removeAllSpeculationIndex(int speculationIndex) {
343 for (std::vector<std::list<Continuation*> >::iterator iter = whenToContinuation.begin();
344 iter != whenToContinuation.end();
345 ++iter) {
346 std::list<Continuation*>& lst = *iter;
348 for (std::list<Continuation*>::iterator iter2 = lst.begin();
349 iter2 != lst.end();
350 //cppcheck-suppress StlMissingComparison
352 if ((*iter2)->speculationIndex == speculationIndex) {
353 Continuation *cancelled = *iter2;
354 //cppcheck-suppress StlMissingComparison
355 iter2 = lst.erase(iter2);
356 delete cancelled;
357 } else {
358 iter2++;
365 void registerPUPables();
368 #endif