/* Number of pvmc_buffer slots allocated by pvmc_init_bufs(); buffer ids
 * at or above this value are rejected as out of range by the accessors
 * in this file. */
6 #define MAX_BUFFERS 1000
/* One node of a buffer's singly linked item list.
 * Only the `nxt` link is visible in this excerpt; the other members used
 * elsewhere in the file (type, size, free_data, data) are declared on
 * lines that are not shown here. */
8 typedef struct pvmc_item_s
{
13 struct pvmc_item_s
*nxt
;
/* One message buffer slot.
 * Visible members: `first_item` (head of the item list) and `nxt_free`
 * (link used while the slot sits on the free list).  The remaining
 * members referenced in this file (bufid, bytes, tag, tid, num_items,
 * refcount, cur_item, last_item, data_buf) are declared on lines that
 * are not shown here. */
16 typedef struct pvmc_buffer_s
{
23 pvmc_item
*first_item
;
26 struct pvmc_buffer_s
*nxt_free
;
/* Module state, declared through the CpvStaticDeclare macro (defined
 * outside this excerpt) and read/written everywhere via CpvAccess():
 *   pvmc_bufarray - array of MAX_BUFFERS buffer slots
 *   pvmc_freebufs - head of the free-slot list (linked through nxt_free)
 *   pvmc_sbufid   - id of the current send buffer
 *   pvmc_rbufid   - id of the current receive buffer
 * All four are initialized in pvmc_init_bufs(). */
30 CpvStaticDeclare(pvmc_buffer
*,pvmc_bufarray
);
31 CpvStaticDeclare(pvmc_buffer
*,pvmc_freebufs
);
32 CpvStaticDeclare(int,pvmc_sbufid
);
33 CpvStaticDeclare(int,pvmc_rbufid
);
/* Set up the buffer subsystem: allocate the array of MAX_BUFFERS slots,
 * put every slot into its "unused" state, point the free list at slot 1,
 * and clear the current send/receive buffer ids.
 *
 * Several original lines (braces, the declaration of `i`, and whatever
 * follows the allocation-failure message) are not present in this
 * excerpt. */
35 void pvmc_init_bufs(void)
40 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_init_bufs() initializing buffer array\n",
41 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
44 CpvInitialize(pvmc_buffer
*,pvmc_bufarray
);
45 CpvAccess(pvmc_bufarray
)=(pvmc_buffer
*)MALLOC(sizeof(pvmc_buffer
)*MAX_BUFFERS
);
46 if (CpvAccess(pvmc_bufarray
)==NULL
) {
47 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_init_bufs() can't alloc buffer array\n",
48 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
/* NOTE(review): the failure handling after this message is on lines not
 * shown; confirm the function does not fall through with a NULL array. */
52 CpvInitialize(pvmc_buffer
*,pvmc_freebufs
);
/* Slot 0 is deliberately left off the free list, so id 0 is never handed
 * out; the accessors in this file reject bufid <= 0. */
53 CpvAccess(pvmc_freebufs
)=&(CpvAccess(pvmc_bufarray
)[1]); /* throw away first bufid */
55 for(i
=0;i
<MAX_BUFFERS
;i
++) {
/* Each slot's id is simply its index in the array. */
56 CpvAccess(pvmc_bufarray
)[i
].bufid
=i
;
57 CpvAccess(pvmc_bufarray
)[i
].bytes
=0;
58 CpvAccess(pvmc_bufarray
)[i
].tag
=0;
59 CpvAccess(pvmc_bufarray
)[i
].tid
=-1;
60 CpvAccess(pvmc_bufarray
)[i
].num_items
=-1;
/* refcount 0 marks the slot as unused. */
61 CpvAccess(pvmc_bufarray
)[i
].refcount
=0;
62 CpvAccess(pvmc_bufarray
)[i
].first_item
=(pvmc_item
*)NULL
;
63 CpvAccess(pvmc_bufarray
)[i
].cur_item
=(pvmc_item
*)NULL
;
64 CpvAccess(pvmc_bufarray
)[i
].last_item
=(pvmc_item
*)NULL
;
/* NOTE(review): two assignments to nxt_free follow (NULL, then the next
 * slot).  Original lines 65 and 67 are not shown; presumably they are an
 * if/else that selects NULL for the final slot and &array[i+1] otherwise.
 * Confirm, because an unconditional second assignment would link the last
 * slot to one-past-the-end of the array. */
66 CpvAccess(pvmc_bufarray
)[i
].nxt_free
=(pvmc_buffer
*)NULL
;
68 CpvAccess(pvmc_bufarray
)[i
].nxt_free
=&(CpvAccess(pvmc_bufarray
)[i
+1]);
70 CpvAccess(pvmc_bufarray
)[i
].data_buf
=(char *)NULL
;
/* No send or receive buffer is selected initially (id 0). */
73 CpvInitialize(int,pvmc_sbufid
);
74 CpvAccess(pvmc_sbufid
) = 0;
76 CpvInitialize(int,pvmc_rbufid
);
77 CpvAccess(pvmc_rbufid
) = 0;
/* Take a slot from the free list, give it an empty item list, and return
 * its buffer id.
 *
 * encoding: only PvmDataRaw is supported; any other value produces a
 *           warning message.
 * Returns the slot's bufid on the visible success path.  What happens
 * after the "no more buffers" and "MALLOC failed" messages is on lines
 * not shown in this excerpt.
 *
 * NOTE(review): no visible line sets new_buf->refcount, yet pvm_freebuf()
 * and pvmc_getbuf() treat refcount < 1 as an error.  Original lines
 * 101-102 are missing from this excerpt; confirm refcount is set there. */
80 int pvm_mkbuf(int encoding
)
85 PRINTF("Pe(%d) tid=%d:pvm_mkbuf(%d)\n",
86 MYPE(),pvm_mytid(),encoding
);
88 if (encoding
!= PvmDataRaw
)
89 PRINTF("Pe(%d) tid=%d:%s:%d Warning: only encoding=PvmDataRaw supported\n",
90 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
/* Candidate slot is the current head of the free list. */
93 new_buf
= CpvAccess(pvmc_freebufs
);
94 if (new_buf
== NULL
) {
95 PRINTF("Pe(%d) tid=%d:%s:%d pvm_mkbuf() no more buffers\n",
96 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
/* Pop the head off the free list. */
100 CpvAccess(pvmc_freebufs
)=CpvAccess(pvmc_freebufs
)->nxt_free
;
103 new_buf
->tid
=pvm_mytid();
104 new_buf
->num_items
= 0;
/* Allocate the first list node.  It acts as an empty end-marker: the
 * item loops in this file stop when they reach last_item without
 * processing it. */
105 if ((new_buf
->first_item
=
106 (pvmc_item
*)MALLOC(sizeof(pvmc_item
))) == NULL
) {
107 PRINTF("Pe(%d) tid=%d:%s:%d pvm_mkbuf() MALLOC failed\n",
108 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
111 new_buf
->first_item
->type
=0;
112 new_buf
->first_item
->size
=0;
113 new_buf
->first_item
->free_data
=FALSE
;
114 new_buf
->first_item
->data
=(char *)NULL
;
115 new_buf
->first_item
->nxt
=(pvmc_item
*)NULL
;
/* An empty buffer has first, current and last all on the end-marker. */
117 new_buf
->cur_item
=new_buf
->first_item
;
118 new_buf
->last_item
=new_buf
->first_item
;
120 return new_buf
->bufid
;
/* Release everything a buffer slot owns and reset its item bookkeeping.
 *
 * cur_buf: slot to empty.  Its data_buf is freed if set, the item list is
 *          walked from first_item, and num_items / first_item / cur_item /
 *          last_item are cleared.
 *
 * The loop header, the assignment to prv_item and any FREE of the item
 * nodes themselves are on lines not shown in this excerpt. */
123 static void pvmc_emptybuf(pvmc_buffer
*cur_buf
)
125 pvmc_item
*nxt_item
, *prv_item
;
/* Free the whole-message data block, if this buffer holds one. */
127 if (cur_buf
->data_buf
) {
128 FREE(cur_buf
->data_buf
);
129 cur_buf
->data_buf
=(char *)NULL
;
132 nxt_item
=cur_buf
->first_item
;
/* Advance first, then release the payload of the node just left behind;
 * only payloads flagged free_data are freed. */
135 nxt_item
=nxt_item
->nxt
;
136 if (prv_item
->free_data
)
137 FREE(prv_item
->data
);
143 cur_buf
->num_items
=0;
144 cur_buf
->first_item
=(pvmc_item
*)NULL
;
145 cur_buf
->cur_item
=(pvmc_item
*)NULL
;
146 cur_buf
->last_item
=(pvmc_item
*)NULL
;
/* Release a buffer by id.  When its refcount is 0 at the final test, the
 * buffer is emptied and pushed back onto the head of the free list.
 *
 * bufid: must satisfy 0 < bufid < MAX_BUFFERS, otherwise an
 *        "out of range" message is printed.
 * Return values are on lines not shown in this excerpt.
 *
 * NOTE(review): the first refcount test reports refcount < 1 as an error,
 * and the second test acts on refcount == 0.  Original lines 169-172 are
 * missing; presumably they return on error and decrement refcount.
 * Confirm against the full file. */
149 int pvm_freebuf(int bufid
)
151 pvmc_buffer
*cur_buf
;
155 PRINTF("Pe(%d) tid=%d:pvm_freebuf(%d)\n",MYPE(),pvm_mytid(),bufid
);
158 if ((bufid
<=0) || (bufid
>=MAX_BUFFERS
)) {
159 PRINTF("Pe(%d) tid=%d:%s:%d pvm_freebuf() attempted to free out of range bufid\n",
160 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
164 cur_buf
= &(CpvAccess(pvmc_bufarray
)[bufid
]);
166 if (cur_buf
->refcount
< 1) {
167 PRINTF("Pe(%d) tid=%d:%s:%d pvm_freebuf(%d) refcount=%d, i'm confused\n",
168 MYPE(),pvm_mytid(),__FILE__
,__LINE__
,bufid
,cur_buf
->refcount
);
/* No references remain: drop the contents and push the slot onto the
 * head of the free list. */
173 if (cur_buf
->refcount
==0) {
174 pvmc_emptybuf(cur_buf
);
175 cur_buf
->nxt_free
=CpvAccess(pvmc_freebufs
);
176 CpvAccess(pvmc_freebufs
)=cur_buf
;
/* Body of a diagnostic routine whose signature is on lines not shown in
 * this excerpt.  It compares two views of "free" buffers: the number of
 * slots whose refcount is 0, and the length of the free list, then prints
 * both.  The declarations of x, Counter and FreeCounter, and the
 * statement that counts list nodes, are also not shown. */
184 struct pvmc_buffer_s
*FreeList
;
185 /* find the number that we think are free */
186 for(x
=0; x
<MAX_BUFFERS
; x
++)
188 if (CpvAccess(pvmc_bufarray
)[x
].refcount
== 0) Counter
++;
190 /* find the number that are linked as free */
191 FreeList
= CpvAccess(pvmc_freebufs
);
192 while(FreeList
!= NULL
)
195 FreeList
= FreeList
->nxt_free
;
197 /* show the results */
198 PRINTF("Pe(%d) tid=%d:%s:%d unused=(%d) sizeof(freelist)=%d\n",
199 MYPE(),pvm_mytid(),__FILE__
,__LINE__
,Counter
,FreeCounter
);
/* Validate a buffer id and look up its slot.
 *
 * bufid: must satisfy 0 < bufid < MAX_BUFFERS; a slot whose refcount is
 *        below 1 is reported as an error.
 * The remainder of the function (original lines 221-227), including what
 * it does with the slot and what it returns, is not shown in this
 * excerpt. */
206 static int pvmc_getbuf(int bufid
)
208 pvmc_buffer
*cur_buf
;
210 if ((bufid
<=0) || (bufid
>=MAX_BUFFERS
)) {
211 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_getbuf(%d) attempted to get out of range bufid\n",
212 MYPE(),pvm_mytid(),__FILE__
,__LINE__
,bufid
);
216 cur_buf
= &(CpvAccess(pvmc_bufarray
)[bufid
]);
218 if (cur_buf
->refcount
<1) {
219 PRINTF("Pe(%d) tid=%d:%s:%d pvm_getbuf() trying with refcount=%d, i'm confused\n",
220 MYPE(),pvm_mytid(),__FILE__
,__LINE__
,cur_buf
->refcount
);
/* Return the id of the current send buffer (0 until one has been set). */
228 int pvm_getsbuf(void)
231 PRINTF("Pe(%d) tid=%d:pvm_getsbuf()\n",MYPE(),pvm_mytid());
233 return CpvAccess(pvmc_sbufid
);
/* Make `bufid` the current send buffer.
 *
 * The previous send buffer id is read first and passed to pvmc_getbuf()
 * and pvm_freebuf(); any condition guarding those two calls, and the
 * value returned, are on lines not shown in this excerpt.
 * NOTE(review): `bufid` itself is not visibly validated before being
 * stored; confirm against the full file. */
236 int pvm_setsbuf(int bufid
)
241 PRINTF("Pe(%d) tid=%d:pvm_setsbuf(%d)\n",MYPE(),pvm_mytid(),bufid
);
244 prv_sbufid
=CpvAccess(pvmc_sbufid
);
248 pvmc_getbuf(prv_sbufid);
249 pvm_freebuf(prv_sbufid);
253 CpvAccess(pvmc_sbufid
)=bufid
;
/* Return the id of the current receive buffer (0 until one has been set). */
258 int pvm_getrbuf(void)
261 PRINTF("Pe(%d) tid=%d:pvm_getrbuf()\n",MYPE(),pvm_mytid());
263 return CpvAccess(pvmc_rbufid
);
/* Make `bufid` the current receive buffer.
 *
 * Mirrors pvm_setsbuf(): the previous receive buffer id is read and
 * passed to pvmc_getbuf() and pvm_freebuf(); any condition guarding those
 * calls, and the value returned, are on lines not shown in this excerpt.
 * NOTE(review): `bufid` itself is not visibly validated before being
 * stored; confirm against the full file. */
266 int pvm_setrbuf(int bufid
)
271 PRINTF("Pe(%d) tid=%d:pvm_setrbuf(%d)\n",MYPE(),pvm_mytid(),bufid
);
273 prv_rbufid
=CpvAccess(pvmc_rbufid
);
277 pvmc_getbuf(prv_rbufid);
278 pvm_freebuf(prv_rbufid);
283 CpvAccess(pvmc_rbufid
)=bufid
;
/* Start a fresh send buffer: release the current send buffer if one is
 * set (id > 0), create a new buffer with pvm_mkbuf(encoding), record it
 * as the current send buffer and return its id.
 *
 * The test that guards the "couldn't alloc new buffer" message (original
 * lines 299-300) and anything that follows it are not shown in this
 * excerpt. */
288 int pvm_initsend(int encoding
)
293 PRINTF("Pe(%d) tid=%d:pvm_initsend(%d)\n",MYPE(),pvm_mytid(),encoding
);
295 if (CpvAccess(pvmc_sbufid
) > 0)
296 pvm_freebuf(CpvAccess(pvmc_sbufid
));
298 newbufid
=pvm_mkbuf(encoding
);
301 PRINTF("Pe(%d) tid=%d:%s:%d pvm_initsend() couldn't alloc new buffer\n",
302 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
304 CpvAccess(pvmc_sbufid
)=newbufid
;
305 return CpvAccess(pvmc_sbufid
);
/* Report a buffer's stored byte count, message tag and task id.
 *
 * bufid:  must satisfy 0 < bufid < MAX_BUFFERS and name a slot with
 *         refcount > 0, otherwise an error message is printed.
 * bytes, msgtag, tid: output locations that receive the slot's bytes,
 *         tag and tid members.
 * Return values, and any NULL checks guarding the three stores, are on
 * lines not shown in this excerpt.
 *
 * NOTE(review): the first PRINTF below has six conversions
 * (%d %d %d 0x%x 0x%x 0x%x) but only five arguments; the leading MYPE()
 * used by every other trace message in this file is missing.  It also
 * prints the three pointer arguments with %x; %p with a (void *) cast is
 * the matching conversion. */
308 int pvm_bufinfo(int bufid
, int *bytes
, int *msgtag
, int *tid
)
311 PRINTF("Pe(%d) tid=%d:pvm_bufinfo(%d,0x%x,0x%x,0x%x)\n",
312 pvm_mytid(),bufid
,bytes
,msgtag
,tid
);
315 if ((bufid
<=0) || (bufid
>= MAX_BUFFERS
) ||
316 (CpvAccess(pvmc_bufarray
)[bufid
].refcount
<= 0)) {
317 PRINTF("Pe(%d) tid=%d:%s:%d pvm_bufinfo(%d) info requested about unused buffer\n",
318 MYPE(),pvm_mytid(),__FILE__
,__LINE__
,bufid
);
322 *bytes
=CpvAccess(pvmc_bufarray
)[bufid
].bytes
;
324 *msgtag
=CpvAccess(pvmc_bufarray
)[bufid
].tag
;
326 *tid
=CpvAccess(pvmc_bufarray
)[bufid
].tid
;
/* Compute the packed size of the current send buffer.
 *
 * The size is the four header fields (bytes, tag, tid, num_items) plus,
 * for every item before last_item, the item's payload size and its
 * type/size fields.  This matches the layout written by pvmc_packmsg().
 *
 * The current send buffer id must satisfy 0 < id < MAX_BUFFERS and name a
 * slot with refcount > 0, otherwise an error message is printed.  The
 * declarations of msgsz and cur_item and the return statements are on
 * lines not shown in this excerpt. */
330 int pvmc_sendmsgsz(void)
333 pvmc_buffer
*cur_buf
;
336 if ((CpvAccess(pvmc_sbufid
)<=0) || (CpvAccess(pvmc_sbufid
) >= MAX_BUFFERS
) ||
337 (CpvAccess(pvmc_bufarray
)[CpvAccess(pvmc_sbufid
)].refcount
<= 0)) {
338 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_sendmsgsz() size requested for unused send buffer\n",
339 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
343 cur_buf
= &(CpvAccess(pvmc_bufarray
)[CpvAccess(pvmc_sbufid
)]);
/* Fixed-size message header. */
345 msgsz
=sizeof(cur_buf
->bytes
)+sizeof(cur_buf
->tag
)+
346 sizeof(cur_buf
->tid
)+sizeof(cur_buf
->num_items
);
/* Per-item contribution; the walk stops at last_item, which is not
 * counted. */
348 cur_item
=cur_buf
->first_item
;
349 while (cur_item
!= cur_buf
->last_item
) {
350 msgsz
+= cur_item
->size
+sizeof(cur_item
->type
)+sizeof(cur_item
->size
);
351 cur_item
= cur_item
->nxt
;
/* Stamp the current send buffer with a task id (and, judging by the
 * name and the `tag` parameter, a message tag).
 *
 * Only the store to cur_buf->tid is visible in this excerpt; the handling
 * of `tag` and the return value are on lines not shown.
 * The current send buffer id must satisfy 0 < id < MAX_BUFFERS and name a
 * slot with refcount > 0, otherwise an error message is printed. */
357 int pvmc_settidtag(int pvm_tid
, int tag
)
359 pvmc_buffer
*cur_buf
;
361 if ((CpvAccess(pvmc_sbufid
)<=0) || (CpvAccess(pvmc_sbufid
) >= MAX_BUFFERS
) ||
362 (CpvAccess(pvmc_bufarray
)[CpvAccess(pvmc_sbufid
)].refcount
<= 0)) {
363 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_setidtag() unused send buffer\n",
364 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
367 cur_buf
=&(CpvAccess(pvmc_bufarray
)[CpvAccess(pvmc_sbufid
)]);
369 cur_buf
->tid
= pvm_tid
;
/* Serialize the current send buffer into `msgbuf`.
 *
 * Layout written, in order:
 *   1. header: bytes, tag, tid, num_items (one int each)
 *   2. for every item before last_item: type, size (one int each)
 *   3. for every item before last_item with size > 0: the raw payload
 *
 * msgbuf: destination; nothing visible here checks its capacity, so the
 *         caller presumably sizes it with pvmc_sendmsgsz() - TODO confirm.
 *         The int stores go through (int *) casts, so msgbuf is assumed
 *         to be suitably aligned for int - TODO confirm.
 * The declarations of bytes_packed and cur_item, the last argument of the
 * memcpy call (original line 413) and the return value are on lines not
 * shown in this excerpt. */
374 int pvmc_packmsg(void *msgbuf
)
376 pvmc_buffer
*cur_buf
;
380 if ((CpvAccess(pvmc_sbufid
)<=0) || (CpvAccess(pvmc_sbufid
) >= MAX_BUFFERS
) ||
381 (CpvAccess(pvmc_bufarray
)[CpvAccess(pvmc_sbufid
)].refcount
<= 0)) {
382 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_packmsg() unused send buffer\n",
383 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
/* Part 1: message header. */
386 cur_buf
=&(CpvAccess(pvmc_bufarray
)[CpvAccess(pvmc_sbufid
)]);
387 *((int *)((char *)msgbuf
+bytes_packed
)) = cur_buf
->bytes
;
388 bytes_packed
+=sizeof(int);
389 *((int *)((char *)msgbuf
+bytes_packed
)) = cur_buf
->tag
;
390 bytes_packed
+=sizeof(int);
391 *((int *)((char *)msgbuf
+bytes_packed
)) = cur_buf
->tid
;
392 bytes_packed
+=sizeof(int);
393 *((int *)((char *)msgbuf
+bytes_packed
)) = cur_buf
->num_items
;
394 bytes_packed
+=sizeof(int);
397 PRINTF("Pe(%d) pvmc_packmsg: %d items packed for tag %d\n",
398 MYPE(),cur_buf
->num_items
,cur_buf
->tag
);
/* Part 2: (type, size) table, one pair per item. */
400 cur_item
=cur_buf
->first_item
;
401 while(cur_item
!=cur_buf
->last_item
) {
402 *((int *)((char *)msgbuf
+bytes_packed
)) = cur_item
->type
;
403 bytes_packed
+=sizeof(int);
404 *((int *)((char *)msgbuf
+bytes_packed
)) = cur_item
->size
;
405 bytes_packed
+=sizeof(int);
406 cur_item
=cur_item
->nxt
;
/* Part 3: payloads, in the same item order; empty items contribute
 * nothing. */
409 cur_item
=cur_buf
->first_item
;
410 while(cur_item
!=cur_buf
->last_item
) {
411 if (cur_item
->size
> 0) {
412 memcpy((void *)((char *)msgbuf
+bytes_packed
),cur_item
->data
,
414 bytes_packed
+=cur_item
->size
;
416 cur_item
=cur_item
->nxt
;
/* Rebuild the current receive buffer from a packed message.
 *
 * msgbuf:       stored in cur_buf->data_buf on one branch (NULL on the
 *               other); the condition selecting the branch is on lines
 *               not shown.  pvmc_emptybuf() FREEs data_buf, so ownership
 *               of msgbuf presumably passes to the buffer - TODO confirm.
 * start_of_msg: start of the packed data in the layout produced by
 *               pvmc_packmsg(): header, (type,size) table, payloads.
 * Returns the number of bytes consumed from start_of_msg.
 *
 * Item payloads are not copied: each item's data pointer refers directly
 * into start_of_msg and free_data is FALSE.
 *
 * NOTE(review): num_items and every item size are taken from the message
 * with no visible validation, and the message length is not a parameter;
 * a malformed message would drive reads past its end.
 * Several lines (declaration of i, NULL-check headers, the step that
 * advances cur_item inside the for loop) are not shown in this excerpt. */
421 int pvmc_unpackmsg(void *msgbuf
, void *start_of_msg
)
423 pvmc_buffer
*cur_buf
;
424 pvmc_item
*cur_item
, *nxt_item
;
425 int bytes_unpacked
=0;
428 if ((CpvAccess(pvmc_rbufid
)<=0) || (CpvAccess(pvmc_rbufid
) >= MAX_BUFFERS
) ||
429 (CpvAccess(pvmc_bufarray
)[CpvAccess(pvmc_rbufid
)].refcount
<= 0)) {
/* NOTE(review): this format has four conversions (%d %d %s %d) but only
 * three arguments follow; pvm_mytid() is missing after MYPE(), so
 * __FILE__ is consumed by %d and __LINE__ by %s. */
430 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_unpackmsg() uninitialized recv buffer\n",
431 MYPE(),__FILE__
,__LINE__
);
/* Discard whatever the receive buffer held before. */
434 cur_buf
= &(CpvAccess(pvmc_bufarray
)[CpvAccess(pvmc_rbufid
)]);
435 pvmc_emptybuf(cur_buf
);
/* Header: bytes, tag, tid, num_items. */
437 cur_buf
->bytes
= *((int *)((char *)start_of_msg
+bytes_unpacked
));
438 bytes_unpacked
+= sizeof(int);
439 cur_buf
->tag
= *((int *)((char *)start_of_msg
+bytes_unpacked
));
440 bytes_unpacked
+= sizeof(int);
441 cur_buf
->tid
= *((int *)((char *)start_of_msg
+bytes_unpacked
));
442 bytes_unpacked
+= sizeof(int);
443 cur_buf
->num_items
= *((int *)((char *)start_of_msg
+bytes_unpacked
));
444 bytes_unpacked
+= sizeof(int);
447 PRINTF("Pe(%d) pvmc_unpackmsg: %d items unpacked for tag %d\n",
448 MYPE(),cur_buf
->num_items
,cur_buf
->tag
);
451 cur_buf
->data_buf
= (char *)msgbuf
;
452 else cur_buf
->data_buf
= (char *)NULL
;
/* First list node.  It is linked into the buffer before the NULL check
 * below. */
454 cur_item
=(pvmc_item
*)MALLOC(sizeof(pvmc_item
));
455 cur_buf
->first_item
=cur_item
;
456 cur_buf
->cur_item
=cur_item
;
458 if (cur_item
==(pvmc_item
*)NULL
) {
459 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_unpackmsg() can't allocate memory\n",
460 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
465 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_unpackmsg() unpacking %d messages.\n",
466 MYPE(),pvm_mytid(),__FILE__
,__LINE__
,cur_buf
->num_items
);
/* Pass 1: read the (type, size) table, allocating one successor node per
 * item. */
469 for(i
=0;i
<cur_buf
->num_items
;i
++) {
470 cur_item
->type
= *((int *)((char *)start_of_msg
+bytes_unpacked
));
471 bytes_unpacked
+=sizeof(int);
472 cur_item
->size
= *((int *)((char *)start_of_msg
+bytes_unpacked
));
473 bytes_unpacked
+=sizeof(int);
475 nxt_item
=(pvmc_item
*)MALLOC(sizeof(pvmc_item
));
478 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_unpackmsg() can't allocate memory\n",
479 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
482 cur_item
->nxt
= nxt_item
;
/* The node reached after the loop becomes the empty end-marker recorded
 * as last_item. */
488 cur_item
->free_data
= FALSE
;
489 cur_item
->data
= (char *) NULL
;
490 cur_item
->nxt
= (pvmc_item
*) NULL
;
492 cur_buf
->last_item
= cur_item
;
/* Pass 2: point each non-empty item at its payload inside the message. */
494 cur_item
= cur_buf
->first_item
;
495 while(cur_item
!=cur_buf
->last_item
) {
496 if (cur_item
->size
> 0) {
497 cur_item
->free_data
=FALSE
;
498 cur_item
->data
= (char *)start_of_msg
+bytes_unpacked
;
499 bytes_unpacked
+=cur_item
->size
;
501 else cur_item
->data
=NULL
;
502 cur_item
= cur_item
->nxt
;
505 return bytes_unpacked
;
/* Return the message tag from a packed message without unpacking it.
 * The tag is the second int of the header written by pvmc_packmsg()
 * (bytes first, then tag).  msgbuf must point at the start of that
 * header. */
508 int pvmc_gettag(void *msgbuf
)
510 return *((int *)msgbuf
+1);
/* Append a new item of `nbytes` bytes and kind `type` to the current
 * send buffer.
 *
 * The current last_item (the empty end-marker) is filled in with the
 * type, size and a freshly MALLOCed payload marked free_data = TRUE, and
 * a new empty end-marker node is linked after it.
 *
 * The current send buffer id must satisfy 0 < id < MAX_BUFFERS and name a
 * slot with refcount > 0, otherwise an error message is printed.
 * The declarations of buf and databuf, the test guarding the "can't
 * allocate data space" message, any update of num_items and the return
 * value (presumably databuf - TODO confirm) are on lines not shown in
 * this excerpt. */
513 void *pvmc_mkitem(int nbytes
, int type
)
518 if ((CpvAccess(pvmc_sbufid
)<=0) || (CpvAccess(pvmc_sbufid
) >= MAX_BUFFERS
) ||
519 (CpvAccess(pvmc_bufarray
)[CpvAccess(pvmc_sbufid
)].refcount
<= 0)) {
520 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_mkitem() unused send buffer\n",
521 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
524 buf
= &(CpvAccess(pvmc_bufarray
)[CpvAccess(pvmc_sbufid
)]);
/* Turn the end-marker into the new item. */
526 buf
->last_item
->type
=type
;
527 buf
->last_item
->size
=nbytes
;
528 databuf
=MALLOC(nbytes
);
530 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_mkitem() can't allocate data space\n",
531 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
/* The payload was allocated here, so pvmc_emptybuf() must free it. */
534 buf
->last_item
->free_data
=TRUE
;
535 buf
->last_item
->data
=(char *)databuf
;
/* Link a fresh end-marker after the item. */
536 buf
->last_item
->nxt
=(pvmc_item
*)MALLOC(sizeof(pvmc_item
));
537 if (buf
->last_item
->nxt
==NULL
) {
538 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_mkitem() can't allocate new item\n",
539 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
543 buf
->last_item
=buf
->last_item
->nxt
;
544 buf
->last_item
->type
=0;
545 buf
->last_item
->size
=0;
546 buf
->last_item
->free_data
=FALSE
;
547 buf
->last_item
->data
=(char *)NULL
;
548 buf
->last_item
->nxt
=(pvmc_item
*)NULL
;
/* Fetch the next unread item from the current receive buffer and advance
 * the read cursor.
 *
 * n_bytes: number of bytes the caller wants; an item smaller than this is
 *          reported as a size mismatch.
 * type:    expected item type; a different stored type is reported as a
 *          type mismatch.
 * Other reported conditions: cursor already at last_item ("no more
 * items") and an item whose data pointer is NULL.
 *
 * The current receive buffer id must satisfy 0 < id < MAX_BUFFERS and
 * name a slot with refcount > 0, otherwise an error message is printed.
 * The declarations of buf and item, what each error branch returns, and
 * the success return value are on lines not shown in this excerpt. */
554 void *pvmc_getitem(int n_bytes
, int type
)
560 if ((CpvAccess(pvmc_rbufid
)<=0) || (CpvAccess(pvmc_rbufid
) >= MAX_BUFFERS
) ||
561 (CpvAccess(pvmc_bufarray
)[CpvAccess(pvmc_rbufid
)].refcount
<= 0)) {
562 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_getitem() uninitialized recv buffer\n",
563 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
566 buf
= &(CpvAccess(pvmc_bufarray
)[CpvAccess(pvmc_rbufid
)]);
568 item
= buf
->cur_item
;
/* Checks run in order: end of list, missing payload, too-small payload,
 * wrong type. */
570 if (item
==buf
->last_item
) {
571 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_getitem() no more items\n",
572 MYPE(),pvm_mytid(), __FILE__
,__LINE__
);
574 } else if (item
->data
==(void *)NULL
) {
575 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_getitem() uninitialized data\n",
576 MYPE(),pvm_mytid(), __FILE__
,__LINE__
);
578 } else if (item
->size
< n_bytes
) {
579 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_getitem() data size mismatch\n",
580 MYPE(),pvm_mytid(), __FILE__
,__LINE__
);
582 } else if (item
->type
!= type
) {
583 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_getitem() type mismatch\n",
584 MYPE(),pvm_mytid(), __FILE__
,__LINE__
);
/* Move the read cursor to the following item. */
590 buf
->cur_item
= buf
->cur_item
->nxt
;
594 void *pvmc_getstritem(int *n_bytes
)
600 if ((CpvAccess(pvmc_rbufid
)<=0) || (CpvAccess(pvmc_rbufid
) >= MAX_BUFFERS
) ||
601 (CpvAccess(pvmc_bufarray
)[CpvAccess(pvmc_rbufid
)].refcount
<= 0)) {
602 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_getstritem() uninitialized recv buffer\n",
603 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
606 buf
= &(CpvAccess(pvmc_bufarray
)[CpvAccess(pvmc_rbufid
)]);
608 item
= buf
->cur_item
;
609 *n_bytes
= item
->size
;
611 if (item
==buf
->last_item
) {
612 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_getstritem() no more items\n",
613 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
615 } else if (item
->data
==(void *)NULL
) {
616 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_getstritem() uninitialized data\n",
617 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
619 } else if (item
->type
!= PVM_STR
) {
620 PRINTF("Pe(%d) tid=%d:%s:%d pvmc_getstritem() type mismatch\n",
621 MYPE(),pvm_mytid(),__FILE__
,__LINE__
);
627 buf
->cur_item
= buf
->cur_item
->nxt
;