Avoid building user-driven-interop example on non-LRTS layers
[charm.git] / src / langs / pvmc / pvmc_comm.c
blob2041b825ec802314a5c2c59a8b6716a75a45b980
1 #include <stddef.h>
2 #include "converse.h"
3 #include "pvmc.h"
5 CpvStaticDeclare(CmmTable,seq_table);
7 CpvStaticDeclare(int,pvmc_control_handler);
8 CpvStaticDeclare(int,pvmc_msg_handler);
9 CpvStaticDeclare(int*,send_seq_num);
10 CpvStaticDeclare(int*,recv_seq_num);
12 CpvExtern(int,pvmc_barrier_num);
13 CpvExtern(int,pvmc_at_barrier_num);
15 typedef struct msg_hdr_struct {
16 char handler[CmiMsgHeaderSizeBytes];
17 int sender;
18 unsigned int seq_num;
19 } msg_hdr;
21 typedef struct control_msg_struct {
22 char handler[CmiMsgHeaderSizeBytes];
23 int type;
24 } control_msg;
/* Converse handlers defined later in this file. */
static void pvmc_control_handler_func(void *voidmsg);
static void pvmc_msg_handler_func(void *msg);
29 void pvmc_init_comm(void)
31 int i;
33 #ifdef PVM_DEBUG
34 PRINTF("Pe(%d) tid=%d:pvm_init_comm()\n",MYPE(),pvm_mytid());
35 #endif
37 CpvInitialize(CmmTable,seq_table);
38 CpvAccess(seq_table) = CmmNew();
40 CpvInitialize(int,pvmc_control_handler);
41 CpvAccess(pvmc_control_handler)=
42 CmiRegisterHandler(pvmc_control_handler_func);
44 CpvInitialize(int,pvmc_msg_handler);
45 CpvAccess(pvmc_msg_handler)=CmiRegisterHandler(pvmc_msg_handler_func);
47 CpvInitialize(int*,recv_seq_num);
48 CpvAccess(recv_seq_num)=(int *)MALLOC(CmiNumPes()*sizeof(int));
50 if (CpvAccess(recv_seq_num)==NULL) {
51 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_init_comm() can't allocate seq buffer\n",
52 MYPE(),pvm_mytid(),__FILE__,__LINE__);
53 exit(1);
55 for(i=0; i<CmiNumPes(); i++)
56 CpvAccess(recv_seq_num)=0;
58 CpvInitialize(int*,send_seq_num);
59 CpvAccess(send_seq_num)=(int *)MALLOC(CmiNumPes()*sizeof(int));
61 if (CpvAccess(send_seq_num)==NULL) {
62 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_init_comm() can't allocate seq buffer\n",
63 MYPE(),pvm_mytid(),__FILE__,__LINE__);
64 exit(1);
66 for(i=0; i<CmiNumPes(); i++)
67 CpvAccess(send_seq_num)[i]=0;
71 void pvmc_send_control_msg(int type, int pe)
73 control_msg *msg;
75 msg=(control_msg *)CmiAlloc(sizeof(control_msg));
76 msg->type=type;
77 CmiSetHandler(msg,CpvAccess(pvmc_control_handler));
78 CmiSyncSendAndFree(pe,sizeof(control_msg),msg);
81 static void pvmc_control_handler_func(void *voidmsg)
83 control_msg *msg = (control_msg *)voidmsg;
84 switch (msg->type) {
85 case PVMC_CTRL_AT_BARRIER:
86 CpvAccess(pvmc_at_barrier_num)++;
87 break;
88 case PVMC_CTRL_THROUGH_BARRIER:
89 CpvAccess(pvmc_barrier_num)++;
90 break;
91 case PVMC_CTRL_KILL:
92 ConverseExit();
93 exit(0);
94 break;
95 default:
96 PRINTF("WARNING: %s:%d, Illegal control message\n",__FILE__,__LINE__);
100 static void pvmc_msg_handler_func(void *msg)
102 int seq_num;
103 int sender;
104 int pvm_tag;
105 int tags[2];
106 int rtags[2];
108 sender=((msg_hdr *)msg)->sender;
109 seq_num=((msg_hdr *)msg)->seq_num;
111 tags[0]=sender;
112 tags[1]=pvmc_gettag((char *)msg+sizeof(msg_hdr));
113 CmmPut(CpvAccess(seq_table),2,tags,msg);
116 int pvm_kill(int tid)
118 control_msg *exit_msg;
120 #ifdef PVM_DEBUG
121 PRINTF("Pe(%d) tid=%d:pvm_kill(%d)\n",
122 MYPE(),pvm_mytid(),tid);
123 #endif
124 pvmc_send_control_msg(PVMC_CTRL_KILL,TID2PE(tid));
126 return 0;
129 int pvm_send(int pvm_tid, int tag)
131 void *msg;
132 int msg_sz, conv_tid, conv_tag;
134 #ifdef PVM_DEBUG
135 PRINTF("Pe(%d) tid=%d:pvm_send(%d,%d)\n",
136 MYPE(),pvm_mytid(),pvm_tid,tag);
137 #endif
138 pvmc_settidtag(pvm_mytid(),tag);
140 if ((pvm_tid<1) || ( pvm_tid > CmiNumPes())) {
141 PRINTF("Pe(%d) tid=%d:%s:%d pvm_send() illegal tid %d\n",
142 MYPE(),pvm_mytid(),__FILE__,__LINE__,pvm_tid);
143 return -1;
144 } else conv_tid = pvm_tid-1;
146 if (tag<0) {
147 PRINTF("Pe(%d) tid=%d:%s:%d pvm_send() illegal tag\n",
148 MYPE(),pvm_mytid(),__FILE__,__LINE__);
149 return -1;
150 } else conv_tag = tag;
152 msg_sz = sizeof(msg_hdr)+pvmc_sendmsgsz();
153 msg = CmiAlloc(msg_sz);
155 if (msg==NULL) {
156 PRINTF("Pe(%d) tid=%d:%s:%d pvm_send() can't alloc msg buffer\n",
157 MYPE(),pvm_mytid(),__FILE__,__LINE__);
158 return -1;
161 CmiSetHandler(msg,CpvAccess(pvmc_msg_handler));
162 ((msg_hdr *)msg)->sender=MYPE();
163 ((msg_hdr *)msg)->seq_num=CpvAccess(send_seq_num)[conv_tid];
164 CpvAccess(send_seq_num)[conv_tid]++;
166 pvmc_packmsg((char *)msg + (int)sizeof(msg_hdr));
167 CmiSyncSendAndFree(conv_tid,msg_sz,msg);
168 return 0;
/*
 * Send the current PVM send buffer, tagged `msgtag`, to each of the
 * `ntask` task ids in `tids` by calling pvm_send() once per entry.
 *
 * Every destination is attempted even if an earlier one fails.
 * Returns 0 if all sends succeeded, or -1 if pvm_send() reported an
 * error for at least one destination.
 */
int pvm_mcast(int *tids, int ntask, int msgtag)
{
  int i;
  int status = 0;

#ifdef PVM_DEBUG
  /* %p with a void* cast: printing a pointer with %x is undefined. */
  PRINTF("Pe(%d) tid=%d:pvm_mcast(%p,%d,%d)\n",
         MYPE(),pvm_mytid(),(void *)tids,ntask,msgtag);
#endif
  for(i=0;i<ntask;i++) {
    /* Remember the failure but keep going so the remaining
     * destinations still receive the message. */
    if (pvm_send(tids[i],msgtag) < 0)
      status = -1;
  }

  return status;
}
185 int pvm_nrecv(int tid, int tag)
187 int conv_tid, conv_tag;
188 void *msg;
189 int tags[2];
190 int rtags[2];
191 int sender, seq_num;
192 int rbuf;
194 #ifdef PVM_DEBUG
195 PRINTF("Pe(%d) tid=%d:pvm_nrecv(%d,%d)\n",
196 MYPE(),pvm_mytid(),tid,tag);
197 #endif
199 if (tid==-1)
200 conv_tid=CmmWildCard;
201 else conv_tid=tid-1;
203 if (tag==-1)
204 conv_tag=CmmWildCard;
205 else conv_tag=tag;
208 * Empty messages from machine layer.
211 while(CmiDeliverMsgs(1)==0)
215 * See if the message is already in the tag table and extract it.
218 tags[0]=conv_tid;
219 tags[1]=conv_tag;
220 msg=CmmGet(CpvAccess(seq_table),2,tags,rtags);
221 if (msg!=NULL) {
222 sender = rtags[0];
224 seq_num = CpvAccess(recv_seq_num)[sender];
226 if ((((msg_hdr *)msg)->seq_num) != seq_num)
227 PRINTF("tid=%d:%s:%d pvm_recv() seq number mismatch, I'm confused\n",
228 tid,__FILE__,__LINE__);
229 else CpvAccess(recv_seq_num)[sender]++;
233 rbuf=pvm_setrbuf(pvm_mkbuf(PvmDataRaw));
234 if (rbuf > 0)
236 #ifdef PVM_DEBUG
237 PRINTF("Pe(%d) tid=%d:%s:%d pvm_nrecv() says pvm_setrbuf=%d\n",
238 MYPE(),tid,__FILE__,__LINE__,rbuf);
239 #endif
240 pvm_freebuf(rbuf);
242 pvmc_unpackmsg(msg,(char *)msg+sizeof(msg_hdr));
244 #ifdef PVM_DEBUG
245 PRINTF("Pe(%d) tid=%d:%s:%d pvm_nrecv() returning pvm_getrbuf()=%d\n",
246 MYPE(),tid,__FILE__,__LINE__,pvm_getrbuf());
247 #endif
248 return pvm_getrbuf();
250 else return 0; /* Non blocking receive returns immediately. */
/*
 * Blocking receive: polls pvm_nrecv(tid, tag) until it returns a
 * non-zero buffer id, then returns that id.
 */
int pvm_recv(int tid, int tag)
{
  int bufid;

#ifdef PVM_DEBUG
  PRINTF("Pe(%d) tid=%d:pvm_recv(%d,%d)\n",
         MYPE(),pvm_mytid(),tid,tag);
#endif

  /* Busy-wait; each pvm_nrecv() call also drains incoming messages. */
  do {
    bufid = pvm_nrecv(tid,tag);
  } while (bufid==0);

#ifdef PVM_DEBUG
  PRINTF("Pe(%d) tid=%d:pvm_recv(%d,%d) returning %d\n",
         MYPE(),pvm_mytid(),tid,tag,bufid);
#endif

  return bufid;
}
272 int pvm_probe(int tid, int tag)
274 int conv_tid, conv_tag;
275 void *msg;
276 int tags[2];
277 int rtags[2];
278 int sender, seq_num;
280 #ifdef PVM_DEBUG
281 PRINTF("Pe(%d) tid=%d:pvm_probe(%d,%d)\n",
282 MYPE(),pvm_mytid(),tid,tag);
283 #endif
284 if (tid==-1)
285 conv_tid=CmmWildCard;
286 else conv_tid=tid;
288 if (tag==-1)
289 conv_tag=CmmWildCard;
290 else conv_tag=tag;
293 * Empty messages from machine layer.
296 while(CmiDeliverMsgs(1)==0)
300 * See if the message is already in the tag table
303 tags[0]=conv_tid;
304 tags[1]=conv_tag;
305 msg=CmmProbe(CpvAccess(seq_table),2,tags,rtags);
306 if (msg!=NULL) {
308 sender = rtag[0];
309 seq_num = CpvAccess(recv_seq_num)[sender];
311 if ((((msg_hdr *)msg)->seq_num) != seq_num)
312 PRINTF("Pe(%d) tid=%d:%s:%d pvm_recv() seq num mismatch, I'm confused\n",
313 MYPE(),pvm_mytid(),__FILE__,__LINE__);
314 else CpvAccess(recv_seq_num)[sender]++;
318 * We will just unpack the message, so bufinfo works, but this
319 * should really just set up what bufinfo needs and unpack the
320 * rest later
322 pvmc_unpackmsg(msg,(char *)msg+sizeof(msg_hdr));
325 #ifdef PVM_DEBUG
326 PRINTF("Pe(%d) tid=%d:%s:%d pvm_probe() returning pvm_getrbuf()=%d\n",
327 MYPE(),tid,__FILE__,__LINE__,pvm_getrbuf());
328 #endif
329 return pvm_getrbuf();
331 else return 0; /* Probe returns immediately. */