Clean up C linkage specific to the C++ migration
[charm.git] / src / conv-ccs / middle-ccs.C
blob74c11aa75c2239550ba4f3c4d5e8d4972e35da93
1 #include <unistd.h>
2 #include "middle.h"
4 #if CMK_BIGSIM_CHARM
5 #include "bgconverse.h"
6 #endif
7 #include "ccs-server.h"
8 #include "conv-ccs.h"
10 #ifdef _WIN32
11 # include <sys/types.h>
12 # include <io.h>
13 # include <process.h>
14 # define write _write
15 # define getpid _getpid
16 #endif
18 #if CMK_CCS_AVAILABLE
19 void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData);
21 void req_fw_handler(char *msg)
23   int offset = CmiReservedHeaderSize + sizeof(CcsImplHeader);
24   CcsImplHeader *hdr = (CcsImplHeader *)(msg+CmiReservedHeaderSize);
25   int destPE = (int)ChMessageInt(hdr->pe);
26   if (CmiMyPe() == 0 && destPE == -1) {
27     /* Broadcast message to all other processors */
28     int len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+ChMessageInt(hdr->len);
29     CmiSyncBroadcast(len, msg);
30   }
31   else if (destPE < -1) {
32     /* Multicast the message to your children */
33     int len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+ChMessageInt(hdr->len)-destPE*sizeof(ChMessageInt_t);
34     int index, child, i;
35     int *pes = (int*)(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader));
36     ChMessageInt_t *pes_nbo = (ChMessageInt_t *)pes;
37     offset -= destPE * sizeof(ChMessageInt_t);
38     if (ChMessageInt(pes_nbo[0]) == CmiMyPe()) {
39       for (index=0; index<-destPE; ++index) pes[index] = ChMessageInt(pes_nbo[index]);
40     }
41     for (index=0; index<-destPE; ++index) {
42       if (pes[index] == CmiMyPe()) break;
43     }
44     child = (index << 2) + 1;
45     for (i=0; i<4; ++i) {
46       if (child+i < -destPE) {
47         CmiSyncSend(pes[child+i], len, msg);
48       }
49     }
50   }
51   CcsHandleRequest(hdr, msg+offset);
52   CmiFree(msg);
55 extern int rep_fw_handler_idx;
56 /**
57  * Decide if the reply is ready to be forwarded to the waiting client,
58  * or if combination is required (for broadcast/multicast CCS requests.
59  */
60 int CcsReply(CcsImplHeader *rep,int repLen,const void *repData) {
61   int repPE = (int)ChMessageInt(rep->pe);
62   if (repPE <= -1) {
63     /* Reduce the message to get the final reply */
64     CcsHandlerRec *fn;
65     int len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+repLen;
66     char *msg=(char*)CmiAlloc(len);
67     char *r=msg+CmiReservedHeaderSize;
68     char *handlerStr;
69     rep->len = ChMessageInt_new(repLen);
70     *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
71     memcpy(r,repData,repLen);
72     CmiSetHandler(msg,rep_fw_handler_idx);
73     handlerStr=rep->handler;
74     fn=(CcsHandlerRec *)CcsGetHandler(handlerStr);
75     if (fn->mergeFn == NULL) CmiAbort("Called CCS broadcast with NULL merge function!\n");
76     if (repPE == -1) {
77       /* CCS Broadcast */
78       CkReduce(msg, len, fn->mergeFn);
79     } else {
80       /* CCS Multicast */
81       CmiListReduce(-repPE, (int*)(rep+1), msg, len, fn->mergeFn, fn->redID);
82     }
83   } else {
84     if (_conditionalDelivery == 0) CcsImpl_reply(rep, repLen, repData);
85     else {
86       /* We are the child of a conditional delivery, write to the parent the reply */
87       if (write(conditionalPipe[1], &repLen, 4) != 4) {
88         CmiAbort("CCS> writing reply length to parent failed!");
89       }
90       if (write(conditionalPipe[1], repData, repLen) != repLen) {
91         CmiAbort("CCS> writing reply data to parent failed!");
92       }
93     }
94   }
95   return 0;
99 /**********************************************
100   "ccs_getinfo"-- takes no data
101     Return the number of parallel nodes, and
102       the number of processors per node as an array
103       of 4-byte big-endian ints.
106 void ccs_getinfo(char *msg)
108   int nNode=CmiNumNodes();
109   int len=(1+nNode)*sizeof(ChMessageInt_t);
110   ChMessageInt_t *table=(ChMessageInt_t *)malloc(len);
111   int n;
112   table[0]=ChMessageInt_new(nNode);
113   for (n=0;n<nNode;n++)
114     table[1+n]=ChMessageInt_new(CmiNodeSize(n));
115   CcsSendReply(len,(const char *)table);
116   free(table);
117   CmiFree(msg);
120 ///////////////////////////////// middle-debug.C
121 #endif
122 #if ! CMK_HAS_GETPID
123 typedef int pid_t;
124 #endif
126 CpvCExtern(void *, debugQueue);
127 CpvDeclare(void *, debugQueue);
128 CpvCExtern(int, freezeModeFlag);
129 CpvDeclare(int, freezeModeFlag);
132  Start the freeze-- call will not return until unfrozen
133  via a CCS request.
134  */
135 void CpdFreeze(void)
137   pid_t pid = 0;
138 #if CMK_HAS_GETPID
139   pid = getpid();
140 #endif
141   CpdNotify(CPD_FREEZE,pid);
142   if (CpvAccess(freezeModeFlag)) return; /*Already frozen*/
143   CpvAccess(freezeModeFlag) = 1;
144 #if ! CMK_BIGSIM_CHARM
145   CpdFreezeModeScheduler();
146 #endif
149 void CpdUnFreeze(void)
151   CpvAccess(freezeModeFlag) = 0;
154 int CpdIsFrozen(void) {
155   return CpvAccess(freezeModeFlag);
158 #if CMK_BIGSIM_CHARM
159 #include "blue_impl.h"
160 void BgProcessMessageFreezeMode(threadInfo *t, char *msg) {
161 //  CmiPrintf("BgProcessMessageFreezeMode\n");
162 #if CMK_CCS_AVAILABLE
163   void *debugQ=CpvAccess(debugQueue);
164   CmiAssert(msg!=NULL);
165   int processImmediately = CpdIsDebugMessage(msg);
166   if (processImmediately) BgProcessMessageDefault(t, msg);
167   while (!CpvAccess(freezeModeFlag) && !CdsFifo_Empty(debugQ)) {
168     BgProcessMessageDefault(t, (char*)CdsFifo_Dequeue(debugQ));
169   }
170   if (!processImmediately) {
171     if (!CpvAccess(freezeModeFlag)) BgProcessMessageDefault(t, msg); 
172     else CdsFifo_Enqueue(debugQ, msg);
173   }
174 #else
175   BgProcessMessageDefault(t, msg);
176 #endif
178 #endif
180 void PrintDebugStackTrace(void *);
181 void * MemoryToSlot(void *ptr);
182 int Slot_StackTrace(void *s, void ***stack);
183 int Slot_ChareOwner(void *s);
185 #include <stdarg.h>
186 void CpdNotify(int type, ...) {
187   void *ptr; int integer, i;
188   pid_t pid=0;
189   int levels=64;
190   void *stackPtrs[64];
191   void *sl;
192   va_list list;
193   va_start(list, type);
194   switch (type) {
195   case CPD_ABORT:
196     CmiPrintf("CPD: %d Abort %s\n",CmiMyPe(), va_arg(list, char*));
197     break;
198   case CPD_SIGNAL:
199     CmiPrintf("CPD: %d Signal %d\n",CmiMyPe(), va_arg(list, int));
200     break;
201   case CPD_FREEZE:
202 #if CMK_HAS_GETPID
203     pid = getpid();
204 #endif
205     CmiPrintf("CPD: %d Freeze %d\n",CmiMyPe(),pid);
206     break;
207   case CPD_BREAKPOINT:
208     CmiPrintf("CPD: %d BP %s\n",CmiMyPe(), va_arg(list, char*));
209     break;
210   case CPD_CROSSCORRUPTION:
211     ptr = va_arg(list, void*);
212     integer = va_arg(list, int);
213     CmiPrintf("CPD: %d Cross %p %d ",CmiMyPe(), ptr, integer);
214     sl = MemoryToSlot(ptr);
215     if (sl != NULL) {
216       int stackLen; void **stackTrace;
217       stackLen = Slot_StackTrace(sl, &stackTrace);
218       CmiPrintf("%d %d ",Slot_ChareOwner(sl),stackLen);
219       for (i=0; i<stackLen; ++i) CmiPrintf("%p ",stackTrace[i]);
220     } else {
221       CmiPrintf("0 ");
222     }
223     CmiBacktraceRecord(stackPtrs,1,&levels);
224     CmiPrintf("%d ",levels);
225     for (i=0; i<levels; ++i) CmiPrintf("%p ",stackPtrs[i]);
226     CmiPrintf("\n");
227     break;
228   }
229   va_end(list);