Fix memory leaks in trace projections and summary
[charm.git] / src / ck-core / ckobjQ.C
bloba9efa7a163bb4c3de0193fb2a8f08c5361d1fd5b
1 /*
2   implements per object message queue - every object maintains its own 
3   queue of messages.
5   potentially good for:
6   1. load balancer to know the number of pending messages waiting in its queue;
7   2. improve cache performance by executing all messages belonging to one obj.
9   There are two ways to use it:
10   1. by default, no queue is created for any object. Using "+objq" runtime
11      option will create queue for every object
12   2. in a Chare's constructor, call CkUsesObjQ() to turn on the object queue
13      for that calling Chare.
15   created by Gengbin Zheng, gzheng@uiucedu on 12/29/2003
17 #include "charm++.h"
18 #include "envelope.h"
19 #include "queueing.h"
20 #include "ckobjQ.h"
22 CkpvDeclare(TokenPool*, _tokenPool);
24 extern int index_tokenHandler;
26 extern CkMigratable * CkArrayMessageObjectPtr(envelope *env);
28 #define OBJQ_FIFO   1
30 // turn on object queue
31 void CkObjectMsgQ::create() { 
32   if (!objQ) 
33 #if OBJQ_FIFO
34     objQ = (void *)CdsFifo_Create();
35 #else
36     objQ = (void *)CqsCreate(); 
37 #endif
40 int CkObjectMsgQ::length() const { 
41 #if OBJQ_FIFO
42   return objQ?CdsFifo_Length((CdsFifo)objQ):0; 
43 #else
44   return objQ?CqsLength((Queue)objQ):0; 
45 #endif
48 CkObjectMsgQ::~CkObjectMsgQ() 
50   if (objQ) {
51     process();
52     // delete objQ
53 #if OBJQ_FIFO
54     CdsFifo_Destroy(objQ);
55 #else
56     CqsDelete((Queue)objQ);
57 #endif
58   }
61 // must be re-entrant
62 void CkObjectMsgQ::process() 
64 #if CMK_OBJECT_QUEUE_AVAILABLE
65   if (objQ == NULL) return;
66 //  if (inprocessing) return;
67   int mlen = length();
68   if (mlen == 0) return;
70   ObjectToken *tok;
71 #if OBJQ_FIFO
72   tok = (ObjectToken*)CdsFifo_Dequeue(objQ);
73 #else
74   CqsDequeue((Queue)objQ, (void **)&tok);
75 #endif
76   while (tok != NULL) {
77     envelope *env = (envelope *)tok->message;
78     if (env) {
79       // release messages in the queue
80       tok->message = NULL;
82       CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
83       env, env->getQueueing(),env->getPriobits(),
84       (unsigned int *)env->getPrioPtr());
86       // not be able to call inline, enqueue to the obj msg queue
87       CdsFifo_Enqueue(CpvAccess(CsdObjQueue), env);
88     }
89     else
90       CkpvAccess(_tokenPool)->put(tok);
91 #if OBJQ_FIFO
92     tok = (ObjectToken*)CdsFifo_Dequeue(objQ);
93 #else
94     CqsDequeue((Queue)objQ, (void **)&tok);
95 #endif
96   }
97 #endif
100 // find out the object pointer from a charm message envelope
101 Chare * CkFindObjectPtr(envelope *env)
103   Chare *obj = NULL;
104   switch(env->getMsgtype()) {
105     case BocInitMsg:
106     case NodeBocInitMsg:
107     case ArrayEltInitMsg:
108     case NewChareMsg:
109     case NewVChareMsg:
110     case ForVidMsg:
111     case FillVidMsg:
112       break;
113     case ForArrayEltMsg:
114       obj = CkArrayMessageObjectPtr(env);
115       break;
116     case ForChareMsg : {
117       // FIXME, chare calls CldEnqueue  which bypass the object queue
118       obj = (Chare*)env->getObjPtr();
119       break;
120     }
121     case ForBocMsg : {
122       obj = _localBranch(env->getGroupNum());
123       break;
124     }
125     case ForNodeBocMsg : {
126       obj = (Chare*)(CksvAccess(_nodeGroupTable)->find(env->getGroupNum()).getObj());
127       break;
128     }
129     default:
130       CmiAbort("Fatal Charm++ Error> Unknown msg-type in CkFindObjectPtr.\n");
131   }
132   return obj;
135 #if CMK_OBJECT_QUEUE_AVAILABLE
136 // insert an envelope msg into object queue
137 void _enqObjQueue(Chare *obj, envelope *env)
139     ObjectToken *token = CkpvAccess(_tokenPool)->get();
140     CmiAssert(token);
141     token->message = env;
142     token->objPtr = obj;
143   
144     CmiSetHandler(token, index_tokenHandler);
145     // enq to charm sched queue
146     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
147         token, env->getQueueing(),env->getPriobits(),
148         (unsigned int *)env->getPrioPtr());
149     // also enq to object queue
150 #if OBJQ_FIFO
151     CdsFifo_Enqueue(obj->CkGetObjQueue().queue(), token);
152 #else
153     CqsEnqueueGeneral((Queue)(obj->CkGetObjQueue().queue()),
154         token, env->getQueueing(),env->getPriobits(),
155         (unsigned int *)env->getPrioPtr());
156 #endif
158 #endif
161 // converseMsg is a real message
162 void _ObjectQHandler(void *converseMsg)
164 #if CMK_OBJECT_QUEUE_AVAILABLE
165   register envelope *env = (envelope *)(converseMsg);
166   Chare *obj = CkFindObjectPtr(env);
167   // swap handler back
168 //  CmiSetHandler(env, CmiGetXHandler(env));
169   // I can do this because this message is always a charm+ message
170   CmiSetHandler(env, _charmHandlerIdx);
171   if (obj && obj->CkGetObjQueue().queue()) {  // queue enabled
172     _enqObjQueue(obj, env);
173   }
174   else {   // obj queue not enabled
175     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
176         env, env->getQueueing(),env->getPriobits(),
177         (unsigned int *)env->getPrioPtr());
178   }
179 #else
180   CmiAbort("Invalide _ObjectQHandler called!");
181 #endif
184 // normally from sched queue
185 void _TokenHandler(void *tokenMsg)
187 #if CMK_OBJECT_QUEUE_AVAILABLE
188   ObjectToken *token = (ObjectToken*)tokenMsg;
189   Chare *obj = token->objPtr;
190   void *message = token->message;
191   // we are done with this token out of sched queue
192   CkpvAccess(_tokenPool)->put(token);
193   if (message == NULL) {    // message already consumed
194     return;
195   }
196   CkObjectMsgQ &objQ = obj->CkGetObjQueue();
197   objQ.process();
198 #else
199   CmiAbort("Invalide _TokenHandler called!");
200 #endif