5 CpvStaticDeclare(CmmTable
,seq_table
);
7 CpvStaticDeclare(int,pvmc_control_handler
);
8 CpvStaticDeclare(int,pvmc_msg_handler
);
9 CpvStaticDeclare(int*,send_seq_num
);
10 CpvStaticDeclare(int*,recv_seq_num
);
12 CpvExtern(int,pvmc_barrier_num
);
13 CpvExtern(int,pvmc_at_barrier_num
);
15 typedef struct msg_hdr_struct
{
16 char handler
[CmiMsgHeaderSizeBytes
];
21 typedef struct control_msg_struct
{
22 char handler
[CmiMsgHeaderSizeBytes
];
26 static void pvmc_control_handler_func(void *);
27 static void pvmc_msg_handler_func(void *);
29 void pvmc_init_comm(void)
34 PRINTF("Pe(%d) tid=%d:pvm_init_comm()\n",MYPE(),pvm_mytid());
37 CpvInitialize(CmmTable
,seq_table
);
38 CpvAccess(seq_table
) = CmmNew();
40 CpvInitialize(int,pvmc_control_handler
);
41 CpvAccess(pvmc_control_handler
)=
42 CmiRegisterHandler(pvmc_control_handler_func
);
44 CpvInitialize(int,pvmc_msg_handler
);
45 CpvAccess(pvmc_msg_handler
)=CmiRegisterHandler(pvmc_msg_handler_func
);
47 CpvInitialize(int*,recv_seq_num
);
48 CpvAccess(recv_seq_num
)=(int *)MALLOC(CmiNumPes()*sizeof(int));
50 if (CpvAccess(recv_seq_num
)==NULL
) {
51 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_init_comm() can't allocate seq buffer\n",
52 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
55 for(i
=0; i
<CmiNumPes(); i
++)
56 CpvAccess(recv_seq_num
)=0;
58 CpvInitialize(int*,send_seq_num
);
59 CpvAccess(send_seq_num
)=(int *)MALLOC(CmiNumPes()*sizeof(int));
61 if (CpvAccess(send_seq_num
)==NULL
) {
62 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_init_comm() can't allocate seq buffer\n",
63 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
66 for(i
=0; i
<CmiNumPes(); i
++)
67 CpvAccess(send_seq_num
)[i
]=0;
71 void pvmc_send_control_msg(int type
, int pe
)
75 msg
=(control_msg
*)CmiAlloc(sizeof(control_msg
));
77 CmiSetHandler(msg
,CpvAccess(pvmc_control_handler
));
78 CmiSyncSendAndFree(pe
,sizeof(control_msg
),msg
);
81 static void pvmc_control_handler_func(void *voidmsg
)
83 control_msg
*msg
= (control_msg
*)voidmsg
;
85 case PVMC_CTRL_AT_BARRIER
:
86 CpvAccess(pvmc_at_barrier_num
)++;
88 case PVMC_CTRL_THROUGH_BARRIER
:
89 CpvAccess(pvmc_barrier_num
)++;
96 PRINTF("WARNING: %s:%d, Illegal control message\n",__FILE__
,__LINE__
);
100 static void pvmc_msg_handler_func(void *msg
)
108 sender
=((msg_hdr
*)msg
)->sender
;
109 seq_num
=((msg_hdr
*)msg
)->seq_num
;
112 tags
[1]=pvmc_gettag((char *)msg
+sizeof(msg_hdr
));
113 CmmPut(CpvAccess(seq_table
),2,tags
,msg
);
116 int pvm_kill(int tid
)
118 control_msg
*exit_msg
;
121 PRINTF("Pe(%d) tid=%d:pvm_kill(%d)\n",
122 MYPE(),pvm_mytid(),tid
);
124 pvmc_send_control_msg(PVMC_CTRL_KILL
,TID2PE(tid
));
129 int pvm_send(int pvm_tid
, int tag
)
132 int msg_sz
, conv_tid
, conv_tag
;
135 PRINTF("Pe(%d) tid=%d:pvm_send(%d,%d)\n",
136 MYPE(),pvm_mytid(),pvm_tid
,tag
);
138 pvmc_settidtag(pvm_mytid(),tag
);
140 if ((pvm_tid
<1) || ( pvm_tid
> CmiNumPes())) {
141 PRINTF("Pe(%d) tid=%d:%s:%d pvm_send() illegal tid %d\n",
142 MYPE(),pvm_mytid(),__FILE__
,__LINE__
,pvm_tid
);
144 } else conv_tid
= pvm_tid
-1;
147 PRINTF("Pe(%d) tid=%d:%s:%d pvm_send() illegal tag\n",
148 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
150 } else conv_tag
= tag
;
152 msg_sz
= sizeof(msg_hdr
)+pvmc_sendmsgsz();
153 msg
= CmiAlloc(msg_sz
);
156 PRINTF("Pe(%d) tid=%d:%s:%d pvm_send() can't alloc msg buffer\n",
157 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
161 CmiSetHandler(msg
,CpvAccess(pvmc_msg_handler
));
162 ((msg_hdr
*)msg
)->sender
=MYPE();
163 ((msg_hdr
*)msg
)->seq_num
=CpvAccess(send_seq_num
)[conv_tid
];
164 CpvAccess(send_seq_num
)[conv_tid
]++;
166 pvmc_packmsg((char *)msg
+ (int)sizeof(msg_hdr
));
167 CmiSyncSendAndFree(conv_tid
,msg_sz
,msg
);
171 int pvm_mcast(int *tids
, int ntask
, int msgtag
)
176 PRINTF("Pe(%d) tid=%d:pvm_mcast(%x,%d,%d)\n",
177 MYPE(),pvm_mytid(),tids
,ntask
,msgtag
);
180 pvm_send(tids
[i
],msgtag
);
185 int pvm_nrecv(int tid
, int tag
)
187 int conv_tid
, conv_tag
;
195 PRINTF("Pe(%d) tid=%d:pvm_nrecv(%d,%d)\n",
196 MYPE(),pvm_mytid(),tid
,tag
);
200 conv_tid
=CmmWildCard
;
204 conv_tag
=CmmWildCard
;
208 * Empty messages from machine layer.
211 while(CmiDeliverMsgs(1)==0)
215 * See if the message is already in the tag table and extract it.
220 msg
=CmmGet(CpvAccess(seq_table
),2,tags
,rtags
);
224 seq_num = CpvAccess(recv_seq_num)[sender];
226 if ((((msg_hdr *)msg)->seq_num) != seq_num)
227 PRINTF("tid=%d:%s:%d pvm_recv() seq number mismatch, I'm confused\n",
228 tid,__FILE__,__LINE__);
229 else CpvAccess(recv_seq_num)[sender]++;
233 rbuf
=pvm_setrbuf(pvm_mkbuf(PvmDataRaw
));
237 PRINTF("Pe(%d) tid=%d:%s:%d pvm_nrecv() says pvm_setrbuf=%d\n",
238 MYPE(),tid
,__FILE__
,__LINE__
,rbuf
);
242 pvmc_unpackmsg(msg
,(char *)msg
+sizeof(msg_hdr
));
245 PRINTF("Pe(%d) tid=%d:%s:%d pvm_nrecv() returning pvm_getrbuf()=%d\n",
246 MYPE(),tid
,__FILE__
,__LINE__
,pvm_getrbuf());
248 return pvm_getrbuf();
250 else return 0; /* Non blocking receive returns immediately. */
253 int pvm_recv(int tid
, int tag
)
258 PRINTF("Pe(%d) tid=%d:pvm_recv(%d,%d)\n",
259 MYPE(),pvm_mytid(),tid
,tag
);
262 bufid
=pvm_nrecv(tid
,tag
);
265 PRINTF("Pe(%d) tid=%d:pvm_recv(%d,%d) returning %d\n",
266 MYPE(),pvm_mytid(),tid
,tag
,bufid
);
272 int pvm_probe(int tid
, int tag
)
274 int conv_tid
, conv_tag
;
281 PRINTF("Pe(%d) tid=%d:pvm_probe(%d,%d)\n",
282 MYPE(),pvm_mytid(),tid
,tag
);
285 conv_tid
=CmmWildCard
;
289 conv_tag
=CmmWildCard
;
293 * Empty messages from machine layer.
296 while(CmiDeliverMsgs(1)==0)
300 * See if the message is already in the tag table
305 msg
=CmmProbe(CpvAccess(seq_table
),2,tags
,rtags
);
309 seq_num = CpvAccess(recv_seq_num)[sender];
311 if ((((msg_hdr *)msg)->seq_num) != seq_num)
312 PRINTF("Pe(%d) tid=%d:%s:%d pvm_recv() seq num mismatch, I'm confused\n",
313 MYPE(),pvm_mytid(),__FILE__,__LINE__);
314 else CpvAccess(recv_seq_num)[sender]++;
318 * We will just unpack the message, so bufinfo works, but this
319 * should really just set up what bufinfo needs and unpack the
322 pvmc_unpackmsg(msg
,(char *)msg
+sizeof(msg_hdr
));
326 PRINTF("Pe(%d) tid=%d:%s:%d pvm_probe() returning pvm_getrbuf()=%d\n",
327 MYPE(),tid
,__FILE__
,__LINE__
,pvm_getrbuf());
329 return pvm_getrbuf();
331 else return 0; /* Probe returns immediately. */