2 ** Ammasso implementation of Converse NET version
3 ** Contains Ammasso specific
6 ** CmiCommunicationInit()
9 ** CommunicationServer()
12 ** Written By: Isaac Dooley idooley2@uiuc.edu
13 ** 03/12/05 Esteban Pauli etpauli2@uiuc.edu
14 ** Filippo Gioachin gioachin@uiuc.edu
15 ** David Kunzman kunzman2@uiuc.edu
18 ** 03/12/05 : DMK : Initial Version
19 ** 04/30/05 : Filippo : Revised Version
26 #define ALIGN8(x) (int)((~7)&((x)+7))
29 /* DYNAMIC ALLOCATOR: Limit of the allowed pinned memory */
30 #define MAX_PINNED_MEMORY 100000000
31 /* DYNAMIC ALLOCATOR END */
33 #define WASTE_TIME 600
34 // In order to use CC_POST_CHECK, the last argument to cc_qp_post_sq must be "nWR"
35 #define CC_POST_CHECK(routine,args,nodeTo) {\
37 while (ammasso_check_post_err(routine args, #routine, __LINE__, &nWR, nodeTo, retry) == 1) { \
39 retry += WASTE_TIME; \
40 for (i=0; i<=WASTE_TIME; ++i) { retry --; } \
44 #define CC_CHECK(routine,args) \
45 ammasso_check_err(routine args, #routine, __LINE__);
47 static void ammasso_check_err(cc_status_t returnCode
,const char *routine
,int line
) {
48 if (returnCode
!=CC_OK
) {
50 char *errMsg
= cc_status_to_string(returnCode
);
52 // Attempt to close the RNIC
53 cc_rnic_close(contextBlock
->rnic
);
55 // Let the user know what happened and bail
56 MACHSTATE3(5,"Fatal CC error while executing %s at %s:%d\n", routine
, __FILE__
, line
);
57 MACHSTATE2(5," Description: %d, %s\n",returnCode
,errMsg
);
58 sprintf(buf
,"Fatal CC error while executing %s at %s:%d\n"
59 " Description: %d, %s\n", routine
, __FILE__
, line
,returnCode
,errMsg
);
64 // We pass the pointer used in the cc_qp_post_sq call as last parameter, since
65 // when this function is called, cc_pq_post_sq has already been called
66 static int ammasso_check_post_err(cc_status_t returnCode
,const char *routine
,int line
, int *nWR
, int nodeTo
, int retry
) {
67 if (returnCode
== CCERR_TOO_MANY_WRS_POSTED
&& *nWR
!= 1 && retry
>0) {
69 // drain the send completion queue and retry
70 while (cc_cq_poll(contextBlock
->rnic
, nodes
[nodeTo
].send_cq
, &wc
) == CC_OK
) {
71 MACHSTATE1(5, "Error posting send request - INFO: Send completed with node %d... now waiting for acknowledge...", nodeTo
);
73 MACHSTATE(5, "Error posting send request - Retrying...");
77 if (returnCode
!= CC_OK
|| *nWR
!= 1) {
79 char *errMsg
= cc_status_to_string(returnCode
);
81 // Attempt to close the RNIC
82 cc_rnic_close(contextBlock
->rnic
);
84 // Let the user know what happened and bail
85 MACHSTATE3(5,"Fatal CC error while executing %s at %s:%d\n", routine
, __FILE__
, line
);
86 MACHSTATE3(5," Description: %d, %s (nWR = %d)\n",returnCode
,errMsg
,nWR
);
87 sprintf(buf
,"Fatal CC error while executing %s at %s:%d\n"
88 " Description: %d, %s (nWR = %d)\n", routine
, __FILE__
, line
,returnCode
,errMsg
,*nWR
);
95 ////////////////////////////////////////////////////////////////////////////////////////////////////
96 // Function ProtoTypes /////////////////////////////////////////////////////////////////////////////
98 void CmiMachineInit(char **argv
);
100 void CmiAmmassoNodeAddressesStoreHandler(int pe
, struct sockaddr_in
*addr
, int port
);
102 void AmmassoDoIdle(void);
103 void CmiNotifyIdle(void);
104 static CmiIdleState
* CmiNotifyGetState(void);
105 static void CmiNotifyBeginIdle(CmiIdleState
*s
);
106 static void CmiNotifyStillIdle(CmiIdleState
*s
);
108 void sendAck(OtherNode node
);
109 AmmassoToken
*getQPSendToken(OtherNode node
);
110 int sendDataOnQP(char* data
, int len
, OtherNode node
, char flags
);
111 void DeliverViaNetwork(OutgoingMsg msg
, OtherNode otherNode
, int rank
, unsigned int broot
, int copy
);
112 static void CommunicationServer(int withDelayMs
, int where
);
113 void CmiMachineExit(void);
115 void AsynchronousEventHandler(cc_rnic_handle_t rnic
, cc_event_record_t
*eventRecord
, void *cb
);
116 void CheckRecvBufForMessage(OtherNode node
);
117 //void CompletionEventHandler(cc_rnic_handle_t rnic, cc_cq_handle_t cq, void *cb);
118 //void CompletionEventHandlerWithAckFlag(cc_rnic_handle_t rnic, cc_cq_handle_t cq, void *cb, int breakOnAck);
120 void CmiAmmassoOpenQueuePairs(void);
122 void processAmmassoControlMessage(char* msg
, int len
, Tailer
*tail
, OtherNode from
);
123 int ProcessMessage(char* msg
, int len
, Tailer
*tail
, OtherNode from
);
125 OtherNode
getNodeFromQPId(cc_qp_id_t qp_id
);
126 OtherNode
getNodeFromQPHandle(cc_qp_handle_t qp
);
128 void establishQPConnection(OtherNode node
, int reuseQPFlag
);
129 void reestablishQPConnection(OtherNode node
);
130 void closeQPConnection(OtherNode node
, int destroyQPFlag
);
132 void BufferAlloc(int n
);
133 void TokenAlloc(int n
);
134 void RequestTokens(OtherNode node
, int n
);
135 void GrantTokens(OtherNode node
, int n
);
136 void RequestReleaseTokens(OtherNode node
, int n
);
137 void ReleaseTokens(OtherNode node
, int n
);
139 ////////////////////////////////////////////////////////////////////////////////////////////////////
140 // Function Bodies /////////////////////////////////////////////////////////////////////////////////
142 /* Callbacks used by the DYNAMIC ALLOCATOR */
143 void AllocatorCheck (void) {
146 for (i
=0; i
<contextBlock
->numNodes
; ++i
) {
147 if (i
==contextBlock
->myNode
) continue;
148 limit
= nodes
[i
].num_sendTokens
- nodes
[i
].max_used_tokens
- 10;
149 CmiPrintf("[%d] AllocatorCheck called: node %d, limit %d\n",CmiMyPe(),i
,limit
);
151 ReleaseTokens(&nodes
[i
], limit
);
152 CmiPrintf("[%d] Releasing %d tokens to %d\n", CmiMyPe(), limit
, i
);
153 nodes
[i
].max_used_tokens
= 0;
157 /* DYNAMIC ALLOCATOR END */
159 void BufferAlloc(int n
) {
162 cc_stag_index_t newStagIndex
;
163 AmmassoBuffer
*newBuffers
;
165 MACHSTATE1(3, "Allocating %d new Receive Buffers",n
);
167 // Try to allocate the memory for n receiving buffers
168 newBuffers
= (AmmassoBuffer
*) CmiAlloc(n
*sizeof(AmmassoBuffer
));
170 if (newBuffers
== NULL
) {
172 // Attempt to close the RNIC
173 cc_rnic_close(contextBlock
->rnic
);
175 // Let the user know what happened and bail
176 MACHSTATE(5, "BufferAlloc() - ERROR: Unable to allocate memory for RECV buffers");
177 sprintf(buf
, "BufferAlloc() - ERROR: Unable to allocate memory for RECV buffers");
181 contextBlock
->pinnedMemory
+= n
*sizeof(AmmassoBuffer
);
182 CC_CHECK(cc_nsmr_register_virt
,(contextBlock
->rnic
,
183 CC_ADDR_TYPE_VA_BASED
,
184 (cc_byte_t
*)newBuffers
,
185 n
*sizeof(AmmassoBuffer
),
188 CC_ACF_LOCAL_READ
| CC_ACF_LOCAL_WRITE
| CC_ACF_REMOTE_WRITE
,
192 for (i
=0; i
<n
; ++i
) {
193 newBuffers
[i
].tail
.length
= 0;
194 newBuffers
[i
].next
= &(newBuffers
[i
+1]);
195 newBuffers
[i
].stag
= newStagIndex
;
197 newBuffers
[n
-1].next
= NULL
;
198 if (contextBlock
->freeRecvBuffers
== NULL
) {
199 contextBlock
->freeRecvBuffers
= newBuffers
;
201 contextBlock
->last_freeRecvBuffers
->next
= newBuffers
;
203 contextBlock
->last_freeRecvBuffers
= &newBuffers
[n
-1];
204 contextBlock
->num_freeRecvBuffers
+= n
;
207 void TokenAlloc(int n
) {
210 cc_stag_index_t newStagIndex
;
211 AmmassoToken
*sendToken
, *tokenScanner
;
212 cc_data_addr_t
*sendSgl
;
213 AmmassoBuffer
*sendBuffer
;
215 MACHSTATE1(3, "Allocating %d new Tokens",n
);
217 // Try to allocate the memory for n sending buffers
218 sendBuffer
= (AmmassoBuffer
*) CmiAlloc(n
*sizeof(AmmassoBuffer
));
220 if (sendBuffer
== NULL
) {
222 // Attempt to close the RNIC
223 cc_rnic_close(contextBlock
->rnic
);
225 // Let the user know what happened and bail
226 MACHSTATE(5, "TokenAlloc() - ERROR: Unable to allocate memory for SEND buffers");
227 sprintf(buf
, "TokenAlloc() - ERROR: Unable to allocate memory for SEND buffers");
231 contextBlock
->pinnedMemory
+= n
*sizeof(AmmassoBuffer
);
232 CC_CHECK(cc_nsmr_register_virt
,(contextBlock
->rnic
,
233 CC_ADDR_TYPE_VA_BASED
,
234 (cc_byte_t
*)sendBuffer
,
235 n
*sizeof(AmmassoBuffer
),
238 CC_ACF_LOCAL_READ
| CC_ACF_LOCAL_WRITE
,
242 // Allocate the send tokens
243 sendToken
= (AmmassoToken
*) CmiAlloc(n
*ALIGN8(sizeof(AmmassoToken
)));
245 if (sendToken
== NULL
) {
247 // Attempt to close the RNIC
248 cc_rnic_close(contextBlock
->rnic
);
250 // Let the user know what happened and bail
251 MACHSTATE(5, "TokenAlloc() - ERROR: Unable to allocate memory for send TOKEN buffers");
252 sprintf(buf
, "TokenAlloc() - ERROR: Unable to allocate memory for send TOKEN buffers");
256 sendSgl
= (cc_data_addr_t
*) CmiAlloc(n
*ALIGN8(sizeof(cc_data_addr_t
)));
258 if (sendSgl
== NULL
) {
260 // Attempt to close the RNIC
261 cc_rnic_close(contextBlock
->rnic
);
263 // Let the user know what happened and bail
264 MACHSTATE(5, "TokenAlloc() - ERROR: Unable to allocate memory for send SGL buffers");
265 sprintf(buf
, "TokenAlloc() - ERROR: Unable to allocate memory for send SGL buffers");
269 tokenScanner
= sendToken
;
270 for (i
=0; i
<n
; ++i
) {
271 sendSgl
->stag
= newStagIndex
;
272 sendSgl
->length
= AMMASSO_BUFSIZE
+ sizeof(Tailer
);
273 sendSgl
->to
= (unsigned long)&(sendBuffer
[i
]);
274 tokenScanner
->wr
.wr_id
= (unsigned long)tokenScanner
;
275 tokenScanner
->wr
.wr_type
= CC_WR_TYPE_RDMA_WRITE
;
276 tokenScanner
->wr
.wr_u
.rdma_write
.local_sgl
.sge_count
= 1;
277 tokenScanner
->wr
.wr_u
.rdma_write
.local_sgl
.sge_list
= sendSgl
;
278 tokenScanner
->wr
.signaled
= 1;
279 tokenScanner
->localBuf
= (AmmassoBuffer
*)&(sendBuffer
[i
]);
280 LIST_ENQUEUE(contextBlock
->,freeTokens
,tokenScanner
);
281 sendSgl
= (cc_data_addr_t
*)(((char*)sendSgl
)+ALIGN8(sizeof(cc_data_addr_t
)));
282 tokenScanner
= (AmmassoToken
*)(((char*)tokenScanner
)+ALIGN8(sizeof(AmmassoToken
)));
286 void RequestTokens(OtherNode node
, int n
) {
289 sendDataOnQP(buf
, sizeof(int), node
, AMMASSO_MOREBUFFERS
);
292 void GrantTokens(OtherNode node
, int n
) {
295 AmmassoBuffer
*buffer
;
296 AmmassoBuffer
*prebuffer
;
297 AmmassoTokenDescription
*tokenDesc
;
298 if (node
->pending
!= NULL
) return;
299 if (n
*sizeof(AmmassoTokenDescription
) + sizeof(int) > AMMASSO_BUFSIZE
) {
300 n
= (AMMASSO_BUFSIZE
-sizeof(int)) / sizeof(AmmassoTokenDescription
);
302 if (contextBlock
->num_freeRecvBuffers
< n
) {
303 int quantity
= (n
- contextBlock
->num_freeRecvBuffers
+ 1023) & (~1023);
304 BufferAlloc(quantity
);
306 buf
= (char*) CmiAlloc(n
*sizeof(AmmassoTokenDescription
) + sizeof(int));
308 tokenDesc
= (AmmassoTokenDescription
*)(buf
+sizeof(int));
309 buffer
= contextBlock
->freeRecvBuffers
;
310 for (i
=0; i
<n
; ++i
) {
311 tokenDesc
[i
].stag
= buffer
->stag
;
312 tokenDesc
[i
].to
= (unsigned long)buffer
;
314 buffer
= buffer
->next
;
316 node
->pending
= contextBlock
->freeRecvBuffers
;
317 node
->last_pending
= prebuffer
;
318 node
->num_pending
= n
;
319 prebuffer
->next
= NULL
;
320 contextBlock
->num_freeRecvBuffers
-= n
;
321 contextBlock
->freeRecvBuffers
= buffer
;
322 sendDataOnQP(buf
, n
*sizeof(AmmassoTokenDescription
) + sizeof(int), node
, AMMASSO_ALLOCATE
);
326 void RequestReleaseTokens(OtherNode node
, int n
) {
329 sendDataOnQP(buf
, sizeof(int), node
, AMMASSO_RELEASE
);
332 void ReleaseTokens(OtherNode node
, int n
) {
335 AmmassoBuffer
*tokenBuf
;
336 cc_data_addr_t
*tokenSgl
;
338 if (node
->num_sendTokens
< n
) n
= node
->num_sendTokens
- 1;
340 token
= node
->sendTokens
;
341 tokenBuf
= token
->localBuf
;
343 tokenBuf
->tail
.length
= 1;
344 tokenBuf
->tail
.ack
= 0; // do not send any ACK with this message
345 tokenBuf
->tail
.flags
= AMMASSO_RELEASED
;
347 // Setup the local SGL
348 tokenSgl
= token
->wr
.wr_u
.rdma_write
.local_sgl
.sge_list
;
349 tokenSgl
->length
= sizeof(Tailer
);
350 tokenSgl
->to
= (unsigned long)&tokenBuf
->tail
;
351 token
->wr
.wr_u
.rdma_write
.remote_to
= (unsigned long)&token
->remoteBuf
->tail
;
353 CC_POST_CHECK(cc_qp_post_sq
,(contextBlock
->rnic
, node
->qp
, &token
->wr
, 1, &nWR
),node
->myNode
);
355 if (contextBlock
->freeTokens
== NULL
) {
356 contextBlock
->freeTokens
= node
->sendTokens
;
358 contextBlock
->last_freeTokens
->next
= node
->sendTokens
;
360 for (i
=1; i
<n
; ++i
) token
= token
->next
;
361 contextBlock
->last_freeTokens
= token
;
362 node
->sendTokens
= token
->next
;
364 contextBlock
->num_freeTokens
+= n
;
365 node
->num_sendTokens
-= n
;
369 * This is called as the node is starting up. It does some initialization of the machine layer.
371 void CmiMachineInit(char **argv
) {
378 AMMASSO_STATS_START(MachineInit
)
380 MACHSTATE(2, "CmiMachineInit() - INFO: (***** Ammasso Specific*****) - Called... Initializing RNIC...");
381 MACHSTATE1(1, "CmiMachineInit() - INFO: Cmi_charmrun_pid = %d", Cmi_charmrun_pid
);
384 //CcdCallOnConditionKeep(CcdPERIODIC, (CcdVoidFn)periodicFunc, NULL);
387 // Allocate a context block that will be used throughout this machine layer
388 if (contextBlock
!= NULL
) {
389 MACHSTATE(5, "CmiMachineInit() - ERROR: contextBlock != NULL");
390 sprintf(buf
, "CmiMachineInit() - ERROR: contextBlock != NULL");
393 contextBlock
= (mycb_t
*)malloc(sizeof(mycb_t
));
394 if (contextBlock
== NULL
) {
395 MACHSTATE(5, "CmiMachineInit() - ERROR: Unable to malloc memory for contextBlock");
396 sprintf(buf
, "CmiMachineInit() - ERROR: Unable to malloc memory for contextBlock");
400 // Initialize the contextBlock by zero-ing everything out and then setting anything special
401 memset(contextBlock
, 0, sizeof(mycb_t
));
402 contextBlock
->rnic
= -1;
404 MACHSTATE(1, "CmiMachineInit() - INFO: (PRE-OPEN_RNIC)");
406 // Check to see if in stand-alone mode
407 if (Cmi_charmrun_pid
!= 0) {
409 // Try to Open the RNIC
410 // TODO : Look-up the difference between CC_PBL_PAGE_MODE and CC_PBL_BLOCK_MODE
411 // TODO : Would a call to cc_rnic_enum or cc_rnic_query do any good here?
412 rtn
= cc_rnic_open(0, CC_PBL_PAGE_MODE
, contextBlock
, &(contextBlock
->rnic
));
414 MACHSTATE2(5, "CmiMachineInit() - ERROR: Unable to open RNIC: %d, \"%s\"", rtn
, cc_status_to_string(rtn
));
415 sprintf(buf
, "CmiMachineInit() - ERROR: Unable to open RNIC: %d, \"%s\"", rtn
, cc_status_to_string(rtn
));
419 MACHSTATE(1, "CmiMachineInit() - INFO: (PRE-SET-ASYNC-HANDLER)");
421 // Set the asynchronous event handler function
422 CC_CHECK(cc_eh_set_async_handler
,(contextBlock
->rnic
, AsynchronousEventHandler
, contextBlock
));
425 MACHSTATE(3, "CmiMachineInit() - INFO: (PRE-SET-CE-HANDLER)");
427 // Set the Completion Event Handler
428 contextBlock->eh_id = 0;
429 CC_CHECK(cc_eh_set_ce_handler,(contextBlock->rnic, CompletionEventHandler, &(contextBlock->eh_id)));
432 MACHSTATE(1, "CmiMachineInit() - INFO: (PRE-PD-ALLOC)");
434 // Allocate the Protection Domain
435 CC_CHECK(cc_pd_alloc
,(contextBlock
->rnic
, &(contextBlock
->pd_id
)));
437 MACHSTATE(1, "CmiMachineInit() - INFO: RNIC Open For Business!!!");
439 } else { // Otherwise, not in stand-alone mode
441 // Flag the rnic variable as invalid
442 contextBlock
->rnic
= -1;
445 MACHSTATE(2, "CmiMachineInit() - INFO: Completed Successfully !!!");
447 AMMASSO_STATS_END(MachineInit
)
450 void CmiCommunicationInit(char **argv
)
454 void CmiAmmassoNodeAddressesStoreHandler(int pe
, struct sockaddr_in
*addr
, int port
) {
456 // DMK : NOTE : The hope is that this can be used to request the RMDA addresses of the other nodes after the
457 // initial addresses from charmrun are given to the node. Get the address here, use that to request
458 // the RDMA address, use the RDMA address to create the QP connection (in establishQPConnection(), which
459 // only subtracts one from the address at the moment... the way our cluster is setup).
461 MACHSTATE1(2, "CmiNodeAddressesStoreHandler() - INFO: pe = %d", pe
);
462 MACHSTATE1(1, " addr = { sin_family = %d,", addr
->sin_family
);
463 MACHSTATE1(1, " sin_port = %d,", addr
->sin_port
);
464 MACHSTATE4(1, " sin_addr.s_addr = %d.%d.%d.%d }", (addr
->sin_addr
.s_addr
& 0xFF), ((addr
->sin_addr
.s_addr
>> 8) & 0xFF), ((addr
->sin_addr
.s_addr
>> 16) & 0xFF), ((addr
->sin_addr
.s_addr
>> 24) & 0xFF));
465 MACHSTATE1(1, " port = %d", port
);
469 void AmmassoDoIdle(void) {
474 AMMASSO_STATS_START(AmmassoDoIdle
)
476 /* DYNAMIC ALLOCATOR: Callbacks */
477 /*if (contextBlock->conditionRegistered == 0) {
478 CcdCallOnConditionKeep(CcdPERIODIC_1s, (CcdVoidFn) AllocatorCheck, NULL);
479 //CcdCallFnAfter((CcdVoidFn) AllocatorCheck, NULL, 100);
480 contextBlock->conditionRegistered = 1;
482 /* DYNAMIC ALLOCATOR END */
484 for (i
= 0; i
< contextBlock
->numNodes
; i
++) {
485 if (i
== contextBlock
->myNode
) continue;
486 CheckRecvBufForMessage(&(nodes
[i
]));
487 while (cc_cq_poll(contextBlock
->rnic
, nodes
[i
].send_cq
, &wc
) == CC_OK
) {
488 MACHSTATE1(3, "AmmassoDoIdle() - INFO: Send completed with node %d... now waiting for acknowledge...", i
);
492 AMMASSO_STATS_END(AmmassoDoIdle
)
495 void CmiNotifyIdle(void) {
499 static CmiIdleState
* CmiNotifyGetState(void) {
503 static void CmiNotifyBeginIdle(CmiIdleState
*s
) {
507 static void CmiNotifyStillIdle(CmiIdleState
*s
) {
511 /* NOTE: if the ack overflows, we cannot use this method of sending, but we need
512 to send a special message for the purpose */
513 void sendAck(OtherNode node
) {
517 AMMASSO_STATS_START(sendAck
)
519 MACHSTATE2(3, "sendAck() - Ammasso - INFO: Called... sending ACK %d to node %d", *node
->remoteAck
, node
->myNode
);
521 if (*node
->remoteAck
< ACK_MASK
) {
523 // Send an ACK message to the specified QP/Connection/Node
524 CC_POST_CHECK(cc_qp_post_sq
,(contextBlock
->rnic
, node
->qp
, node
->ack_sq_wr
, 1, &nWR
),node
->myNode
);
527 // Rare case: happens only after days of run! In this case, do not update
528 // directly the ack, but zero it on the other side, wait one second to be
529 // safe, and then send a regular message with the ACK
531 AmmassoBuffer
*tokenBuf
;
532 int tmp_ack
= *node
->remoteAck
;
533 *node
->remoteAck
= 0;
534 CC_POST_CHECK(cc_qp_post_sq
,(contextBlock
->rnic
, node
->qp
, node
->ack_sq_wr
, 1, &nWR
),node
->myNode
);
538 token
= getQPSendToken(node
);
539 tokenBuf
= token
->localBuf
;
540 tokenBuf
->tail
.ack
= tmp_ack
;
541 tokenBuf
->tail
.flags
= ACK_WRAPPING
;
542 tokenBuf
->tail
.length
= 1; // So it will be seen by the receiver
543 token
->wr
.wr_u
.rdma_write
.local_sgl
.sge_list
->length
= sizeof(Tailer
);
544 token
->wr
.wr_u
.rdma_write
.local_sgl
.sge_list
->to
= (unsigned long)&tokenBuf
->tail
;
545 token
->wr
.wr_u
.rdma_write
.remote_to
= (unsigned long)&token
->remoteBuf
->tail
;
546 CC_POST_CHECK(cc_qp_post_sq
,(contextBlock
->rnic
, node
->qp
, &token
->wr
, 1, &nWR
),node
->myNode
);
547 LIST_ENQUEUE(node
->,usedTokens
,token
);
548 node
->max_used_tokens
= (node
->num_usedTokens
>node
->max_used_tokens
)?node
->num_usedTokens
:node
->max_used_tokens
;
549 *node
->remoteAck
= tmp_ack
& ACK_MASK
;
552 node
->messagesNotYetAcknowledged
= 0;
554 AMMASSO_STATS_END(sendAck
)
557 /* NOTE, even in SMP versions, only the communication server should be sending
558 messages out, thus no locking should be necessary */
559 /* This function returns the token usable for next communication. If no token is
560 available, it blocks until one becomes available */
561 AmmassoToken
*getQPSendToken(OtherNode node
) {
565 ammasso_ack_t newAck
;
566 while (node
->connectionState
!= QP_CONN_STATE_CONNECTED
||
567 node
->sendTokens
== NULL
) {
568 // Try to see if an ACK has been sent directly, so we free some tokens The
569 // direct token will never be greater than ACK_MASK (by protocol
570 // definition), so we do not need to wrap around
571 MACHSTATE(3, "getQPSendBuffer() - INFO: No tokens available");
572 if (*node
->directAck
> node
->localAck
) {
573 newAck
= *node
->directAck
;
574 for (i
=node
->localAck
; i
<newAck
; ++i
) {
575 LIST_DEQUEUE(node
->,usedTokens
,token
);
576 LIST_ENQUEUE(node
->,sendTokens
,token
);
578 node
->localAck
= newAck
;
581 CheckRecvBufForMessage(node
);
583 while (cc_cq_poll(contextBlock
->rnic
, node
->send_cq
, &wc
) == CC_OK
) {
584 MACHSTATE1(3, "getQPSendBuffer() - INFO: Send completed with node %d... now waiting for acknowledge...", node
->myNode
);
587 LIST_DEQUEUE(node
->,sendTokens
,token
);
592 int getQPSendBuffer(OtherNode node, char force) {
597 AMMASSO_STATS_START(getQPSendBuffer)
599 MACHSTATE1(3, "getQPSendBuffer() - Ammasso - Called (send to node %d)...", node->myNode);
603 AMMASSO_STATS_START(getQPSendBuffer_loop)
607 AMMASSO_STATS_START(getQPSendBuffer_lock)
609 MACHSTATE(3, "getQPSendBuffer() - INFO: Pre-sendBufLock");
610 #if CMK_SHARED_VARS_UNAVAILABLE
611 while (node->sendBufLock != 0) { usleep(1); } // Since CmiLock() is not really a lock, actually wait
613 CmiLock(node->sendBufLock);
615 AMMASSO_STATS_END(getQPSendBuffer_lock)
617 // If force is set, let the message use any of the available send buffers. Otherwise, there can only
618 // be AMMASSO_NUMMSGBUFS_PER_QP message outstanding (haven't gotten an ACK for) so wait for that.
619 // VERY IMPORTANT !!! Only use force for sending ACKs !!!!!!!! If this is done, it is ensured that there
620 // will always be at least one buffer available when the force code is executed
621 if (node->connectionState == QP_CONN_STATE_CONNECTED) {
623 rtnBufIndex = node->send_UseIndex;
624 node->send_UseIndex++;
625 if (node->send_UseIndex >= AMMASSO_NUMMSGBUFS_PER_QP * 2)
626 node->send_UseIndex = 0;
628 if (node->send_InUseCounter < AMMASSO_NUMMSGBUFS_PER_QP) {
629 rtnBufIndex = node->send_UseIndex;
630 node->send_InUseCounter++;
631 node->send_UseIndex++;
632 if (node->send_UseIndex >= AMMASSO_NUMMSGBUFS_PER_QP * 2)
633 node->send_UseIndex = 0;
638 CmiUnlock(node->sendBufLock);
639 MACHSTATE3(3, "getQPSendBuffer() - INFO: Post-sendBufLock - rtnBufIndex = %d, node->connectionState = %d, node->send_UseIndex = %d", rtnBufIndex, node->connectionState, node->send_UseIndex);
641 if (rtnBufIndex >= 0) {
643 AMMASSO_STATS_END(getQPSendBuffer_loop)
650 AMMASSO_STATS_START(getQPSendBuffer_CEH)
652 CheckRecvBufForMessage(node);
653 //CompletionEventHandlerWithAckFlag(contextBlock->rnic, node->recv_cq, node, 1);
655 //CompletionEventHandler(contextBlock->rnic, node->send_cq, node);
656 while (cc_cq_poll(contextBlock->rnic, node->send_cq, &wc) == CC_OK) {
657 MACHSTATE1(3, "getQPSendBuffer() - INFO: Send completed with node %d... now waiting for acknowledge...", nextNode);
661 AMMASSO_STATS_END(getQPSendBuffer_CEH)
663 AMMASSO_STATS_END(getQPSendBuffer_loop)
667 //// Increment the send_UseIndex counter so another buffer will be used next time
668 //node->send_UseIndex++;
669 //if (node->send_UseIndex >= AMMASSO_NUMMSGBUFS_PER_QP * 2)
670 // node->send_UseIndex = 0;
672 MACHSTATE1(3, "getQPSendBuffer() - Ammasso - Finished (returning buffer index: %d)", rtnBufIndex);
674 AMMASSO_STATS_END(getQPSendBuffer)
681 // NOTE: The force parameter can be thought of as an "is ACK" control message flag (see comments in getQPSendBuffer())
682 int sendDataOnQP(char* data
, int len
, OtherNode node
, char flags
) {
684 AmmassoToken
*sendBufToken
;
685 AmmassoBuffer
*tokenBuf
;
686 cc_data_addr_t
*tokenSgl
;
691 char *origMsgStart
= data
;
696 if (origSize
<= 1024) {
697 AMMASSO_STATS_START(sendDataOnQP_1024
)
698 } else if (origSize
<= 2048) {
699 AMMASSO_STATS_START(sendDataOnQP_2048
)
700 } else if (origSize
<= 4096) {
701 AMMASSO_STATS_START(sendDataOnQP_4096
)
702 } else if (origSize
<= 16384) {
703 AMMASSO_STATS_START(sendDataOnQP_16384
)
705 AMMASSO_STATS_START(sendDataOnQP_over
)
708 AMMASSO_STATS_START(sendDataOnQP
)
710 //CompletionEventHandler(contextBlock->rnic, node->recv_cq, node);
711 //CompletionEventHandler(contextBlock->rnic, node->send_cq, node);
712 while (cc_cq_poll(contextBlock
->rnic
, node
->send_cq
, &wc
) == CC_OK
) {
713 MACHSTATE1(3, "sendDataOnQP() - INFO: Send completed with node %d... now waiting for acknowledge...", node
->myNode
);
717 MACHSTATE2(2, "sendDataOnQP() - Ammasso - INFO: Called (send to node %d, len = %d)...", node
->myNode
, len
);
719 // Assert that control messages will not be fragmented
720 CmiAssert(flags
==0 || len
<=AMMASSO_BUFSIZE
);
722 // DMK : For each message that is fragmented, attach another DGRAM header to
723 // it, (keeping in mind that the control messages are no where near large
724 // enough for this to occur).
728 AMMASSO_STATS_START(sendDataOnQP_pre_send
)
730 // Get a free send buffer (NOTE: This call will block until a send buffer is free)
731 sendBufToken
= getQPSendToken(node
);
732 tokenBuf
= sendBufToken
->localBuf
;
733 // Enqueue the token to the used queue immediately, so it is safe to be
734 // interrupted by other calls
735 LIST_ENQUEUE(node
->,usedTokens
,sendBufToken
);
736 node
->max_used_tokens
= (node
->num_usedTokens
>node
->max_used_tokens
)?node
->num_usedTokens
:node
->max_used_tokens
;
738 // Copy the contents (up to AMMASSO_BUFSIZE worth) of data into the send buffer
740 // The toSendLength includes the DGRAM header size. If the chunk sent is not
741 // the first, the initial DGRAM_HEADER_SIZE bytes need to be contructed from
742 // the DGRAM header of the original message (instead of just being copied
743 // together with the message itself)
747 toSendLength
= len
> AMMASSO_BUFSIZE
? AMMASSO_BUFSIZE
: len
; // MIN of len and AMMASSO_BUFSIZE
748 sendBufBegin
= tokenBuf
->buf
+ AMMASSO_BUFSIZE
- ALIGN8(toSendLength
);
750 memcpy(sendBufBegin
, data
, toSendLength
);
752 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: Sending 1st Fragment - toSendLength = %d...", toSendLength
);
756 toSendLength
= len
> (AMMASSO_BUFSIZE
- DGRAM_HEADER_SIZE
) ? AMMASSO_BUFSIZE
: (len
+DGRAM_HEADER_SIZE
); // MIN of len and AMMASSO_BUFSIZE
757 sendBufBegin
= tokenBuf
->buf
+ AMMASSO_BUFSIZE
- ALIGN8(toSendLength
);
759 memcpy(sendBufBegin
+DGRAM_HEADER_SIZE
, data
, toSendLength
-DGRAM_HEADER_SIZE
);
761 // This dgram header is the same of the original message, except for the
762 // sequence number, so copy the original and just modify the sequence
765 // NOTE: If the message is large enough that fragmentation needs to
766 // happen, the send_next_lock is already owned by the thread executing
768 memcpy(sendBufBegin
, origMsgStart
, DGRAM_HEADER_SIZE
);
770 ((DgramHeader
*)sendBufBegin
)->seqno
= node
->send_next
;
771 node
->send_next
= ((node
->send_next
+1) & DGRAM_SEQNO_MASK
); // Increase the sequence number
773 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: Sending Continuation Fragment - toSendLength = %d...", toSendLength
);
776 // Write the size of the message at the end of the buffer, with the ack and
778 tokenBuf
->tail
.length
= toSendLength
;
779 node
->messagesNotYetAcknowledged
= 0;
780 tokenBuf
->tail
.ack
= *node
->remoteAck
;
781 if (*node
->remoteAck
> ACK_MASK
) {
782 // Rare case of ACK wrapping
783 tokenBuf
->tail
.ack
= 0;
784 // in the rare case that we didn't send an ack with this message because
785 // the ack just wrapped around (rare case), send a full ACK message. It is
786 // safe to send it now, since the queues are consistent between sender and
787 // receiver. The fact that this ack is effectively sent before the regular
788 // message is not important, since on the other side it will be discovered
789 // only after this one is received
792 tokenBuf
->tail
.flags
= flags
;
794 // Setup the local SGL
795 tokenSgl
= sendBufToken
->wr
.wr_u
.rdma_write
.local_sgl
.sge_list
;
796 tokenSgl
->length
= ALIGN8(toSendLength
) + sizeof(Tailer
);
797 tokenSgl
->to
= (unsigned long)sendBufBegin
;
798 sendBufToken
->wr
.wr_u
.rdma_write
.remote_to
= (unsigned long)(((char*)sendBufToken
->remoteBuf
)+AMMASSO_BUFSIZE
-ALIGN8(toSendLength
));
800 // The remote_to and remote_stag are already fixed part of the token
802 AMMASSO_STATS_END(sendDataOnQP_pre_send
)
803 AMMASSO_STATS_START(sendDataOnQP_send
)
805 MACHSTATE(3, "sendDataOnQP() - Ammasso - INFO: Enqueuing RDMA Write WR...");
807 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: tokenSgl->to = %p", tokenSgl
->to
);
808 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: sendBufToken->wr.wr_u.rdma_write.remote_to = %p", sendBufToken
->wr
.wr_u
.rdma_write
.remote_to
);
809 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: tail.ack = %d", tokenBuf
->tail
.ack
);
810 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: tail.flags = %d", tokenBuf
->tail
.flags
);
812 CC_POST_CHECK(cc_qp_post_sq
,(contextBlock
->rnic
, node
->qp
, &sendBufToken
->wr
, 1, &nWR
),node
->myNode
);
814 MACHSTATE(1, "sendDataOnQP() - Ammasso - INFO: RDMA Write WR Enqueue Completed");
816 AMMASSO_STATS_END(sendDataOnQP_send
)
817 AMMASSO_STATS_START(sendDataOnQP_post_send
)
819 // Update the data and len variables for the next while (if fragmenting is needed)
820 data
+= toSendLength
;
823 data
-= DGRAM_HEADER_SIZE
;
824 len
+= DGRAM_HEADER_SIZE
;
828 AMMASSO_STATS_END(sendDataOnQP_post_send
)
831 AMMASSO_STATS_END(sendDataOnQP
)
833 if (origSize
<= 1024) {
834 AMMASSO_STATS_END(sendDataOnQP_1024
)
835 } else if (origSize
<= 2048) {
836 AMMASSO_STATS_END(sendDataOnQP_2048
)
837 } else if (origSize
<= 4096) {
838 AMMASSO_STATS_END(sendDataOnQP_4096
)
839 } else if (origSize
<= 16384) {
840 AMMASSO_STATS_END(sendDataOnQP_16384
)
842 AMMASSO_STATS_END(sendDataOnQP_over
)
848 /* DeliverViaNetwork()
851 void DeliverViaNetwork(OutgoingMsg msg
, OtherNode otherNode
, int rank
, unsigned int broot
, int copy
) {
854 cc_stag_index_t stag
;
857 cc_uint32_t WRsPosted
;
859 AMMASSO_STATS_START(DeliverViaNetwork
)
861 MACHSTATE(2, "DeliverViaNetwork() - Ammasso - INFO: Called...");
863 // We don't need to do this since the message data is being copied into the
864 // send_buf, the OutgoingMsg can be free'd ASAP
866 // The lock will be already held by the calling function in machine.c
869 AMMASSO_STATS_START(DeliverViaNetwork_post_lock
)
871 DgramHeaderMake(msg
->data
, rank
, msg
->src
, Cmi_charmrun_pid
, otherNode
->send_next
, broot
); // Set DGram Header Fields In-Place
872 otherNode
->send_next
= ((otherNode
->send_next
+1) & DGRAM_SEQNO_MASK
); // Increase the sequence number
874 MACHSTATE1(1, "DeliverViaNetwork() - INFO: Sending message to node %d", otherNode
->myNode
);
875 MACHSTATE1(1, "DeliverViaNetwork() - INFO: rank %d", rank
);
876 MACHSTATE1(1, "DeliverViaNetwork() - INFO: broot %d", broot
);
878 AMMASSO_STATS_START(DeliverViaNetwork_send
)
880 sendDataOnQP(msg
->data
, msg
->size
, otherNode
, 0);
882 AMMASSO_STATS_END(DeliverViaNetwork_send
)
884 //CmiUnlock(otherNode->send_next_lock);
885 MACHSTATE(1, "DeliverViaNetwork() - INFO: Post-send_next_lock");
888 // DMK : NOTE : I left this in as an example of how to retister the memory with the RNIC on the fly. Since the ccil
889 // library has a bug which causes it not to de-pin memory that we unregister, it will probably be a better
890 // idea to fragment a message that is too large for a single buffer from the buffer pool.
891 /***************************************************************
892 // DMK : TODO : This is an important optimization area. This is registering the memory where the outgoing message
893 // is located with the RNIC. One balancing act that we will need to do is the cost of copying messages
894 // into memory regions already allocated VS. the cost of registering the memory. The cost of registering
895 // memory might be constant as the memory doesn't have to traversed. If the cost of doing a memcpy on a
896 // small message, since memcpy traverses the memory range, is less than the cost of registering the
897 // memory with the RNIC, it would be better to copy the message into a pre-registered memory location.
899 // Start by registering the memory of the outgoing message with the RNIC
900 rtn = cc_nsmr_register_virt(contextBlock->rnic,
901 CC_ADDR_TYPE_VA_BASED,
902 msg->data + DGRAM_HEADER_SIZE,
903 msg->size - DGRAM_HEADER_SIZE,
906 CC_ACF_LOCAL_READ | CC_ACF_LOCAL_WRITE,
910 // Let the user know what happened
911 MACHSTATE2(3, "DeliverViaNetwork() - Ammasso - ERROR - Unable to register OutgoingMsg memory with RNIC: %d, \"%s\"", rtn, cc_status_to_string(rtn));
915 // Setup the Scatter/Gather List
917 sgl.to = (cc_uint64_t)(unsigned long)(msg->data + DGRAM_HEADER_SIZE);
918 sgl.length = msg->size - DGRAM_HEADER_SIZE;
920 // Create the Work Request
921 wr.wr_id = (cc_uint64_t)(unsigned long)&(wr);
922 wr.wr_type = CC_WR_TYPE_SEND;
923 wr.wr_u.send.local_sgl.sge_count = 1;
924 wr.wr_u.send.local_sgl.sge_list = &sgl;
927 // Post the Work Request to the Send Queue
928 // DMK : TODO : Update this code to handle CCERR_QP_TOO_MANY_WRS_POSTED errors (pause briefly and retry)
929 rtn = cc_qp_post_sq(contextBlock->rnic, otherNode->qp, &(wr), 1, &(WRsPosted));
930 if (rtn != CC_OK || WRsPosted != 1) {
931 // Let the user know what happened
932 MACHSTATE2(3, "DeliverViaNetwork() - Ammasso - ERROR - Unable post Work Request to Send Queue: %d, \"%s\"", rtn, cc_status_to_string(rtn));
933 displayQueueQuery(otherNode->qp, &(otherNode->qp_attrs));
935 ***************************************************************/
937 MACHSTATE(2, "DeliverViaNetwork() - Ammasso - INFO: Completed.");
939 AMMASSO_STATS_END(DeliverViaNetwork_post_lock
)
940 AMMASSO_STATS_END(DeliverViaNetwork
)
944 /****************************************************************************
948 * Checks the charmrun control socket (and the stdout pipes) for readability.
949 * Everything is checked at the same time since this can be done for free
950 * with a single CMK_PIPE_CALL (select/poll-style). The result is stored in global
951 * variables (ctrlskt_ready_read; the dataskt_* flags are always cleared here).
953 ***************************************************************************/
955 int CheckSocketsReady(int withDelayMs
)
958 CMK_PIPE_DECL(withDelayMs
);
960 CmiStdoutAdd(CMK_PIPE_SUB
);
961 if (Cmi_charmrun_fd
!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd
);
963 nreadable
=CMK_PIPE_CALL();
964 ctrlskt_ready_read
= 0;
965 dataskt_ready_read
= 0;
966 dataskt_ready_write
= 0;
968 if (nreadable
== 0) {
969 MACHSTATE(2, "} CheckSocketsReady (nothing readable)");
974 MACHSTATE(3, "} CheckSocketsReady (INTERRUPTED!)");
975 return CheckSocketsReady(0);
977 CmiStdoutCheck(CMK_PIPE_SUB
);
978 if (Cmi_charmrun_fd
!=-1)
979 ctrlskt_ready_read
= CMK_PIPE_CHECKREAD(Cmi_charmrun_fd
);
980 MACHSTATE(3, "} CheckSocketsReady");
984 /***********************************************************************
985 * CommunicationServer()
987 * This function does the scheduling of the tasks related to
988 * message sends and receives.
989 * It first checks the charmrun port for messages, and then polls each
990 * remote node's receive buffers for incoming messages.
992 ***********************************************************************/
994 // NOTE: Always called from interrupt
995 static void ServiceCharmrun_nolock(void) {
999 MACHSTATE(2, "ServiceCharmrun_nolock begin {");
1004 CheckSocketsReady(0);
1005 if (ctrlskt_ready_read
) { ctrl_getone(); again
=1; }
1006 if (CmiStdoutNeedsService()) { CmiStdoutService(); }
1009 MACHSTATE(2, "} ServiceCharmrun_nolock end");
1012 void processAmmassoControlMessage(char* msg
, int len
, Tailer
*tail
, OtherNode from
) {
1014 int nodeIndex
, ctrlType
, i
, n
, nWR
;
1015 AmmassoToken
*token
, *pretoken
;
1016 AmmassoBuffer
*tokenBuf
;
1017 cc_data_addr_t
*tokenSgl
;
1019 AmmassoTokenDescription
*tokenDesc
;
1021 AMMASSO_STATS_START(processAmmassoControlMessage
)
1023 // Do not check the message, the flags field was set, and this is enough
1025 // Perform an action based on the control message type
1026 switch (tail
->flags
) {
1030 // Decrement the node ready count by one
1031 contextBlock
->nodeReadyCount
--;
1032 MACHSTATE1(3, "processAmmassoControlMessage() - Ammasso - INFO: Received READY packet... still waiting for %d more...", contextBlock
->nodeReadyCount
);
1036 case AMMASSO_ALLOCATE
: // Sent by the receiver to allocate more tokens
1038 token
= getQPSendToken(from
);
1039 tokenBuf
= token
->localBuf
;
1041 tokenBuf
->tail
.length
= 1;
1042 tokenBuf
->tail
.ack
= 0; // do not send any ACK with this message
1043 tokenBuf
->tail
.flags
= AMMASSO_ALLOCATED
;
1045 // Setup the local SGL
1046 tokenSgl
= token
->wr
.wr_u
.rdma_write
.local_sgl
.sge_list
;
1047 tokenSgl
->length
= sizeof(Tailer
);
1048 tokenSgl
->to
= (unsigned long)&tokenBuf
->tail
;
1049 token
->wr
.wr_u
.rdma_write
.remote_to
= (unsigned long)&token
->remoteBuf
->tail
;
1051 CC_POST_CHECK(cc_qp_post_sq
,(contextBlock
->rnic
, node
->qp
, &token
->wr
, 1, &nWR
),node
->myNode
);
1053 LIST_ENQUEUE(from
->,usedTokens
,token
);
1054 from
->max_used_tokens
= (from
->num_usedTokens
>from
->max_used_tokens
)?from
->num_usedTokens
:from
->max_used_tokens
;
1055 // add the new tokens at the end of usedTokens (right after the one with
1056 // which we are sending back the confirmation)
1058 if (contextBlock
->num_freeTokens
< n
) {
1059 int quantity
= (n
- contextBlock
->num_freeTokens
+ 1023) & (~1023);
1060 BufferAlloc(quantity
);
1062 token
= contextBlock
->freeTokens
;
1063 tokenDesc
= (AmmassoTokenDescription
*)(msg
+sizeof(int));
1064 for (i
=0; i
<n
; ++i
) {
1065 token
->remoteBuf
= (AmmassoBuffer
*)tokenDesc
[i
].to
;
1066 token
->wr
.wr_u
.rdma_write
.remote_stag
= tokenDesc
[i
].stag
;
1068 token
= token
->next
;
1070 from
->last_usedTokens
->next
= contextBlock
->freeTokens
;
1071 from
->last_usedTokens
= pretoken
;
1072 pretoken
->next
= NULL
;
1073 contextBlock
->freeTokens
= token
;
1074 from
->num_usedTokens
+= n
;
1075 contextBlock
->num_freeTokens
-= n
;
1079 case AMMASSO_ALLOCATED
: // Sent by the sender to conferm the token allocation
1081 // link the pending buffers at the end of those allocated (right after the
1082 // one with which this message came)
1083 from
->last_recv_buf
->next
= from
->pending
;
1084 from
->last_recv_buf
= from
->last_pending
;
1085 from
->last_pending
= NULL
;
1086 from
->num_recv_buf
+= from
->num_pending
;
1087 *from
->remoteAck
+= from
->num_pending
;
1088 from
->num_pending
= 0;
1092 case AMMASSO_MOREBUFFERS
: // Sent by the sender to ask for more tokens
1094 /* DYNAMIC ALLOCATOR: Grant what requested */
1096 GrantTokens(from
, n
);
1097 /* DYNAMIC ALLOCATOR END */
1101 case AMMASSO_RELEASE
: // Sent by the receiver to request back tokens
1103 /* DYNAMIC ALLOCATOR: Ignore the request of the receiver to return buffers */
1104 /* DYNAMIC ALLOCATOR END */
1108 case AMMASSO_RELEASED
: // Sent by the sender to release tokens
1110 // This secondLastRecvBuf is set in CheckRecvBufForMessage
1111 from
->secondLastRecvBuf
->next
= NULL
;
1112 tokenBuf
= from
->last_recv_buf
;
1113 from
->last_recv_buf
= from
->secondLastRecvBuf
;
1114 from
->num_recv_buf
--;
1115 LIST_ENQUEUE(contextBlock
->,freeRecvBuffers
,tokenBuf
);
1117 for (i
=1; i
<n
; ++i
) {
1118 LIST_DEQUEUE(from
->,recv_buf
,tokenBuf
);
1119 LIST_ENQUEUE(contextBlock
->,freeRecvBuffers
,tokenBuf
);
1126 // The ACK has already been accounted in ProcessMessage, just mask it
1127 from
->localAck
&= ACK_MASK
;
1132 MACHSTATE1(5, "processAmmassoControlMessage() - Ammasso -INFO: Received control message with invalid flags: %d", tail
->flags
);
1133 CmiAbort("Invalid control message received");
1136 AMMASSO_STATS_END(processAmmassoControlMessage
)
1139 int ProcessMessage(char* msg
, int len
, Tailer
*tail
, OtherNode from
) {
1141 int rank
, srcPE
, seqNum
, magicCookie
, size
, i
;
1143 unsigned char checksum
;
1148 AMMASSO_STATS_START(ProcessMessage
)
1150 MACHSTATE(2, "ProcessMessage() - INFO: Called...");
1151 MACHSTATE2(1, "ProcessMessage() - INFO: tail - ack=%d, flags=%d", tail
->ack
, tail
->flags
);
1153 // Decide whether a direct ACK will be needed or not, based on how many
1154 // messages the other side has sent us, and we haven't acknowledged yet
1155 if (2*from
->messagesNotYetAcknowledged
> from
->num_recv_buf
) {
1161 // This message contains an ACK, as all messages do; parse it. Do not worry about
1162 // wrap around of the ACK, since when this happens a special control message
1163 // ACK_WRAPPING is received
1164 if (tail
->ack
> from
->localAck
) {
1165 AmmassoToken
*token
;
1166 for (i
=from
->localAck
; i
<tail
->ack
; ++i
) {
1167 LIST_DEQUEUE(from
->,usedTokens
,token
);
1168 LIST_ENQUEUE(from
->,sendTokens
,token
);
1170 from
->localAck
= tail
->ack
;
1174 MACHSTATE1(1, "ProcessMessage() - INFO: msg = %p", msg
);
1176 for (j
= 0; j
< DGRAM_HEADER_SIZE
+ 24; j
++) {
1177 MACHSTATE2(1, "ProcessMessage() - INFO: msg[%d] = %02x", j
, msg
[j
]);
1181 if (tail
->flags
!= 0) {
1182 processAmmassoControlMessage(msg
, len
, tail
, from
);
1186 // Get the header fields of the message
1187 DgramHeaderBreak(msg
, rank
, srcPE
, magicCookie
, seqNum
, broot
);
1189 MACHSTATE(1, "ProcessMessage() - INFO: Message Contents:");
1190 MACHSTATE1(1, " rank = %d", rank
);
1191 MACHSTATE1(1, " srcPE = %d", srcPE
);
1192 MACHSTATE1(1, " magicCookie = %d", magicCookie
);
1193 MACHSTATE1(1, " seqNum = %d", seqNum
);
1194 MACHSTATE1(1, " broot = %d", broot
);
1196 #ifdef CMK_USE_CHECKSUM
1198 // Check the checksum
1199 checksum
= computeCheckSum(msg
, len
);
1200 if (checksum
!= 0) {
1201 MACHSTATE1(5, "ProcessMessage() - Ammasso - ERROR: Received message with bad checksum (%d)... ignoring...", checksum
);
1202 CmiPrintf("[%d] ProcessMessage() - Ammasso - ERROR: Received message with bad checksum (%d)... ignoring...\n", CmiMyPe(), checksum
);
1208 // Check the magic cookie for correctness
1209 if (magicCookie
!= (Cmi_charmrun_pid
& DGRAM_MAGIC_MASK
)) {
1210 MACHSTATE(5, "ProcessMessage() - Ammasso - ERROR: Received message with a bad magic cookie... ignoring...");
1211 CmiPrintf("[%d] ProcessMessage() - Ammasso - ERROR: Received message with a bad magic cookie... ignoring...\n", CmiMyPe());
1214 CmiPrintf("ProcessMessage() - INFO: Cmi_charmrun_pid = %d\n", Cmi_charmrun_pid
);
1215 CmiPrintf("ProcessMessage() - INFO: node->recv_UseIndex = %d\n", nodes_by_pe
[srcPE
]->recv_UseIndex
);
1216 CmiPrintf("ProcessMessage() - INFO: msg = %p\n", msg
);
1218 for (j
= 0; j
< DGRAM_HEADER_SIZE
+ 24; j
++) {
1219 CmiPrintf("ProcessMessage() - INFO: msg[%d] = %02x\n", j
, msg
[j
]);
1227 // Get the OtherNode structure for the node this message was sent from
1228 fromNode
= nodes_by_pe
[srcPE
];
1230 CmiAssert(fromNode
== from
);
1232 MACHSTATE1(1, "ProcessMessage() - INFO: Message from node %d...", fromNode
->myNode
);
1234 //MACHSTATE(3, "ProcessMessage() - INFO: Pre-recv_expect_lock");
1235 //#if CMK_SHARED_VARS_UNAVAILABLE
1236 // while (fromNode->recv_expect_lock != 0) { usleep(1); } // Since CmiLock() is not really a lock, actually wait
1238 //CmiLock(fromNode->recv_expect_lock);
1240 // Check the sequence number of the message
1241 if (seqNum
== (fromNode
->recv_expect
)) {
1242 // The expected sequence number was received so setup the next one
1243 fromNode
->recv_expect
= ((seqNum
+1) & DGRAM_SEQNO_MASK
);
1245 MACHSTATE(5, "ProcessMessage() - Ammasso - ERROR: Received a message with a bad sequence number... ignoring...");
1246 CmiPrintf("[%d] ProcessMessage() - Ammasso - ERROR: Received a message witha bad sequence number... ignoring...\n", CmiMyPe());
1250 //CmiUnlock(fromNode->recv_expect_lock);
1251 //MACHSTATE(3, "ProcessMessage() - INFO: Post-recv_expect_lock");
1253 newMsg
= fromNode
->asm_msg
;
1255 // Check to see if this is the start of the message (i.e. - if the message was
1256 // fragmented, if this is the first packet of the message) or the entire
1257 // message. Only the first packet's header information will be copied into the
1259 if (newMsg
== NULL
) {
1261 // Allocate memory to hold the new message
1262 size
= CmiMsgHeaderGetLength(msg
);
1263 newMsg
= (char*)CmiAlloc(size
);
1266 // Verify the message size
1268 MACHSTATE2(5, "ProcessMessage() - Ammasso - ERROR: Message size mismatch (size: %d != len: %d)", size
, len
);
1269 CmiPrintf("[%d] ProcessMessage() - Ammasso - ERROR: Message size mismatch (size: %d != len: %d)\n", CmiMyPe(), size
, len
);
1270 CmiAbort("Message Size Mismatch");
1273 // Copy the message into the memory location and setup the fromNode structure accordingly
1274 //memcpy(newMsg, msg + DGRAM_HEADER_SIZE, size);
1275 memcpy(newMsg
, msg
, len
);
1276 fromNode
->asm_rank
= rank
;
1277 fromNode
->asm_total
= size
;
1278 fromNode
->asm_fill
= len
;
1279 fromNode
->asm_msg
= newMsg
;
1281 // Otherwise, this packet is a continuation of the overall message so append it to the last packet
1284 size
= len
- DGRAM_HEADER_SIZE
;
1286 // Make sure there is enough room in the asm_msg buffer (this should always be true because of the alloc in the true
1287 // portion of this if statement).
1288 if (fromNode
->asm_fill
+ size
> fromNode
->asm_total
) {
1289 MACHSTATE(5, "ProcessMessage() - Ammasso - ERROR: Message size mismatch");
1290 CmiPrintf("[%d] ProcessMessage() - Ammasso - ERROR: Message size mismatch", CmiMyPe());
1291 CmiAbort("Message Size Mismatch");
1294 // Copy the message into the asm_msg buffer
1295 memcpy(newMsg
+ fromNode
->asm_fill
, msg
+ DGRAM_HEADER_SIZE
, size
);
1296 fromNode
->asm_fill
+= size
;
1299 MACHSTATE2(1, "ProcessMessage() - Ammasso - INFO: Message copied into asm_buf (asm_fill = %d, asm_total = %d)...", fromNode
->asm_fill
, fromNode
->asm_total
);
1301 // Check to see if the full message has been assembled (asm_fill has reached asm_total)
1302 if (fromNode
->asm_fill
== fromNode
->asm_total
) {
1304 MACHSTATE(1, "ProcessMessage() - Ammasso - INFO: Pushing message...");
1306 // React to the message based on its rank
1309 case DGRAM_BROADCAST
:
1311 MACHSTATE1(3, "ProcessMessage() - Ammasso - INFO: Broadcast - _Cmi_mynodesize = %d", _Cmi_mynodesize
);
1313 // Make a copy of the message for all the PEs on this node except for zero, zero gets the original
1314 for (i
= 1; i
< _Cmi_mynodesize
; i
++)
1315 CmiPushPE(i
, CopyMsg(newMsg
, fromNode
->asm_total
));
1316 CmiPushPE(0, newMsg
);
1319 #if CMK_NODE_QUEUE_AVAILABLE
1320 case DGRAM_NODEBROADCAST
:
1321 case DGRAM_NODEMESSAGE
:
1322 CmiPushNode(newMsg
);
1327 CmiPushPE(rank
, newMsg
);
1330 MACHSTATE(1, "ProcessMessage() - Ammasso - INFO: NULLing asm_msg...");
1332 // Clear the message buffer
1333 fromNode
->asm_msg
= NULL
;
1336 MACHSTATE(1, "ProcessMessage() - Ammasso - INFO: Checking for re-broadcast");
1338 // If this packet is part of a broadcast, pass it on to the next nodes
1339 #if CMK_BROADCAST_SPANNING_TREE
1341 if (rank
== DGRAM_BROADCAST
1342 #if CMK_NODE_QUEUE_AVAILABLE
1343 || rank
== DGRAM_NODEBROADCAST
1345 ) SendSpanningChildren(NULL
, 0, len
, msg
, broot
, rank
);
1347 #elif CMK_BROADCAST_HYPERCUBE
1349 if (rank
== DGRAM_BROADCAST
1350 #if CMK_NODE_QUEUE_AVAILABLE
1351 || rank
== DGRAM_NODEBROADCAST
1354 MACHSTATE(3, "ProcessMessage() - INFO: Calling SendHypercube()...");
1355 SendHypercube(NULL
, 0, len
, msg
, broot
, rank
);
1359 AMMASSO_STATS_END(ProcessMessage
)
1363 // DMK : NOTE : I attempted to put this in a single function to be called from many places, but it was getting
1364 // way too messy... I'll leave the code here for now in hopes of being able to do this in the future.
1365 /*************************************************
1366 // NOTE: Return non-zero if a message was received, zero otherwise
1367 int PollForMessage(cc_cq_handle_t cq) {
1372 cc_qp_query_attrs_t qpAttr;
1374 // Poll the Completion Queue for Work Completions
1375 rtn = cc_cq_poll(contextBlock->rnic, cq, &wc);
1377 // Just return 0 if there are no Work Completions
1378 if (rtn == CCERR_CQ_EMPTY) return 0;
1380 // Let the user know if there was an error
1382 MACHSTATE2(3, "PollForMessage() - Ammasso - ERROR: Unable to poll Completion Queue: %d, \"%s\"", rtn, cc_status_to_string(rtn));
1385 // Let the user know if there was an error
1386 if (wc.status != CC_OK) {
1387 MACHSTATE2(3, "PollForMessage() - Ammasso - ERROR: Work Completion Status: %d, \"%s\"", wc.status, cc_status_to_string(wc.status));
1390 // Depending on the WC.type, react accordingly
1391 switch (wc.wr_type) {
1393 // wc.wr_type == CC_WR_TYPE_RDMA_READ
1394 case CC_WR_TYPE_RDMA_READ:
1396 // DMK : TODO : Fill this in
1400 // wc.wr_type == CC_WR_TYPE_RDMA_WRITE
1401 case CC_WR_TYPE_RDMA_WRITE:
1403 // DMK : TODO : Fill this in
1407 // wc.wr_type == CC_WR_TYPE_SEND (Message was sent)
1408 case CC_WR_TYPE_SEND:
1410 // This will be called as a result of a cc_qp_post_sq()'s Send Work Request in DeliverViaNetwork finishing
1411 // Free the STag that was created
1412 rtn = cc_stag_dealloc(contextBlock->rnic, wc.stag);
1416 // wc.wr_type == CC_WR_TYPE_RECV (Message was received)
1417 case CC_WR_TYPE_RECV:
1419 // Check for CCERR_FLUSHED which means "we're shutting down" according to the example code
1420 if (wc.status == CCERR_FLUSHED)
1423 //// Make sure that node is defined
1424 //if (node == NULL) {
1425 // MACHSTATE(3, "PollForMessage() - WARNING: Received message but node == NULL... ignoring...");
1429 // HORRIBLE HACK!!! -- In the OtherNode structure, a myNode was placed directly after the rq_wr structure. This
1430 // was done so this function could tell from which node the received message came.
1431 fromNode = *((int*)((char*)wc.wr_id + sizeof(cc_rq_wr_t)));
1432 //fromNode = node->myNode;
1436 MACHSTATE1(3, "PollForMessage() - INFO: fromNode = %d", fromNode);
1437 MACHSTATE1(3, "PollForMessage() - INFO: wc.status = %d", wc.status);
1438 MACHSTATE1(3, "PollForMessage() - INFO: wc.bytes_rcvd = %d", wc.bytes_rcvd);
1439 MACHSTATE1(3, "PollForMessage() - INFO: wc.wr_id = %d", wc.wr_id);
1440 //MACHSTATE1(3, "PollForMessage() - INFO: &(nodes[0].rq_wr) = %p", &(nodes[0].rq_wr));
1441 //MACHSTATE1(3, "PollForMessage() - INFO: &(nodes[1].rq_wr) = %p", &(nodes[1].rq_wr));
1442 //MACHSTATE(3, "PollForMessage() - INFO: Raw Message Data:");
1443 //for (j = 0; j < wc.bytes_rcvd; j++) {
1444 // MACHSTATE1(3, " [%02x]", nodes[fromNode].recv_buf[j]);
1448 // Process the Message
1449 ProcessMessage(nodes[fromNode].recv_buf, wc.bytes_rcvd);
1451 // Re-Post the receive buffer
1453 rtn = cc_qp_post_rq(contextBlock->rnic, nodes[fromNode].qp, &(nodes[fromNode].rq_wr), 1, &tmp);
1455 // Let the user know what happened
1456 MACHSTATE2(3, "PollForMessage() - Ammasso - ERROR: Unable to Post Work Request to Queue Pair: %d, \"%s\"", rtn, cc_status_to_string(rtn));
1463 MACHSTATE1(3, "PollForMessage() - Ammasso - WARNING - Unknown WC.wr_type: %d", wc.wr_type);
1466 } // end switch (wc.wr_type)
1470 ****************************************************/
1473 static void CommunicationServer_nolock(int withDelayMs
) {
1477 MACHSTATE(2, "CommunicationServer_nolock start {");
1479 // DMK : TODO : In spare time (here), check for messages and/or completions and/or errors
1480 //while(PollForMessage(contextBlock->send_cq)); // Clear all the completed sends
1481 //while(PollForMessage(contextBlock->recv_cq)); // Keep looping while there are still messages that have been received
1483 for (i
= 0; i
< contextBlock
->numNodes
; i
++) {
1484 if (i
== contextBlock
->myNode
) continue;
1485 //CompletionEventHandlerWithAckFlag(contextBlock->rnic, nodes[i].recv_cq, &(nodes[i]), 1);
1486 //CompletionEventHandler(contextBlock->rnic, nodes[i].send_cq, &(nodes[i]));
1487 CheckRecvBufForMessage(&(nodes
[i
]));
1490 MACHSTATE(2, "}CommunicationServer_nolock end");
1494 /* CommunicationServer()
1496 * 0: from smp thread
1498 * 2: from worker thread
1500 static void CommunicationServer(int withDelayMs
, int where
) {
1502 //// Check to see if we are running in standalone mode... if so, do nothing
1503 //if (Cmi_charmrun_pid == 0)
1506 AMMASSO_STATS_START(CommunicationServer
)
1508 MACHSTATE2(2, "CommunicationServer(%d) from %d {",withDelayMs
, where
);
1510 // Check to see if this call is from an interrupt
1513 // Don't service charmrun if Converse is exiting; this fixed a hang bug
1514 if (!machine_initiated_shutdown
)
1515 ServiceCharmrun_nolock();
1517 // Don't process any further for interrupts
1522 inProgress
[CmiMyRank()] += 1;
1523 CommunicationServer_nolock(withDelayMs
);
1525 inProgress
[CmiMyRank()] -= 1;
1527 #if CMK_IMMEDIATE_MSG
1529 CmiHandleImmediate();
1532 MACHSTATE(2,"} CommunicationServer");
1534 AMMASSO_STATS_END(CommunicationServer
)
1542 void CmiMachineExit(void) {
1548 MACHSTATE(2, "CmiMachineExit() - INFO: Called...");
1550 // DMK - This is a sleep to help keep the output from the stat displays below separated in the program output
1551 if (contextBlock
->myNode
)
1552 usleep(10000*contextBlock
->myNode
);
1554 AMMASSO_STATS_DISPLAY(MachineInit
)
1556 AMMASSO_STATS_DISPLAY(AmmassoDoIdle
)
1558 AMMASSO_STATS_DISPLAY(DeliverViaNetwork
)
1559 AMMASSO_STATS_DISPLAY(DeliverViaNetwork_pre_lock
)
1560 AMMASSO_STATS_DISPLAY(DeliverViaNetwork_lock
)
1561 AMMASSO_STATS_DISPLAY(DeliverViaNetwork_post_lock
)
1562 AMMASSO_STATS_DISPLAY(DeliverViaNetwork_send
)
1564 AMMASSO_STATS_DISPLAY(getQPSendBuffer
)
1565 AMMASSO_STATS_DISPLAY(getQPSendBuffer_lock
)
1566 AMMASSO_STATS_DISPLAY(getQPSendBuffer_CEH
)
1567 AMMASSO_STATS_DISPLAY(getQPSendBuffer_loop
)
1569 AMMASSO_STATS_DISPLAY(sendDataOnQP
)
1570 AMMASSO_STATS_DISPLAY(sendDataOnQP_pre_send
)
1571 AMMASSO_STATS_DISPLAY(sendDataOnQP_send
)
1572 AMMASSO_STATS_DISPLAY(sendDataOnQP_post_send
)
1574 AMMASSO_STATS_DISPLAY(sendDataOnQP_1024
)
1575 AMMASSO_STATS_DISPLAY(sendDataOnQP_2048
)
1576 AMMASSO_STATS_DISPLAY(sendDataOnQP_4096
)
1577 AMMASSO_STATS_DISPLAY(sendDataOnQP_16384
)
1578 AMMASSO_STATS_DISPLAY(sendDataOnQP_over
)
1580 AMMASSO_STATS_DISPLAY(AsynchronousEventHandler
)
1581 AMMASSO_STATS_DISPLAY(CompletionEventHandler
)
1582 AMMASSO_STATS_DISPLAY(ProcessMessage
)
1583 AMMASSO_STATS_DISPLAY(processAmmassoControlMessage
)
1585 AMMASSO_STATS_DISPLAY(sendAck
)
1587 // TODO: It would probably be a good idea to make a "closing connection" control message so all of the nodes
1588 // can agree to close the connections in a graceful way when they are all done. Similar to the READY packet
1589 // but for closing so the closing is graceful (and the other node does not try to reconnect). This barrier
1592 // Check to see if in stand-alone mode
1593 if (Cmi_charmrun_pid
!= 0 && contextBlock
->rnic
!= -1) {
1595 // Do Clean-up for each of the OtherNode structures (the connections and related data)
1596 for (i
= 0; i
< contextBlock
->numNodes
; i
++) {
1598 // Close all the connections and destroy all the QPs (and related data structures)
1599 closeQPConnection(nodes
+ i
, 1); // The program is closing so destroy the QPs
1601 // Destroy the locks
1602 CmiDestroyLock(nodes
[i
].sendBufLock
);
1603 CmiDestroyLock(nodes
[i
].send_next_lock
);
1604 CmiDestroyLock(nodes
[i
].recv_expect_lock
);
1607 // DMK : TODO : Clean up the buffer pool here (unregister the memory from the RNIC and free it)
1609 //// Close the RNIC interface
1610 rtn
= cc_rnic_close(contextBlock
->rnic
);
1612 MACHSTATE2(5, "CmiMachineExit() - ERROR: Unable to close the RNIC: %d, \"%s\"", rtn
, cc_status_to_string(rtn
));
1613 sprintf(buf
, "CmiMachineExit() - ERROR: Unable to close the RNIC: %d, \"%s\"", rtn
, cc_status_to_string(rtn
));
1617 MACHSTATE(2, "CmiMachineExit() - INFO: RNIC Closed.");
1623 OtherNode
getNodeFromQPId(cc_qp_id_t qp_id
) {
1627 // DMK : FIXME : This is a horrible linear search through the nodes table to find the node structure that has this
1628 // qp_id, but currently it is the fastest way that I know how to do this... It should be sped up in the future.
1629 for (i
= 0; i
< contextBlock
->numNodes
; i
++)
1630 if (nodes
[i
].qp_id
== qp_id
)
1636 OtherNode
getNodeFromQPHandle(cc_qp_handle_t qp
) {
1640 // DMK : FIXME : This is a horrible linear search through the nodes table to find the node structure that has this
1641 // qp handle, but currently it is the fastest way that I know how to do this... It should be sped up in the future.
1642 for (i
= 0; i
< contextBlock
->numNodes
; i
++)
1643 if (nodes
[i
].qp
== qp
)
1650 void AsynchronousEventHandler(cc_rnic_handle_t rnic
, cc_event_record_t
*er
, void *cb
) {
1654 cc_ep_handle_t connReqEP
;
1657 cc_qp_modify_attrs_t modAttrs
;
1658 AmmassoPrivateData
*priv
;
1659 AmmassoToken
*token
;
1661 AMMASSO_STATS_START(AsynchronousEventHandler
)
1663 MACHSTATE2(2, "AsynchronousEventHandler() - INFO: Called... event_id = %d, \"%s\"", er
->event_id
, cc_event_id_to_string(er
->event_id
));
1665 // Do a couple of checks... the reasons for these stem from some example code
1666 if (er
->rnic_handle
!= contextBlock
->rnic
) {
1667 MACHSTATE(5, "AsynchronousEventHandler() - WARNING: er->rnic_handle != contextBlock->rnic");
1669 if (er
->rnic_user_context
!= contextBlock
) {
1670 MACHSTATE(5, "AsynchronousEventHandler() - WARNING: er->rnic_user_context != contextBlock");
1673 // Based on the er->event_id, do something about it
1674 switch (er
->event_id
) {
1676 // er->event_id == CCAE_LLP_CLOSE_COMPLETE
1677 case CCAE_LLP_CLOSE_COMPLETE
:
1678 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Connection Closed.");
1680 // Get the OtherNode structure for the other node
1681 MACHSTATE2(1, "AsynchronousEventHandler() - INFO: er->resource_indicator = %d (CC_RES_IND_QP: %d)", er
->resource_indicator
, CC_RES_IND_QP
);
1682 node
= getNodeFromQPId(er
->resource_id
.qp_id
);
1684 MACHSTATE(5, "AsynchronousEventHandler() - ERROR: Unable to find QP from QP ID... Unable to create/recover connection");
1688 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Pre-sendBufLock");
1689 #if CMK_SHARED_VARS_UNAVAILABLE
1690 while (node
->sendBufLock
!= 0) { usleep(1); } // Since CmiLock() is not really a lock, actually wait
1692 CmiLock(node
->sendBufLock
);
1694 // Set the state of the connection to closed
1695 node
->connectionState
= QP_CONN_STATE_CONNECTION_CLOSED
;
1697 CmiUnlock(node
->sendBufLock
);
1698 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Post-sendBufLock");
1702 // er->event_id == CCAE_CONNECTION_REQUEST
1703 case CCAE_CONNECTION_REQUEST
:
1705 MACHSTATE2(1, "AsynchronousEventHandler() - INFO: Incomming Connection Request -> %s:%d",
1706 inet_ntoa(*(struct in_addr
*) &(er
->event_data
.connection_request
.laddr
)),
1707 ntohs(er
->event_data
.connection_request
.lport
)
1710 connReqEP
= er
->event_data
.connection_request
.cr_handle
;
1712 priv
= (AmmassoPrivateData
*)er
->event_data
.connection_request
.private_data
;
1714 nodeNumber
= priv
->node
;
1716 MACHSTATE1(3, "AsynchronousEventHandler() - INFO: Connection Request from node %d", nodeNumber
);
1718 if (nodeNumber
< 0 || nodeNumber
>= contextBlock
->numNodes
) {
1720 // Refuse the connection and log the rejection
1721 MACHSTATE1(1, "AsynchronousEventHandler() - WARNING: Unknown entity attempting to connect (node %d)... rejecting connection.", nodeNumber
);
1722 cc_cr_reject(contextBlock
->rnic
, connReqEP
);
1728 for (j
= 0; j
< 16; j
++) {
1729 MACHSTATE2(3, " private_data[%d] = %02X", j
, ((char*)er
->event_data
.connection_request
.private_data
)[j
]);
1733 // Grab the remote stag and to, by the protocol, all the tokens are
1734 // consecutive in memory
1735 for (i
=0; i
<(AMMASSO_INITIAL_BUFFERS
/(contextBlock
->numNodes
-1)); ++i
) {
1736 LIST_DEQUEUE(contextBlock
->,freeTokens
,token
);
1737 token
->wr
.wr_u
.rdma_write
.remote_stag
= priv
->stag
;
1738 token
->wr
.wr_u
.rdma_write
.remote_to
= priv
->to
+ (i
* sizeof(AmmassoBuffer
));
1739 token
->remoteBuf
= (AmmassoBuffer
*)(priv
->to
+ (i
* sizeof(AmmassoBuffer
)));
1740 LIST_ENQUEUE(nodes
[nodeNumber
].,sendTokens
,token
);
1743 nodes
[nodeNumber
].ack_sq_wr
->wr_u
.rdma_write
.remote_to
= priv
->ack_to
;
1744 nodes
[nodeNumber
].ack_sq_wr
->wr_u
.rdma_write
.remote_stag
= priv
->stag
;
1746 MACHSTATE2(1, "AsynchronousEventHandler() - INFO: tokens starting from %p, stag = %d",priv
->to
,priv
->stag
);
1748 // Keep a copy of the end point handle
1749 nodes
[nodeNumber
].cr
= connReqEP
;
1751 // Accept the connection
1752 priv
= (AmmassoPrivateData
*)buf
;
1753 priv
->node
= contextBlock
->myNode
;
1754 priv
->stag
= nodes
[nodeNumber
].recv_buf
->stag
;
1755 priv
->to
= (cc_uint64_t
)nodes
[nodeNumber
].recv_buf
;
1756 priv
->ack_to
= (cc_uint64_t
)nodes
[nodeNumber
].directAck
;
1758 MACHSTATE1(1, " node = %d", priv
->node
);
1759 MACHSTATE1(1, " stag = %d", priv
->stag
);
1760 MACHSTATE1(1, " to = %p", priv
->to
);
1763 MACHSTATE2(3, " buf = %p, priv = %p", buf
, priv
);
1764 for (j
= 0; j
< 16; j
++) {
1765 MACHSTATE2(3, " ((char*)priv)[%d] = %02X", j
, ((char*)priv
)[j
]);
1769 MACHSTATE(1, "AsynchronousEventHandler() - Ammasso - INFO: Accepting Connection...");
1771 rtn
= cc_cr_accept(contextBlock
->rnic
, connReqEP
, nodes
[nodeNumber
].qp
, sizeof(AmmassoPrivateData
), (cc_uint8_t
*)priv
);
1774 // Let the user know what happened
1775 MACHSTATE1(3, "AsynchronousEventHandler() - Ammasso - WARNING: Unable to accept connection from node %d", nodeNumber
);
1779 MACHSTATE1(3, "AsynchronousEventHandler() - Ammasso - INFO: Accepted Connection from node %d", nodeNumber
);
1781 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Pre-sendBufLock");
1782 #if CMK_SHARED_VARS_UNAVAILABLE
1783 while (nodes
[nodeNumber
].sendBufLock
!= 0) { usleep(1); } // Since CmiLock() is not really a lock, actually wait
1785 CmiLock(nodes
[nodeNumber
].sendBufLock
);
1787 // Indicate that this connection has been made only if it is the first time the connection was made (don't count re-connects)
1788 if (nodes
[nodeNumber
].connectionState
== QP_CONN_STATE_PRE_CONNECT
)
1789 (contextBlock
->outstandingConnectionCount
)--;
1790 nodes
[nodeNumber
].connectionState
= QP_CONN_STATE_CONNECTED
;
1792 CmiUnlock(nodes
[nodeNumber
].sendBufLock
);
1793 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Post-sendBufLock");
1795 MACHSTATE1(1, "AsynchronousEventHandler() - Connected to node %d", nodes
[nodeNumber
].myNode
);
1801 // er->event_id == CCAE_ACTIVE_CONNECT_RESULTS
1802 case CCAE_ACTIVE_CONNECT_RESULTS
:
1803 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Connection Results");
1805 // Get the OtherNode structure for the other node
1806 MACHSTATE2(1, "AsynchronousEventHandler() - INFO: er->resource_indicator = %d (CC_RES_IND_QP: %d)", er
->resource_indicator
, CC_RES_IND_QP
);
1807 node
= getNodeFromQPId(er
->resource_id
.qp_id
);
1809 MACHSTATE(5, "AsynchronousEventHandler() - ERROR: Unable to find QP from QP ID... Unable to create/recover connection");
1813 // Check to see if the connection was established or not
1814 if (er
->event_data
.active_connect_results
.status
!= CC_CONN_STATUS_SUCCESS
) {
1816 MACHSTATE(5, " Connection Failed.");
1817 MACHSTATE1(5, " - status: \"%s\"", cc_connect_status_to_string(er
->event_data
.active_connect_results
.status
));
1818 MACHSTATE1(5, " - private_data_length = %d", er
->event_data
.active_connect_results
.private_data_length
);
1819 displayQueueQuery(node
->qp
, &(node
->qp_attrs
));
1821 // Attempt to reconnect (try again... don't give up... you can do it!)
1822 reestablishQPConnection(node
);
1824 } else { // Connection was a success
1826 MACHSTATE(3, " Connection Success...");
1827 MACHSTATE2(1, " l -> %s:%d", inet_ntoa(*(struct in_addr
*) &(er
->event_data
.active_connect_results
.laddr
)), ntohs(er
->event_data
.active_connect_results
.lport
));
1828 MACHSTATE2(1, " r -> %s:%d", inet_ntoa(*(struct in_addr
*) &(er
->event_data
.active_connect_results
.raddr
)), ntohs(er
->event_data
.active_connect_results
.rport
));
1829 MACHSTATE4(1, " private_data_length = %d (%d, %d, %d)", er
->event_data
.active_connect_results
.private_data_length
, sizeof(int), sizeof(cc_stag_t
), sizeof(cc_uint64_t
));
1831 priv
= (AmmassoPrivateData
*)((char*)er
->event_data
.active_connect_results
.private_data
);
1834 MACHSTATE2(1, " private_data = %p, priv = %p", er
->event_data
.active_connect_results
.private_data
, priv
);
1835 for (j
= 0; j
< 16; j
++) {
1836 MACHSTATE3(1, " private_data[%d] = %02X (priv:%02X)", j
, ((char*)(er
->event_data
.active_connect_results
.private_data
))[j
], ((char*)priv
)[j
]);
1840 // Grab the remote stag and to, by the protocol, all the tokens are
1841 // consecutive in memory
1842 for (i
=0; i
<(AMMASSO_INITIAL_BUFFERS
/(contextBlock
->numNodes
-1)); ++i
) {
1843 LIST_DEQUEUE(contextBlock
->,freeTokens
,token
);
1844 token
->wr
.wr_u
.rdma_write
.remote_stag
= priv
->stag
;
1845 token
->wr
.wr_u
.rdma_write
.remote_to
= priv
->to
+ (i
* sizeof(AmmassoBuffer
));
1846 token
->remoteBuf
= (AmmassoBuffer
*)(priv
->to
+ (i
* sizeof(AmmassoBuffer
)));
1847 LIST_ENQUEUE(node
->,sendTokens
,token
);
1850 node
->ack_sq_wr
->wr_u
.rdma_write
.remote_to
= priv
->ack_to
;
1851 node
->ack_sq_wr
->wr_u
.rdma_write
.remote_stag
= priv
->stag
;
1853 MACHSTATE1(3, " node = %d", priv
->node
);
1854 MACHSTATE1(3, " stag = %d", priv
->stag
);
1855 MACHSTATE1(3, " to = %p", priv
->to
);
1857 MACHSTATE2(1, "AsynchronousEventHandler() - INFO: tokens from %p, stag = %d",priv
->to
,priv
->stag
);
1859 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Pre-sendBufLock");
1860 #if CMK_SHARED_VARS_UNAVAILABLE
1861 while (node
->sendBufLock
!= 0) { usleep(1); } // Since CmiLock() is not really a lock, actually wait
1863 CmiLock(node
->sendBufLock
);
1865 // Indicate that this connection has been made only if it is the first time the connection was made (don't count re-connects)
1866 if (node
->connectionState
== QP_CONN_STATE_PRE_CONNECT
)
1867 (contextBlock
->outstandingConnectionCount
)--;
1868 node
->connectionState
= QP_CONN_STATE_CONNECTED
;
1870 CmiUnlock(node
->sendBufLock
);
1871 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Post-sendBufLock");
1873 MACHSTATE1(1, "AsynchronousEventHandler() - Connected to node %d", node
->myNode
);
1878 // er->event_id is set to a value that indicates the QP will transition into the TERMINATE state
1879 // Remotely Detected Errors
1880 case CCAE_TERMINATE_MESSAGE_RECEIVED
:
1882 case CCAE_LLP_SEGMENT_SIZE_INVALID
:
1883 case CCAE_LLP_INVALID_CRC
:
1884 case CCAE_LLP_BAD_FPDU
:
1885 // Remote Operation Errors
1886 case CCAE_INVALID_DDP_VERSION
:
1887 case CCAE_INVALID_RDMA_VERSION
:
1888 case CCAE_UNEXPECTED_OPCODE
:
1889 case CCAE_INVALID_DDP_QUEUE_NUMBER
:
1890 case CCAE_RDMA_READ_NOT_ENABLED
:
1892 // Remote Protection Errors (not associated with RQ)
1893 case CCAE_TAGGED_INVALID_STAG
:
1894 case CCAE_TAGGED_BASE_BOUNDS_VIOLATION
:
1895 case CCAE_TAGGED_ACCESS_RIGHTS_VIOLATION
:
1896 case CCAE_TAGGED_INVALID_PD
:
1897 case CCAE_WRAP_ERROR
:
1898 // Remote Closing Error
1899 case CCAE_BAD_LLP_CLOSE
:
1900 // Remote Protection Errors (associated with RQ)
1901 case CCAE_INVALID_MSN_RANGE
:
1902 case CCAE_INVALID_MSN_GAP
:
1903 // IRRQ Proection Errors
1904 case CCAE_IRRQ_INVALID_STAG
:
1905 case CCAE_IRRQ_BASE_BOUNDS_VIOLATION
:
1906 case CCAE_IRRQ_ACCESS_RIGHTS_VIOLATION
:
1907 case CCAE_IRRQ_INVALID_PD
:
1908 case CCAE_IRRQ_WRAP_ERROR
: // For these, the VERBS
1909 case CCAE_IRRQ_OVERFLOW
: // spec is 100% clear as to
1910 case CCAE_IRRQ_MSN_GAP
: // what to do... but I think
1911 case CCAE_IRRQ_MSN_RANGE
: // this is correct... DMK
1912 // Local Errors - ??? Not 100% sure about these (they are written differently in cc_ae.h than in the verbs spec... but I think they go here)
1913 case CCAE_CQ_SQ_COMPLETION_OVERFLOW
:
1914 case CCAE_CQ_RQ_COMPLETION_ERROR
:
1915 case CCAE_QP_SRQ_WQE_ERROR
:
1917 // er->event_id is set to a value that indicates the QP will transition into the ERROR state
1919 case CCAE_LLP_CONNECTION_LOST
:
1920 case CCAE_LLP_CONNECTION_RESET
:
1921 // Remote Closing Error
1922 case CCAE_BAD_CLOSE
:
1924 case CCAE_QP_LOCAL_CATASTROPHIC_ERROR
:
1925 MACHSTATE3(5, "AsynchronousEventHandler() - WARNING: Connection Error \"%s\" - er->resource_indicator = %d (CC_RES_IND_QP: %d)", cc_event_id_to_string(er
->event_id
), er
->resource_indicator
, CC_RES_IND_QP
);
1926 CmiPrintf("AsynchronousEventHandler() - WARNING: Connection Error \"%s\" - er->resource_indicator = %d (CC_RES_IND_QP: %d)\n", cc_event_id_to_string(er
->event_id
), er
->resource_indicator
, CC_RES_IND_QP
);
1928 // Figure out which QP went down
1929 node
= getNodeFromQPId(er
->resource_id
.qp_id
);
1931 MACHSTATE(5, "AsynchronousEventHandler() - ERROR: Unable to find QP from QP ID... Unable to recover connection");
1935 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Pre-sendBufLock");
1936 #if CMK_SHARED_VARS_UNAVAILABLE
1937 while (node
->sendBufLock
!= 0) { usleep(1); } // Since CmiLock() is not really a lock, actually wait
1939 CmiLock(node
->sendBufLock
);
1941 // Indicate that the connection was lost or will be lost in the very near future (depending on the er->event_id)
1942 node
->connectionState
= QP_CONN_STATE_CONNECTION_LOST
;
1944 CmiUnlock(node
->sendBufLock
);
1945 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Post-sendBufLock");
1947 MACHSTATE1(1, "AsynchronousEventHandler() - Connection ERROR Occured - node %d", node
->myNode
);
1948 displayQueueQuery(node
->qp
, &(node
->qp_attrs
));
1950 // Attempt to bring the connection back to life
1951 reestablishQPConnection(node
);
1955 // er->event_id == ???
1957 MACHSTATE1(5, "AsynchronousEventHandler() - WARNING - Unknown/Unexpected Asynchronous Event: er->event_id = %d", er
->event_id
);
1960 } // end switch (er->event_id)
1962 AMMASSO_STATS_END(AsynchronousEventHandler
)
1965 void CheckRecvBufForMessage(OtherNode node
) {
1969 AmmassoBuffer
*current
;
1971 MACHSTATE1(2, "CheckRecvBufForMessage() - INFO: Called... (node->recv_buf = %p)...", node
->recv_buf
);
1973 // Process all messages, identified by a length not zero
1974 while ((len
= node
->recv_buf
->tail
.length
) != 0) {
1976 MACHSTATE1(2, " (len = %d)...", len
);
1978 // Start by zero-ing out the length of the message so it isn't picked up again
1979 node
->recv_buf
->tail
.length
= 0;
1980 (*node
->remoteAck
) ++;
1981 node
->messagesNotYetAcknowledged
++;
1983 current
= node
->recv_buf
;
1984 // Move the recv_buf back at the end of the receiving queue. This works also
1985 // if the queue if formed by a single element
1986 node
->last_recv_buf
->next
= node
->recv_buf
;
1987 node
->last_recv_buf
= node
->recv_buf
;
1988 node
->recv_buf
= node
->recv_buf
->next
;
1989 node
->last_recv_buf
->next
= NULL
;
1991 // Process the messsage, NOTE that the message start is aligned to 8 bytes!
1992 needAck
= ProcessMessage(&(current
->buf
[AMMASSO_BUFSIZE
- ALIGN8(len
)]), len
, ¤t
->tail
, node
);
1994 // If an ACK is needed in response to this message, send one
1995 if (needAck
) sendAck(node
);
2001 void CompletionEventHandler(cc_rnic_handle_t rnic, cc_cq_handle_t cq, void *cb) {
2002 CompletionEventHandlerWithAckFlag(rnic, cq, cb, 0);
/*
 * CompletionEventHandlerWithAckFlag() - Poll completion queue `cq` with
 * cc_cq_poll() and react to each Work Completion according to wc.wr_type.
 *
 *   rnic       - RNIC handle. The visible body polls contextBlock->rnic
 *                rather than this parameter.
 *   cq         - completion queue to drain.
 *   cb         - user context; cast to the OtherNode the CQ belongs to.
 *   breakOnAck - when non-zero, the function stops polling as soon as a
 *                received message reports that no ACK is needed (see the
 *                check at the end of the CC_WR_TYPE_RECV case).
 *
 * Polling stops when cc_cq_poll() returns anything other than CC_OK.
 * A non-CC_OK wc.status is only logged; no recovery is attempted yet.
 *
 * NOTE(review): several lines of this function are absent from this
 * listing (local declarations of rtn/wc/rq_wr/tmp/needAck, the polling-loop
 * header, break statements). The description above is based on the visible
 * lines only; verify it against the full source.
 */
2005 void CompletionEventHandlerWithAckFlag(cc_rnic_handle_t rnic, cc_cq_handle_t cq, void *cb, int breakOnAck) {
2007 OtherNode node = (OtherNode)cb;
2015 AMMASSO_STATS_START(CompletionEventHandler)
2017 //MACHSTATE(3, "CompletionEventHandler() - Called...");
2019 //// Reset the request notification type
2020 //rtn = cc_cq_request_notification(contextBlock->rnic, cq, CC_CQ_NOTIFICATION_TYPE_NEXT);
2021 //if (rtn != CC_OK) {
2022 // // Let the user know what happened
2023 // MACHSTATE2(3, "CompletionEventHandler() - Ammasso - WARNING - Unable to reset CQ request notification type: %d, \"%s\"", rtn, cc_status_to_string(rtn));
2026 // Keep polling the Completion Queue until it is empty of Work Completions
2029 //if (PollForMessage(cq) == 0) break;
2031 // Poll the Completion Queue
2032 rtn = cc_cq_poll(contextBlock->rnic, cq, &wc);
2033 //if (rtn == CCERR_CQ_EMPTY) break;
2034 if (rtn != CC_OK) break;
2036 // Let the user know if there was an error
2037 if (wc.status != CC_OK) {
2038 MACHSTATE2(3, "CompletionEventHandler() - Ammasso - WARNING - WC status not CC_OK: %d, \"%s\"", wc.status, cc_status_to_string(wc.status));
2040 // DMK : TODO : Add code here that will recover from a Work Request Gone Wrong... i.e. - if a connection is lost, the
2041 // WRs that are still pending in the queues will complete in failure with a FLUSHED error status... these
2042 // WRs will have to be reposted after the QP is back into the IDLE state in reestablishQPConnection (or later).
2045 // Depending on the WC.type, react accordingly
2046 switch (wc.wr_type) {
2048 // wc.wr_type == CC_WR_TYPE_RDMA_READ
2049 case CC_WR_TYPE_RDMA_READ:
2050 // DMK : TODO : Fill this in
2053 // wc.wr_type == CC_WR_TYPE_RDMA_WRITE
2054 case CC_WR_TYPE_RDMA_WRITE:
2055 // DMK : TODO : Fill this in
2058 // wc.wr_type == CC_WR_TYPE_SEND
2059 case CC_WR_TYPE_SEND:
2061 // This will be called as a result of a cc_qp_post_sq()'s Send Work Request in DeliverViaNetwork finishing
2063 // DMK : NOTE : I left this here as an example (if message memory is registered with the RNIC on the fly
2064 // in DeliverViaNetwork(), the memory would have to be un-registered/un-pinned here).
2065 // Free the STag that was created
2066 //rtn = cc_stag_dealloc(contextBlock->rnic, wc.stag);
2068 // Unlock the send_buf so it can be used again
2069 // DMK : TODO : Make this a real lock
2070 //MACHSTATE1(3, "CompletionEventHandler() - INFO: Send completed... clearing sendBufLock for next send (%p)", node);
2071 //CmiUnlock(node->sendBufLock);
2073 MACHSTATE1(3, "CompletionEventHandler() - INFO: Send completed with node %d... still waiting for acknowledge...", node->myNode);
2077 // wc.wr_type == CC_WR_TYPE_RECV
2078 case CC_WR_TYPE_RECV:
2080 // Check for CCERR_FLUSHED, which means "we're shutting down" according to the example code
2081 if (wc.status == CCERR_FLUSHED) {
2082 displayQueueQuery(node->qp, &(node->qp_attrs));
2086 // Make sure that node is defined
2088 MACHSTATE(3, "CompletionEventHandler() - WARNING: Received message but node == NULL... ignoring...");
2092 displayQueueQuery(node->qp, &(node->qp_attrs));
2096 MACHSTATE1(3, "CompletionEventHandler() - INFO: fromNode = %d", node->myNode);
2097 MACHSTATE1(3, "CompletionEventHandler() - INFO: wc.status = %d", wc.status);
2098 MACHSTATE1(3, "CompletionEventHandler() - INFO: wc.bytes_rcvd = %d", wc.bytes_rcvd);
2099 MACHSTATE1(3, "CompletionEventHandler() - INFO: node = %p", node);
2100 //MACHSTATE1(3, "CompletionEventHandler() - INFO: &(nodes[0]) = %p", &(nodes[0]));
2101 //MACHSTATE1(3, "CompletionEventHandler() - INFO: &(nodes[1]) = %p", &(nodes[1]));
2102 //MACHSTATE1(3, "CompletionEventHandler() - INFO: nodes[0].recv_buf = %p", nodes[0].recv_buf);
2103 //MACHSTATE1(3, "CompletionEventHandler() - INFO: nodes[1].recv_buf = %p", nodes[1].recv_buf);
2104 //MACHSTATE(3, "CompletionEventHandler() - INFO: Raw Message Data:");
2105 //for (j = 0; j < DGRAM_HEADER_SIZE; j++) {
2106 // //MACHSTATE1(3, " [%02x]", node->recv_buf[j]);
2107 // MACHSTATE2(3, " [0:%02x, 1:%02x]", nodes[0].recv_buf[j], nodes[1].recv_buf[j]);
2111 // Get the address of the receive buffer used (the RQ work request is carried in wc.wr_id)
2112 rq_wr = (cc_rq_wr_t*)wc.wr_id;
2114 // Process the Message
// NOTE(review): this call passes 3 arguments and ignores the return value,
// while CheckRecvBufForMessage() calls ProcessMessage(msg, len, &tail, node)
// and uses its return value as needAck. This CQ-receive path looks stale
// (the receive CQ is described as unused with RDMA writes); confirm whether
// it is still compiled.
2115 ProcessMessage((char*)(rq_wr->local_sgl.sge_list[0].to), wc.bytes_rcvd, &needAck);
2116 //ProcessMessage(node->recv_buf, wc.bytes_rcvd);
2118 // Re-Post the receive buffer
2119 // DMK : NOTE : I'm doing this after processing the message because I'm not 100% sure if the RNIC is "smart" enough
2120 // to not reuse the buffer if still in this event handler.
2122 rtn = cc_qp_post_rq(contextBlock->rnic, node->qp, rq_wr, 1, &tmp);
2123 if (rtn != CC_OK || tmp != 1) {
2124 // Let the user know what happened
2125 MACHSTATE2(3, "CompletionEventHandler() - Ammasso - ERROR: Unable to Post Work Request to Queue Pair: %d, \"%s\"", rtn, cc_status_to_string(rtn));
2128 // Send an ACK to indicate this node is ready to receive another message
2132 // Check to see if the function should return (stop polling for messages)
2133 if (breakOnAck && (!needAck)) // NOTE: Should only not need an ACK if ACK arrived or error
2140 MACHSTATE1(3, "CompletionEventHandler() - Ammasso - WARNING - Unknown WC.wr_type: %d", wc.wr_type);
2143 } // end switch (wc.wr_type)
2146 AMMASSO_STATS_END(CompletionEventHandler)
/*
 * CmiAmmassoOpenQueuePairs() - Set up the Ammasso (RDMA) connections between
 * this node and every other node. It then blocks until all connections are
 * made and all nodes have exchanged READY messages.
 *
 * Steps visible in this function:
 *   - Return immediately in stand-alone mode (Cmi_charmrun_pid == 0).
 *   - Allocate and register (cc_nsmr_register_virt) the receive buffers plus
 *     one ammasso_ack_t slot per remote node.
 *   - Split the receive buffers evenly among the remote nodes (buffersPerNode
 *     each). Any remainder stays in contextBlock->freeRecvBuffers.
 *   - Allocate and register the send buffers.
 *   - Build one send token (an RDMA-write work request) per send buffer, plus
 *     one ACK work request (nodes[i].ack_sq_wr) per remote node.
 *   - Initialise every nodes[i] entry and call establishQPConnection() for
 *     each other node.
 *   - Poll CheckRecvBufForMessage() until
 *     contextBlock->outstandingConnectionCount reaches 0.
 *   - Send an AMMASSO_READY message to every other node, then poll until
 *     contextBlock->nodeReadyCount reaches 0.
 *
 * On allocation failure the RNIC is closed and an error message is formatted
 * into `buf`. NOTE(review): the abort call that presumably follows is not
 * visible in this listing; confirm.
 */
2150 // NOTE: DMK: The code here follows from open_tcp_sockets() in machine-tcp.c.
2151 void CmiAmmassoOpenQueuePairs(void) {
2154 int i
, myNode
, numNodes
, keepWaiting
;
2156 cc_qp_create_attrs_t qpCreateAttrs
;
2158 cc_inet_addr_t address
;
2159 cc_inet_port_t port
;
2160 AmmassoBuffer
*sendBuffer
;
2161 AmmassoBuffer
*bufferScanner
;
2162 AmmassoToken
*newTokens
, *tokenScanner
;
2163 cc_data_addr_t
*newSgls
;
2164 cc_stag_index_t newStagIndex
;
2165 ammasso_ack_t
*ack_location
;
2168 MACHSTATE1(2, "CmiAmmassoOpenQueuePairs() - INFO: Called... (Cmi_charmrun_pid = %d)", Cmi_charmrun_pid
);
2170 // Check for stand-alone mode... no connections needed
2171 if (Cmi_charmrun_pid
== 0) return;
2173 if (nodes
== NULL
) {
2174 MACHSTATE(5, "CmiAmmassoOpenQueuePairs() - WARNING: nodes = NULL");
2177 MACHSTATE1(1, "CmiAmmassoOpenQueuePairs() - INFO: nodes = %p (remove this line)", nodes
);
2179 // DMK : FIXME : At this point, CmiMyNode() seems to be returning 0 on any node while _Cmi_mynode is
2180 // !!!!!!!!!!! returning the correct value. However, _Cmi_mynode and _Cmi_numnodes may not work with
2181 // !!!!!!!!!!! SMP. Resolve this issue and fix this code. For now, using _Cmi_mynode and _Cmi_numnodes.
2182 //myNode = CmiMyNode();
2183 //numNodes = CmiNumNodes();
2184 contextBlock
->myNode
= myNode
= _Cmi_mynode
;
2185 contextBlock
->numNodes
= numNodes
= _Cmi_numnodes
;
2186 contextBlock
->outstandingConnectionCount
= contextBlock
->numNodes
- 1; // No connection with self
2187 contextBlock
->nodeReadyCount
= contextBlock
->numNodes
- 1; // No ready packet from self
2188 contextBlock
->conditionRegistered
= 0;
2190 MACHSTATE2(1, "CmiAmmassoOpenQueuePairs() - INFO: myNode = %d, numNodes = %d", myNode
, numNodes
);
// AmmassoBuffer must be a multiple of 64 bytes in size.
2192 CmiAssert(sizeof(AmmassoBuffer
) == (sizeof(AmmassoBuffer
)&(~63)));
// Layout of the receive region (a single CmiAlloc'd, RNIC-registered block):
//   AMMASSO_INITIAL_BUFFERS AmmassoBuffer entries, followed by
//   (numNodes-1) ammasso_ack_t slots. ack_location is set to the first slot.
2194 // Try to allocate the memory for the receiving buffers
2195 contextBlock
->freeRecvBuffers
= (AmmassoBuffer
*) CmiAlloc(AMMASSO_INITIAL_BUFFERS
*sizeof(AmmassoBuffer
) + (contextBlock
->numNodes
-1)*sizeof(ammasso_ack_t
));
// NOTE(review): this address is computed before the NULL check below.
// Pointer arithmetic on a NULL result is undefined behaviour; consider
// moving it after the check.
2197 ack_location
= (ammasso_ack_t
*)&(contextBlock
->freeRecvBuffers
[AMMASSO_INITIAL_BUFFERS
]);
2198 if (contextBlock
->freeRecvBuffers
== NULL
) {
2200 // Attempt to close the RNIC
2201 cc_rnic_close(contextBlock
->rnic
);
2203 // Let the user know what happened and bail
2204 MACHSTATE(5, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for RECV buffers");
2205 sprintf(buf
, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for RECV buffers");
// Account for the pinned region, then register it with the RNIC.
// Remote write access is granted because peers RDMA-write into these buffers.
2209 contextBlock
->pinnedMemory
= AMMASSO_INITIAL_BUFFERS
*sizeof(AmmassoBuffer
) + (contextBlock
->numNodes
-1)*sizeof(ammasso_ack_t
);
2210 CC_CHECK(cc_nsmr_register_virt
,(contextBlock
->rnic
,
2211 CC_ADDR_TYPE_VA_BASED
,
2212 (cc_byte_t
*)contextBlock
->freeRecvBuffers
,
2213 AMMASSO_INITIAL_BUFFERS
*sizeof(AmmassoBuffer
) + (contextBlock
->numNodes
-1)*sizeof(ammasso_ack_t
),
2214 contextBlock
->pd_id
,
2216 CC_ACF_LOCAL_READ
| CC_ACF_LOCAL_WRITE
| CC_ACF_REMOTE_WRITE
,
// Chain all receive buffers into one list, each marked empty (tail.length = 0)
// and tagged with the registration's stag index.
2220 for (i
=0; i
<AMMASSO_INITIAL_BUFFERS
; ++i
) {
2221 contextBlock
->freeRecvBuffers
[i
].tail
.length
= 0;
2222 contextBlock
->freeRecvBuffers
[i
].next
= &(contextBlock
->freeRecvBuffers
[i
+1]);
2223 contextBlock
->freeRecvBuffers
[i
].stag
= newStagIndex
;
2225 contextBlock
->freeRecvBuffers
[AMMASSO_INITIAL_BUFFERS
-1].next
= NULL
;
2226 contextBlock
->last_freeRecvBuffers
= &contextBlock
->freeRecvBuffers
[AMMASSO_INITIAL_BUFFERS
-1];
// Buffers are split evenly among the numNodes-1 remote nodes; the remainder
// stays in contextBlock->freeRecvBuffers.
// NOTE(review): this divides by zero when numNodes == 1 under charmrun (the
// early return above only covers Cmi_charmrun_pid == 0); confirm that this
// case cannot reach here.
2228 buffersPerNode
= AMMASSO_INITIAL_BUFFERS
/ (contextBlock
->numNodes
-1);
2230 // distribute all the buffers allocated to the different processors, together
2231 // with the buffer where the directly sent ACK is received
2232 bufferScanner
= contextBlock
->freeRecvBuffers
;
2233 contextBlock
->freeRecvBuffers
= contextBlock
->freeRecvBuffers
[(contextBlock
->numNodes
-1)*buffersPerNode
-1].next
;
2234 contextBlock
->num_freeRecvBuffers
= AMMASSO_INITIAL_BUFFERS
- (contextBlock
->numNodes
-1)*buffersPerNode
;
2235 for (i
=0; i
<contextBlock
->numNodes
; ++i
) {
2236 if (i
== contextBlock
->myNode
) continue;
2237 nodes
[i
].num_recv_buf
= buffersPerNode
;
2238 nodes
[i
].recv_buf
= bufferScanner
;
2239 bufferScanner
[buffersPerNode
-1].next
= NULL
;
2240 nodes
[i
].last_recv_buf
= &(bufferScanner
[buffersPerNode
-1]);
2241 nodes
[i
].pending
= NULL
;
2242 bufferScanner
+= buffersPerNode
; // move forward of buffersPerNode buffers (of size sizeof(AmmassoBuffer))
// NOTE(review): no advance of ack_location is visible in this listing. If it
// is not incremented per node on a line not shown here, every node would
// share one ACK slot; verify against the full source.
2243 nodes
[i
].directAck
= ack_location
;
2247 // Try to allocate the memory for the sending buffers, together with the
2248 // buffers from where the direct ACK will be sent
2249 sendBuffer
= (AmmassoBuffer
*) CmiAlloc(AMMASSO_INITIAL_BUFFERS
*sizeof(AmmassoBuffer
) + (contextBlock
->numNodes
-1)*sizeof(ammasso_ack_t
));
2251 if (sendBuffer
== NULL
) {
2253 // Attempt to close the RNIC
2254 cc_rnic_close(contextBlock
->rnic
);
2256 // Let the user know what happened and bail
2257 MACHSTATE(5, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for SEND buffers");
2258 sprintf(buf
, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for SEND buffers");
// Send buffers only need local access (they are the source of RDMA writes).
2262 contextBlock
->pinnedMemory
+= AMMASSO_INITIAL_BUFFERS
*sizeof(AmmassoBuffer
) + (contextBlock
->numNodes
-1)*sizeof(ammasso_ack_t
);
2263 CC_CHECK(cc_nsmr_register_virt
,(contextBlock
->rnic
,
2264 CC_ADDR_TYPE_VA_BASED
,
2265 (cc_byte_t
*)sendBuffer
,
2266 AMMASSO_INITIAL_BUFFERS
*sizeof(AmmassoBuffer
) + (contextBlock
->numNodes
-1)*sizeof(ammasso_ack_t
),
2267 contextBlock
->pd_id
,
2269 CC_ACF_LOCAL_READ
| CC_ACF_LOCAL_WRITE
,
2273 contextBlock
->freeTokens
= NULL
;
2274 contextBlock
->num_freeTokens
= 0;
2276 // Allocate the send tokens, together with the tokens for the ACK buffers
2277 newTokens
= (AmmassoToken
*) CmiAlloc((AMMASSO_INITIAL_BUFFERS
+contextBlock
->numNodes
-1)*ALIGN8(sizeof(AmmassoToken
)));
2279 if (newTokens
== NULL
) {
2281 // Attempt to close the RNIC
2282 cc_rnic_close(contextBlock
->rnic
);
2284 // Let the user know what happened and bail
2285 MACHSTATE(5, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for SEND buffers");
2286 sprintf(buf
, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for SEND buffers");
// One scatter/gather entry per token (same count as the tokens above),
// each slot padded to ALIGN8(sizeof(cc_data_addr_t)).
2290 newSgls
= (cc_data_addr_t
*) CmiAlloc((AMMASSO_INITIAL_BUFFERS
+contextBlock
->numNodes
-1)*ALIGN8(sizeof(cc_data_addr_t
)));
2292 if (newSgls
== NULL
) {
2294 // Attempt to close the RNIC
2295 cc_rnic_close(contextBlock
->rnic
);
2297 // Let the user know what happened and bail
2298 MACHSTATE(5, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for SEND buffers");
2299 sprintf(buf
, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for SEND buffers");
2303 contextBlock
->num_freeTokens
= 0;
2304 contextBlock
->last_freeTokens
= NULL
;
2305 contextBlock
->freeTokens
= NULL
;
2306 tokenScanner
= newTokens
;
// Build one send token per send buffer: a signaled RDMA-write work request
// whose single SGE covers the whole AmmassoBuffer (payload area plus Tailer).
// Every token goes on contextBlock->freeTokens.
2307 for (i
=0; i
<AMMASSO_INITIAL_BUFFERS
; ++i
) {
2308 newSgls
->stag
= newStagIndex
;
2309 newSgls
->length
= AMMASSO_BUFSIZE
+ sizeof(Tailer
);
2310 newSgls
->to
= (unsigned long)&(sendBuffer
[i
]);
2311 tokenScanner
->wr
.wr_id
= (unsigned long)tokenScanner
;
2312 tokenScanner
->wr
.wr_type
= CC_WR_TYPE_RDMA_WRITE
;
2313 tokenScanner
->wr
.wr_u
.rdma_write
.local_sgl
.sge_count
= 1;
2314 tokenScanner
->wr
.wr_u
.rdma_write
.local_sgl
.sge_list
= newSgls
;
2315 tokenScanner
->wr
.signaled
= 1;
2316 tokenScanner
->localBuf
= (AmmassoBuffer
*)&(sendBuffer
[i
]);
2317 LIST_ENQUEUE(contextBlock
->,freeTokens
,tokenScanner
);
2318 newSgls
= (cc_data_addr_t
*)(((char*)newSgls
)+ALIGN8(sizeof(cc_data_addr_t
)));
2319 tokenScanner
= (AmmassoToken
*)(((char*)tokenScanner
)+ALIGN8(sizeof(AmmassoToken
)));
2322 /* At this point, newSgls, tokenScanner, and ack_location point to the first
2323 element to be used for the ACK buffers */
2325 // Setup the ack_sq_wr for all nodes
// Each remote node gets one extra token whose RDMA-write WR sends the
// ammasso_ack_t at ack_location. nodes[i].remoteAck points at that same
// counter, which CheckRecvBufForMessage() increments per received message.
2326 for (i
=0; i
<contextBlock
->numNodes
; ++i
) {
2327 if (i
== contextBlock
->myNode
) continue;
2328 newSgls
->stag
= newStagIndex
;
2329 newSgls
->length
= sizeof(ammasso_ack_t
);
2330 newSgls
->to
= (unsigned long)ack_location
;
2331 nodes
[i
].remoteAck
= ack_location
;
2332 tokenScanner
->wr
.wr_id
= (unsigned long)tokenScanner
;
2333 tokenScanner
->wr
.wr_type
= CC_WR_TYPE_RDMA_WRITE
;
2334 tokenScanner
->wr
.wr_u
.rdma_write
.local_sgl
.sge_count
= 1;
2335 tokenScanner
->wr
.wr_u
.rdma_write
.local_sgl
.sge_list
= newSgls
;
2336 tokenScanner
->wr
.signaled
= 1;
2337 nodes
[i
].ack_sq_wr
= &tokenScanner
->wr
;
2338 newSgls
= (cc_data_addr_t
*)(((char*)newSgls
)+ALIGN8(sizeof(cc_data_addr_t
)));
2339 tokenScanner
= (AmmassoToken
*)(((char*)tokenScanner
)+ALIGN8(sizeof(AmmassoToken
)));
2343 // Loop through all of the PEs
2344 // Begin setting up the Queue Pairs (common code for both "server" and "client")
2345 // For nodes with a lower PE, set up a "server" connection (accept)
2346 // For nodes with a higher PE, connect as "client" (connect)
2348 // Loop through all the nodes in the setup
2349 for (i
= 0; i
< numNodes
; i
++) {
2351 // Setup any members of this nodes OtherNode structure that need setting up
2352 nodes
[i
].myNode
= i
;
2353 nodes
[i
].connectionState
= QP_CONN_STATE_PRE_CONNECT
;
2354 nodes
[i
].messagesNotYetAcknowledged
= 0;
2355 nodes
[i
].usedTokens
= NULL
;
2356 nodes
[i
].last_usedTokens
= NULL
;
2357 nodes
[i
].num_usedTokens
= 0;
2358 nodes
[i
].localAck
= 0;
// NOTE(review): establishQPConnection(), called below, re-creates
// send_next_lock and recv_expect_lock and resets sendBufLock to 0. The locks
// created here are therefore overwritten, and the visible code never
// destroys them.
2359 nodes
[i
].sendBufLock
= CmiCreateLock();
2360 nodes
[i
].send_next_lock
= CmiCreateLock();
2361 nodes
[i
].recv_expect_lock
= CmiCreateLock();
2362 nodes
[i
].max_used_tokens
= 0;
2363 //nodes[i].send_UseIndex = 0;
2364 //nodes[i].send_InUseCounter = 0;
2365 //nodes[i].recv_UseIndex = 0;
2367 // If you walk around talking to yourself people will look at you all funny-like. Try not to do that.
2368 if (i
== myNode
) continue;
2370 *nodes
[i
].remoteAck
= 0;
2372 // Establish the Connection
2373 establishQPConnection(nodes
+ i
, 0); // Don't reuse the QP (there isn't one yet)
2375 } // end for (i < numNodes)
2378 // Need to block here until all the connections for this node are made
// outstandingConnectionCount is decremented by the asynchronous event handler
// the first time each connection reaches QP_CONN_STATE_CONNECTED. Incoming
// receive buffers are drained while waiting.
2379 MACHSTATE(2, "CmiAmmassoOpenQueuePairs() - INFO: Waiting for all connections to be established...");
2380 while (contextBlock
->outstandingConnectionCount
> 0) {
2384 for (i
= 0; i
< contextBlock
->numNodes
; i
++) {
2385 if (i
== contextBlock
->myNode
) continue;
2386 //CompletionEventHandler(contextBlock->rnic, nodes[i].recv_cq, &(nodes[i]));
2387 //CompletionEventHandler(contextBlock->rnic, nodes[i].send_cq, &(nodes[i]));
2388 CheckRecvBufForMessage(&(nodes
[i
]));
2391 MACHSTATE(1, "CmiAmmassoOpenQueuePairs() - INFO: All Connections have been established... Continuing");
2393 // Pause a little so both ends of the connection have time to receive and process the asynchronous events
2394 //usleep(800000); // 800ms
2397 MACHSTATE(1, "CmiAmmassoOpenQueuePairs() - INFO: Sending ready to all neighboors...");
2399 // Send all the ready packets
2400 for (i
= 0; i
< numNodes
; i
++) {
2404 if (i
== myNode
) continue; // Skip self
2406 MACHSTATE1(1, "CmiAmmassoOpenQueuePairs() - INFO: Sending READY to node %d", i
);
2408 // Send a READY control message to the node, give a non-null length
2409 sendDataOnQP(buf
, 1, &(nodes
[i
]), AMMASSO_READY
);
// NOTE(review): the lines from here through the commented-out wait loop use
// send_buf/sq_wr/cc_qp_post_sq. They look like the older SEND-based READY
// protocol that sendDataOnQP() above supersedes. They are presumably excluded
// by a preprocessor guard not visible in this listing; confirm.
2412 // Set the sendBufLock
2413 CmiLock(nodes[i].sendBufLock);
2415 // Setup the message
2416 *( (int*)(nodes[i].send_buf )) = Cmi_charmrun_pid; // Send the charmrun PID
2417 *((short*)(nodes[i].send_buf + 4)) = myNode; // Send this node's number
2420 nodes[i].send_sgl.length = 6; // DMK : TODO : FIXME : Change this later when multiple buffers are supported
2421 rtn = cc_qp_post_sq(contextBlock->rnic, nodes[i].qp, &(nodes[i].sq_wr), 1, &tmp);
2422 if (rtn != CC_OK || tmp != 1) {
2423 // Free the sendBufLock
2424 CmiUnlock(nodes[i].sendBufLock);
2426 // Let the user know what happened
2427 MACHSTATE1(3, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to send READY packet to node %d.", i);
2431 // Note: The code in the completion event handler will unlock sendBufLock again after the packet has actually been sent
2434 // DMK : NOTE : I don't think this is really needed... (leaving for now just incase I find out it really is)
2435 //// Wait until all the ready packets have been sent
2436 //while (keepWaiting) {
2439 // // Assume we won't find a lock that is set
2442 // // Check all the locks, if one is set, sleep and try again
2443 // for (j = 0; i < numNodes; j++)
2444 // if (nodes[j].sendBufLock) {
2446 // usleep(10000); // sleep 10ms
2451 MACHSTATE(1, "CmiAmmassoOpenQueuePairs() - INFO: All ready packets sent to neighboors...");
2453 // Need to block here until all of the ready packets have been received
2454 // NOTE : Because this is a fully connected graph of connections between the nodes, this will block all the nodes
2455 // until all the nodes are ready (and all the PEs since there is a node barrier in the run pe function that
2456 // all the threads execute... the thread executing this is one of those so it has to reach that node barrier
2457 // before any of the other can start doing much of anything).
// nodeReadyCount starts at numNodes-1. It is presumably decremented when an
// AMMASSO_READY message is processed (not visible here).
2458 MACHSTATE(2, "CmiAmmassoOpenQueuePairs() - INFO: Waiting for all neighboors to be ready...");
2459 while (contextBlock
->nodeReadyCount
> 0) {
2460 usleep(10000); // Sleep 10ms
2462 for (i
= 0; i
< contextBlock
->numNodes
; i
++) {
2463 if (i
== contextBlock
->myNode
) continue;
2464 //CompletionEventHandler(contextBlock->rnic, nodes[i].recv_cq, &(nodes[i]));
2465 //CompletionEventHandler(contextBlock->rnic, nodes[i].send_cq, &(nodes[i]));
2466 CheckRecvBufForMessage(&(nodes
[i
]));
2469 MACHSTATE(1, "CmiAmmassoOpenQueuePairs() - INFO: All neighboors ready...");
2471 MACHSTATE(2, "CmiAmmassoOpenQueuePairs() - INFO: Finished.");
/*
 * establishQPConnection() - Create the completion queues and the queue pair
 * for `node`, register node->qp_attrs with the RNIC, and start the
 * connection to `node`.
 *
 *   node        - the remote node to connect to.
 *   reuseQPFlag - despite its name, the visible code always creates a new QP.
 *                 The flag only forces the 400ms client-side sleep for every
 *                 connection, not just the one to contextBlock->myNode + 1.
 *
 * Roles:
 *   - If node->myNode < contextBlock->myNode, this side is the "server". It
 *     posts a listen (cc_ep_listen_create) on port AMMASSO_PORT + node->myNode.
 *   - If node->myNode > contextBlock->myNode, this side is the "client". It
 *     calls cc_qp_connect() to port AMMASSO_PORT + contextBlock->myNode, so
 *     both ends agree on the port AMMASSO_PORT + <client node number>.
 *   - The client sends its receive-ring stag/address and its direct-ACK
 *     address as private data (AmmassoPrivateData).
 *
 * Neither call blocks; the outcome is reported through the asynchronous
 * event handler.
 *
 * NOTE(review): the declarations of `count` and `value`, and one argument of
 * the cc_nsmr_register_virt call, are not visible in this listing.
 */
2476 // NOTE: When reestablishing a connection, the QP can be reused so don't recreate a new one (reuseQPFlag = 1).
2477 // When opening the connection for the first time, there is no QP so create one (reuseQPFlag = 0).
2478 // DMK : TODO : Fix the comment and parameter (I've been playing with what reuseQPFlag actually does and got
2479 // tired of updating comments)... update the comment when this is finished).
2480 void establishQPConnection(OtherNode node
, int reuseQPFlag
) {
2482 cc_qp_create_attrs_t qpCreateAttrs
;
2485 cc_uint32_t numWRsPosted
;
2487 MACHSTATE1(2, "establishQPConnection() - INFO: Called for node %d...", node
->myNode
);
2489 ///// Shared "Client" and "Server" Code /////
2491 MACHSTATE(1, "establishQPConnection() - INFO: (PRE-RECV-CQ-CREATE)");
2493 // Create the Completion Queue, just create a fake one since with rdma writes it is not used
2494 node
->recv_cq_depth
= 1;
2495 CC_CHECK(cc_cq_create
,(contextBlock
->rnic
, &(node
->recv_cq_depth
), contextBlock
->eh_id
, node
, &(node
->recv_cq
)));
2497 MACHSTATE(1, "establishQPConnection() - INFO: (PRE-SEND-CQ-CREATE)");
2499 // Create the Completion Queue
2500 //node->send_cq_depth = AMMASSO_NUMMSGBUFS_PER_QP * 4;
2501 node
->send_cq_depth
= AMMASSO_BUFFERS_INFLY
;
2502 CC_CHECK(cc_cq_create
,(contextBlock
->rnic
, &(node
->send_cq_depth
), contextBlock
->eh_id
, node
, &(node
->send_cq
)));
2504 MACHSTATE(1, "establishQPConnection() - INFO: (PRE-QP-CREATE)");
2506 // Create the Queue Pair
2507 // Set some initial Create Queue Pair Attributes that will be reused for all Queue Pairs Created
2508 qpCreateAttrs
.sq_cq
= node
->send_cq
; // Set the Send Queue's Completion Queue
2509 qpCreateAttrs
.rq_cq
= node
->recv_cq
; // Set the Request Queue's Completion Queue
2510 qpCreateAttrs
.sq_depth
= node
->send_cq_depth
;
2511 qpCreateAttrs
.rq_depth
= node
->recv_cq_depth
;
2512 qpCreateAttrs
.srq
= 0;
2513 qpCreateAttrs
.rdma_read_enabled
= 1;
2514 qpCreateAttrs
.rdma_write_enabled
= 1;
2515 qpCreateAttrs
.rdma_read_response_enabled
= 1;
2516 qpCreateAttrs
.mw_bind_enabled
= 0;
2517 qpCreateAttrs
.zero_stag_enabled
= 0;
2518 qpCreateAttrs
.send_sgl_depth
= 1;
2519 qpCreateAttrs
.recv_sgl_depth
= 1;
2520 qpCreateAttrs
.rdma_write_sgl_depth
= 1;
2521 qpCreateAttrs
.ord
= 1;
2522 qpCreateAttrs
.ird
= 1;
2523 qpCreateAttrs
.pdid
= contextBlock
->pd_id
; // Set the Protection Domain
2524 qpCreateAttrs
.user_context
= node
; // Set the User Context Block that will be passed into function calls
2526 CC_CHECK(cc_qp_create
,(contextBlock
->rnic
, &qpCreateAttrs
, &(node
->qp
), &(node
->qp_id
)));
2528 // Since the QP was just created (or re-created), reset the sequence number and any other variables that need resetting
2529 //node->send_InUseCounter = 0;
2530 //node->send_UseIndex = 0;
// NOTE(review): sendBufLock is overwritten with 0 here, although
// CmiAmmassoOpenQueuePairs() initialised it with CmiCreateLock(). This is
// only equivalent if CmiCreateLock() yields 0 in this build; confirm for SMP
// builds. The two CmiCreateLock() calls below likewise replace locks created
// there, which are not destroyed in the visible code.
2531 node
->sendBufLock
= 0;
2532 node
->send_next
= 0;
2533 node
->send_next_lock
= CmiCreateLock();
2534 node
->recv_expect
= 0;
2535 node
->recv_expect_lock
= CmiCreateLock();
2536 node
->recv_UseIndex
= 0;
2540 MACHSTATE(1, "establishQPConnection() - INFO: (PRE-NSMR-REGISTER-VIRT QP-QUERY-ATTRS)");
2542 // Attempt to register the qp_attrs member of the OtherNode structure with the RNIC so the Queue Pair's state can be queried
2543 contextBlock
->pinnedMemory
+= sizeof(cc_qp_query_attrs_t
);
2544 CC_CHECK(cc_nsmr_register_virt
,(contextBlock
->rnic
,
2545 CC_ADDR_TYPE_VA_BASED
,
2546 (cc_byte_t
*)(&(node
->qp_attrs
)),
2547 sizeof(cc_qp_query_attrs_t
),
2548 contextBlock
->pd_id
,
2550 CC_ACF_LOCAL_READ
| CC_ACF_LOCAL_WRITE
,
2551 &(node
->qp_attrs_stag_index
))
2555 ///// "Server" Specific /////
2556 if (node
->myNode
< contextBlock
->myNode
) {
2562 MACHSTATE(1, "establishQPConnection() - INFO: Starting \"Server\" Code...");
2564 // Setup the address
// The local RNIC address is read from the RNIC configuration
// (CC_GETCONFIG_ADDRS); its first four bytes are copied into node->address below.
2565 CC_CHECK(cc_rnic_getconfig
,(contextBlock
->rnic
, CC_GETCONFIG_ADDRS
, &count
, &value
));
2567 //MACHSTATE1(3, "establishQPConnection() - count = %4d", count);
2568 //for (j = 0; j < count; j++) {
2569 // MACHSTATE2(3, "establishQPConnection() - value[%d] = %4d", j, (int)value[j]);
2572 // Setup the Address
2573 // DMK : TODO : FIXME : Fix this code so that it handles host-network/big-little endian ordering
2574 *(((char*)&(node
->address
)) + 0) = value
[0];
2575 *(((char*)&(node
->address
)) + 1) = value
[1];
2576 *(((char*)&(node
->address
)) + 2) = value
[2];
2577 *(((char*)&(node
->address
)) + 3) = value
[3];
// Server listens on AMMASSO_PORT + <remote (client) node number>.
2580 node
->port
= htons(AMMASSO_PORT
+ node
->myNode
);
2582 MACHSTATE4(1, "establishQPConnection() - Using Address (Hex) 0x%02X 0x%02X 0x%02X 0x%02X", ((node
->address
>> 24) & 0xFF), ((node
->address
>> 16) & 0xFF), ((node
->address
>> 8) & 0xFF), (node
->address
& 0xFF));
2583 MACHSTATE4(1, " (Dec) %4d %4d %4d %4d", ((node
->address
>> 24) & 0xFF), ((node
->address
>> 16) & 0xFF), ((node
->address
>> 8) & 0xFF), (node
->address
& 0xFF));
2584 MACHSTATE2(1, " Port (Hex) 0x%02X 0x%02X", ((node
->port
>> 8) & 0xFF), (node
->port
& 0xFF));
2586 /* Listen for an incoming connection (NOTE: This call will return
2587 immediately; when a connection attempt is made by a "client", the
2588 asynchronous handler will be called.) */
2589 CC_CHECK(cc_ep_listen_create
,(contextBlock
->rnic
, node
->address
, &(node
->port
), 3, contextBlock
, &(node
->ep
)));
2591 MACHSTATE(1, "establishQPConnection() - Listening...");
2595 ///// "Client" Specific /////
2596 if (node
->myNode
> contextBlock
->myNode
) {
2598 AmmassoPrivateData priv
;
2600 // A one-time sleep that should give the passive side QPs time to post the listens before the active sides start trying to connect
2601 if (node
->myNode
== contextBlock
->myNode
+ 1 || reuseQPFlag
) // Only do once if all the connections are being made for the first time, do this for all
2602 usleep(400000); // Sleep 400ms // connections if reconnecting so the other RNIC has time to setup the listen
2604 MACHSTATE(1, "establishQPConnection() - INFO: Starting \"Client\" Code...");
2606 // Setup the Address
2607 // DMK : TODO : FIXME : Fix this code so that it handles host-network/big-little endian ordering
// The remote RNIC address is derived from the remote host address
// (node->addr): byte 3 of s_addr (the last octet, since s_addr is in network
// byte order) is decremented by one.
// NOTE(review): this encodes a deployment assumption about how RNIC IPs are
// assigned relative to host IPs; confirm.
2608 *(((char*)&(node
->address
)) + 0) = *(((char*)&(node
->addr
.sin_addr
.s_addr
)) + 0);
2609 *(((char*)&(node
->address
)) + 1) = *(((char*)&(node
->addr
.sin_addr
.s_addr
)) + 1);
2610 *(((char*)&(node
->address
)) + 2) = *(((char*)&(node
->addr
.sin_addr
.s_addr
)) + 2);
2611 *(((char*)&(node
->address
)) + 3) = (*(((char*)&(node
->addr
.sin_addr
.s_addr
)) + 3)) - 1;
// Client connects to AMMASSO_PORT + <own node number>, matching the server's listen port.
2614 node
->port
= htons(AMMASSO_PORT
+ contextBlock
->myNode
);
2616 MACHSTATE4(1, "establishQPConnection() - Using Address (Hex) 0x%02X 0x%02X 0x%02X 0x%02X", ((node
->address
>> 24) & 0xFF), ((node
->address
>> 16) & 0xFF), ((node
->address
>> 8) & 0xFF), (node
->address
& 0xFF));
2617 MACHSTATE4(1, " (Dec) %4d %4d %4d %4d", ((node
->address
>> 24) & 0xFF), ((node
->address
>> 16) & 0xFF), ((node
->address
>> 8) & 0xFF), (node
->address
& 0xFF));
2618 MACHSTATE2(1, " Port (Hex) 0x%02X 0x%02X", ((node
->port
>> 8) & 0xFF), (node
->port
& 0xFF));
2620 /* Attempt to make a connection to a "server" (NOTE: This call will return
2621 immediately; when the connection to the "server" is established, the
2622 asynchronous handler will be called.) */
2624 // by allocation protocol, the stags will be the same
// Private data tells the server where to RDMA-write: this node's number, the
// stag and start address of this node's receive ring for `node`, and the
// address of the direct-ACK slot.
2625 priv
.node
= contextBlock
->myNode
;
2626 priv
.stag
= node
->recv_buf
->stag
;
2627 priv
.to
= (cc_uint64_t
)node
->recv_buf
;
2628 priv
.ack_to
= (cc_uint64_t
)node
->directAck
;
2630 CC_CHECK(cc_qp_connect
,(contextBlock
->rnic
, node
->qp
, node
->address
, node
->port
, sizeof(AmmassoPrivateData
), (cc_uint8_t
*)&priv
));
// Recover the connection to `node` after a connection error. Visible steps:
//   1. poll node->recv_cq and node->send_cq until each returns CCERR_CQ_EMPTY,
//      discarding the flushed completions (the WRs are NOT reissued; see FIXMEs),
//   2. re-query the QP, sleeping 1ms between attempts, until it reports CC_QP_STATE_ERROR,
//   3. cc_qp_modify() the QP from ERROR to IDLE, then query it again for logging,
//   4. closeQPConnection(node, 0) followed by establishQPConnection(node, 1).
// Failures of cc_qp_query()/cc_qp_modify() are logged via MACHSTATE2 (what happens after a
// failure is not visible in this copy).
// NOTE(review): this copy of the function is missing lines (the embedded original line numbers
// skip, e.g. 2638-2640 and 2665-2667): the declarations of rtn and wc, the drain/wait loop
// headers and the closing braces are not visible here -- restore them from the original source.
2636 // NOTE: This will be called in the event of a connection error
2637 void reestablishQPConnection(OtherNode node
) {
2641 cc_qp_modify_attrs_t modAttrs
;
2644 MACHSTATE1(2, "reestablishQPConnection() - INFO: For node %d: Clearing Outstanding WRs...", node
->myNode
);
2646 // Drain the RECV completion Queue (if a connection is lost, all pending Work Requests are completed with a FLUSHED error)
2648 // Poll for a completion; stop once the queue is empty
2649 rtn
= cc_cq_poll(contextBlock
->rnic
, node
->recv_cq
, &wc
);
2650 if (rtn
== CCERR_CQ_EMPTY
) break;
2652 // DMK : TODO : FIXME : Something should be done with the WRs that are pulled off so they can be reissued
2655 // Drain the SEND completion Queue (if a connection is lost, all pending Work Requests are completed with a FLUSHED error)
2657 // Poll for a completion; stop once the queue is empty
2658 rtn
= cc_cq_poll(contextBlock
->rnic
, node
->send_cq
, &wc
);
2659 if (rtn
== CCERR_CQ_EMPTY
) break;
2661 // DMK : TODO : FIXME : Something should be done with the WRs that are pulled off so they can be reissued
2664 MACHSTATE1(1, "reestablishQPConnection() - INFO: For node %d: Waiting for QP to enter ERROR state...", node
->myNode
);
2668 // Query the QP's state (results are stored in node->qp_attrs)
2669 rtn
= cc_qp_query(contextBlock
->rnic
, node
->qp
, &(node
->qp_attrs
));
// NOTE(review): this log line is labelled "AsynchronousEventHandler()" although it is emitted
// from reestablishQPConnection(); the label is misleading when reading logs.
2671 MACHSTATE2(5, "AsynchronousEventHandler() - ERROR: Unable to Query Queue Pair (l): %d, \"%s\"", rtn
, cc_status_to_string(rtn
));
2675 // Check to see if the state is ERROR, if so, break from the loop... otherwise, keep waiting, it will be soon
2676 if (node
->qp_attrs
.qp_state
== CC_QP_STATE_ERROR
)
2679 usleep(1000); // 1ms
// NOTE(review): no timeout is visible on this wait; if the QP never reaches ERROR this would
// poll forever -- confirm against the complete source.
2683 MACHSTATE2(1, "reestablishQPConnection() - INFO: Finished waiting node %d: QP state = \"%s\"...", node
->myNode
, cc_qp_state_to_string(node
->qp_attrs
.qp_state
));
2684 MACHSTATE1(1, "reestablishQPConnection() - INFO: Attempting to transition QP into IDLE state for node %d", node
->myNode
);
2686 // Transition the Queue Pair from ERROR into IDLE state
// Only the target state and the LLP endpoint are set; every other attribute is left unchanged.
2687 modAttrs
.llp_ep
= node
->ep
;
2688 modAttrs
.next_qp_state
= CC_QP_STATE_IDLE
;
2689 modAttrs
.ord
= CC_QP_NO_ATTR_CHANGE
;
2690 modAttrs
.ird
= CC_QP_NO_ATTR_CHANGE
;
2691 modAttrs
.sq_depth
= CC_QP_NO_ATTR_CHANGE
;
2692 modAttrs
.rq_depth
= CC_QP_NO_ATTR_CHANGE
;
2693 modAttrs
.stream_message_buffer
= NULL
;
2694 modAttrs
.stream_message_length
= 0;
2695 rtn
= cc_qp_modify(contextBlock
->rnic
, node
->qp
, &modAttrs
);
2697 // Let the user know what happened
2698 MACHSTATE2(5, "reestablishQPConnection() - ERROR: Unable to Modify QP State: %d, \"%s\"", rtn
, cc_status_to_string(rtn
));
// Re-query so the resulting QP state can be logged below
2701 rtn
= cc_qp_query(contextBlock
->rnic
, node
->qp
, &(node
->qp_attrs
));
2703 MACHSTATE2(5, "reestablishQPConnection() - ERROR: Unable to Query Queue Pair (1): %d, \"%s\"", rtn
, cc_status_to_string(rtn
));
2705 MACHSTATE2(1, "reestablishQPConnection() - INFO: Transition results for node %d: QP state = \"%s\"...", node
->myNode
, cc_qp_state_to_string(node
->qp_attrs
.qp_state
));
// NOTE(review): closeQPConnection() as written calls cc_qp_destroy() regardless of its flag, so
// the QP that establishQPConnection(node, 1) is asked to reuse may already be destroyed -- verify.
2707 closeQPConnection(node
, 0); // Close the connection but do not destroy the QP
2708 establishQPConnection(node
, 1); // Reopen the connection and reuse the QP that has already been created
2712 // Close the connection to `node` and release its RDMA resources.
2713 // Always (as written): destroys node->qp, node->send_cq and node->recv_cq.
2714 // Only when destroyQPFlag != 0 (intended for application shutdown): deallocates the QP-attribute, receive and send
2715 // STags and CmiFree()s the receive/send buffers, WR arrays and SGLs. DMK : TODO : the flag's meaning is still in flux.
// NOTE(review): despite its name, destroyQPFlag does not gate cc_qp_destroy(). reestablishQPConnection() passes 0 and
// then asks establishQPConnection() to reuse the QP, which this function has already destroyed -- verify intent.
// NOTE(review): return codes of cc_qp_destroy()/cc_cq_destroy()/cc_stag_dealloc() are ignored, and in the visible
// lines the freed pointers are not reset to NULL.
2716 void closeQPConnection(OtherNode node
, int destroyQPFlag
) {
2718 MACHSTATE(2, "closeQPConnection() - INFO: Called...");
2721 // Destroy the Queue Pair and both Completion Queues (unconditionally)
2722 cc_qp_destroy(contextBlock->rnic, node->qp);
2723 cc_cq_destroy(contextBlock->rnic, node->send_cq);
2724 cc_cq_destroy(contextBlock->rnic, node->recv_cq);
2726 if (destroyQPFlag) {
2727 // De-Register Memory with the RNIC
2728 cc_stag_dealloc(contextBlock->rnic, node->qp_attrs_stag_index);
2729 cc_stag_dealloc(contextBlock->rnic, node->recv_stag_index);
2730 cc_stag_dealloc(contextBlock->rnic, node->send_stag_index);
2731 //cc_stag_dealloc(contextBlock->rnic, node->rdma_stag_index);
// Release the receive-side buffer, work-request array and scatter/gather list
2733 CmiFree(node->recv_buf);
2734 CmiFree(node->rq_wr);
2735 CmiFree(node->recv_sgl);
// Release the send-side buffer, work-request array and scatter/gather list
2737 CmiFree(node->send_buf);
2738 CmiFree(node->sq_wr);
2739 CmiFree(node->send_sgl);
2740 //CmiFree(node->send_bufFree);
2746 char* cc_status_to_string(cc_status_t errorCode
) {
2748 switch (errorCode
) {
2750 case CC_OK
: return "OK";
2751 case CCERR_INSUFFICIENT_RESOURCES
: return "Insufficient Resources";
2752 case CCERR_INVALID_MODIFIER
: return "Invalid Modifier";
2753 case CCERR_INVALID_MODE
: return "Invalid Mode";
2754 case CCERR_IN_USE
: return "In Use";
2755 case CCERR_INVALID_RNIC
: return "Invalid RNIC";
2756 case CCERR_INTERRUPTED_OPERATION
: return "Interrupted Operation";
2757 case CCERR_INVALID_EH
: return "Invalid EH";
2758 case CCERR_INVALID_CQ
: return "Invalid CQ";
2759 case CCERR_CQ_EMPTY
: return "CQ Empty";
2760 case CCERR_NOT_IMPLEMENTED
: return "Not Implemented";
2761 case CCERR_CQ_DEPTH_TOO_SMALL
: return "CQ Depth Too Small";
2762 case CCERR_PD_IN_USE
: return "PD In Use";
2763 case CCERR_INVALID_PD
: return "Invalid PD";
2764 case CCERR_INVALID_SRQ
: return "Invalid SRQ";
2765 case CCERR_INVALID_ADDRESS
: return "Invalid Address";
2766 case CCERR_INVALID_NETMASK
: return "Invalid Netmask";
2767 case CCERR_INVALID_QP
: return "Invalid QP";
2768 case CCERR_INVALID_QP_STATE
: return "Invalid QP State";
2769 case CCERR_TOO_MANY_WRS_POSTED
: return "Too Many WRs Posted";
2770 case CCERR_INVALID_WR_TYPE
: return "Invalid WR Type";
2771 case CCERR_INVALID_SGL_LENGTH
: return "Invalid SGL Length";
2772 case CCERR_INVALID_SQ_DEPTH
: return "Invalid SQ Depth";
2773 case CCERR_INVALID_RQ_DEPTH
: return "Invalid RQ Depth";
2774 case CCERR_INVALID_ORD
: return "Invalid ORD";
2775 case CCERR_INVALID_IRD
: return "Invalid IRD";
2776 case CCERR_QP_ATTR_CANNOT_CHANGE
: return "QP_ATTR_CANNON_CHANGE";
2777 case CCERR_INVALID_STAG
: return "Invalid STag";
2778 case CCERR_QP_IN_USE
: return "QP In Use";
2779 case CCERR_OUTSTANDING_WRS
: return "Outstanding WRs";
2780 // case CCERR_MR_IN_USE: NOTE : "CCERR_MR_IN_USE = CCERR_STAG_IN_USE" in "cc_status.h"
2781 case CCERR_STAG_IN_USE
: return "STag In Use";
2782 case CCERR_INVALID_STAG_INDEX
: return "Invalid STag Index";
2783 case CCERR_INVALID_SGL_FORMAT
: return "Invalid SGL Format";
2784 case CCERR_ADAPTER_TIMEOUT
: return "Adapter Timeout";
2785 case CCERR_INVALID_CQ_DEPTH
: return "Invalid CQ Depth";
2786 case CCERR_INVALID_PRIVATE_DATA_LENGTH
: return "Invalid Private Data Length";
2787 case CCERR_INVALID_EP
: return "Invalid EP";
2788 case CCERR_FLUSHED
: return "Flushed";
2789 case CCERR_INVALID_WQE
: return "Invalid WQE";
2790 case CCERR_LOCAL_QP_CATASTROPHIC_ERROR
: return "Local QP Catastrophic Error";
2791 case CCERR_REMOTE_TERMINATION_ERROR
: return "Remote Termination Error";
2792 case CCERR_BASE_AND_BOUNDS_VIOLATION
: return "Base and Bounds Violation";
2793 case CCERR_ACCESS_VIOLATION
: return "Access Violation";
2794 case CCERR_INVALID_PD_ID
: return "Invalid PD ID";
2795 case CCERR_WRAP_ERROR
: return "Wrap Error";
2796 case CCERR_INV_STAG_ACCESS_ERROR
: return "Invalid STag Access Error";
2797 case CCERR_ZERO_RDMA_READ_RESOURCES
: return "Zero RDMA Read Resources";
2798 case CCERR_QP_NOT_PRIVILEGED
: return "QP Not Privileged";
2799 case CCERR_STAG_STATE_NOT_INVALID
: return "STag State Not Invalid"; // ???
2800 case CCERR_INVALID_PAGE_SIZE
: return "Invalid Page Size";
2801 case CCERR_INVALID_BUFFER_SIZE
: return "Invalid Buffer Size";
2802 case CCERR_INVALID_PBE
: return "Invalid PBE";
2803 case CCERR_INVALID_FBO
: return "Invalid FBO";
2804 case CCERR_INVALID_LENGTH
: return "Invalid Length";
2805 case CCERR_INVALID_ACCESS_RIGHTS
: return "Invalid Access Rights";
2806 case CCERR_PBL_TOO_BIG
: return "PBL Too Big";
2807 case CCERR_INVALID_VA
: return "Invalid VA";
2808 case CCERR_INVALID_REGION
: return "Invalid Region";
2809 case CCERR_INVALID_WINDOW
: return "Invalid Window";
2810 case CCERR_TOTAL_LENGTH_TOO_BIG
: return "Total Length Too Big";
2811 case CCERR_INVALID_QP_ID
: return "Invalid QP ID";
2812 case CCERR_ADDR_IN_USE
: return "Address In Use";
2813 case CCERR_ADDR_NOT_AVAIL
: return "Address Not Available";
2814 case CCERR_NET_DOWN
: return "Network Down";
2815 case CCERR_NET_UNREACHABLE
: return "Network Unreachable";
2816 case CCERR_CONN_ABORTED
: return "Connection Aborted";
2817 case CCERR_CONN_RESET
: return "Connection Reset";
2818 case CCERR_NO_BUFS
: return "No Buffers";
2819 case CCERR_CONN_TIMEDOUT
: return "Connection Timed-Out";
2820 case CCERR_CONN_REFUSED
: return "Connection Refused";
2821 case CCERR_HOST_UNREACHABLE
: return "Host Unreachable";
2822 case CCERR_INVALID_SEND_SGL_DEPTH
: return "Invalid Send SGL Depth";
2823 case CCERR_INVALID_RECV_SGL_DEPTH
: return "Invalid Receive SGL Depth";
2824 case CCERR_INVALID_RDMA_WRITE_SGL_DEPTH
: return "Ivalid RDMA Write SGL Depth";
2825 case CCERR_INSUFFICIENT_PRIVILEGES
: return "Insufficient Privileges";
2826 case CCERR_STACK_ERROR
: return "Stack Error";
2827 case CCERR_INVALID_VERSION
: return "Invalid Version";
2828 case CCERR_INVALID_MTU
: return "Invalid MTU";
2829 case CCERR_INVALID_IMAGE
: return "Invalid Image";
2830 case CCERR_PENDING
: return "(PENDING: Internal to Adapter... Hopefully you aren't reading this...)"; /* not an error; user internally by adapter */
2831 case CCERR_DEFER
: return "(DEFER: Internal to Adapter... Hopefully you aren't reading this...)"; /* not an error; used internally by adapter */
2832 case CCERR_FAILED_WRITE
: return "Failed Write";
2833 case CCERR_FAILED_ERASE
: return "Failed Erase";
2834 case CCERR_FAILED_VERIFICATION
: return "Failed Verification";
2835 case CCERR_NOT_FOUND
: return "Not Found";
2836 default: return "Unknown Error Code";
2840 // NOTE: Letting these string be separate incase different information should be
2841 // returned that what cc_status_to_string() would return
2842 char* cc_conn_error_to_string(cc_connect_status_t errorCode
) {
2844 switch (errorCode
) {
2845 case CC_CONN_STATUS_SUCCESS
: return "Success";
2846 case CC_CONN_STATUS_NO_MEM
: return "No Memory";
2847 case CC_CONN_STATUS_TIMEDOUT
: return "Timed-Out";
2848 case CC_CONN_STATUS_REFUSED
: return "Refused";
2849 case CC_CONN_STATUS_NETUNREACH
: return "Network Unreachable";
2850 case CC_CONN_STATUS_HOSTUNREACH
: return "Host Unreachable";
2851 case CC_CONN_STATUS_INVALID_RNIC
: return "Invalid RNIC";
2852 case CC_CONN_STATUS_INVALID_QP
: return "Invalid QP";
2853 case CC_CONN_STATUS_INVALID_QP_STATE
: return "Invalid QP State";
2854 case CC_CONN_STATUS_REJECTED
: return "Rejected";
2855 default: return (cc_status_to_string((cc_status_t
)errorCode
));
2859 void displayQueueQuery(cc_qp_handle_t qp
, cc_qp_query_attrs_t
*attrs
) {
2861 //cc_qp_query_attrs_t attr;
2865 OtherNode node
= getNodeFromQPHandle(qp
);
2867 MACHSTATE1(2, "displayQueueQuery() - Called for node %d", node
->myNode
);
2869 MACHSTATE(2, "displayQueueQuery() - Called for unknown node");
2872 // Query the Queue for its Attributes
2873 rtn
= cc_qp_query(contextBlock
->rnic
, qp
, attrs
);
2875 // Let the user know what happened
2876 MACHSTATE2(5, "displayQueueQuery() - ERROR: Unable to query queue: %d, \"%s\"", rtn
, cc_status_to_string(rtn
));
2880 // Output the results of the Query
2881 // DMK : TODO : For now I'm only putting in the ones that I care about... add more later or as needed
2882 MACHSTATE2(1, "displayQueueQuery() - qp_state = %d, \"%s\"", attrs
->qp_state
, cc_qp_state_to_string(attrs
->qp_state
));
2883 if (attrs
->terminate_message_length
> 0) {
2884 memcpy(buf
, attrs
->terminate_message
, attrs
->terminate_message_length
);
2885 buf
[attrs
->terminate_message_length
] = '\0';
2886 MACHSTATE1(1, "displayQueueQuery() - terminate_message = \"%s\"", buf
);
2888 MACHSTATE(1, "displayQueueQuery() - terminate_message = NULL");
2892 char* cc_qp_state_to_string(cc_qp_state_t qpState
) {
2895 case CC_QP_STATE_IDLE
: return "IDLE";
2896 case CC_QP_STATE_CONNECTING
: return "CONNECTED";
2897 case CC_QP_STATE_RTS
: return "RTS";
2898 case CC_QP_STATE_CLOSING
: return "CLOSING";
2899 case CC_QP_STATE_TERMINATE
: return "TERMINATE";
2900 case CC_QP_STATE_ERROR
: return "ERROR";
2901 default: return "unknown";
2905 char* cc_event_id_to_string(cc_event_id_t id
) {
2908 case CCAE_REMOTE_SHUTDOWN
: return "Remote Shutdown";
2909 case CCAE_ACTIVE_CONNECT_RESULTS
: return "Active Connect Results";
2910 case CCAE_CONNECTION_REQUEST
: return "Connection Request";
2911 case CCAE_LLP_CLOSE_COMPLETE
: return "LLP Close Complete";
2912 case CCAE_TERMINATE_MESSAGE_RECEIVED
: return "Terminate Message Received";
2913 case CCAE_LLP_CONNECTION_RESET
: return "LLP Connection Reset";
2914 case CCAE_LLP_CONNECTION_LOST
: return "LLP Connection Lost";
2915 case CCAE_LLP_SEGMENT_SIZE_INVALID
: return "Segment Size Invalid";
2916 case CCAE_LLP_INVALID_CRC
: return "LLP Invalid CRC";
2917 case CCAE_LLP_BAD_FPDU
: return "LLP Bad FPDU";
2918 case CCAE_INVALID_DDP_VERSION
: return "Invalid DDP Version";
2919 case CCAE_INVALID_RDMA_VERSION
: return "Invalid RMDA Version";
2920 case CCAE_UNEXPECTED_OPCODE
: return "Unexpected Opcode";
2921 case CCAE_INVALID_DDP_QUEUE_NUMBER
: return "Invalid DDP Queue Number";
2922 case CCAE_RDMA_READ_NOT_ENABLED
: return "RDMA Read Not Enabled";
2923 case CCAE_RDMA_WRITE_NOT_ENABLED
: return "RDMA Write Not Enabled";
2924 case CCAE_RDMA_READ_TOO_SMALL
: return "RDMA Read Too Small";
2925 case CCAE_NO_L_BIT
: return "No L Bit";
2926 case CCAE_TAGGED_INVALID_STAG
: return "Tagged Invalid STag";
2927 case CCAE_TAGGED_BASE_BOUNDS_VIOLATION
: return "Tagged Base Bounds Violation";
2928 case CCAE_TAGGED_ACCESS_RIGHTS_VIOLATION
: return "Tagged Access Rights Violation";
2929 case CCAE_TAGGED_INVALID_PD
: return "Tagged Invalid PD";
2930 case CCAE_WRAP_ERROR
: return "Wrap Error";
2931 case CCAE_BAD_CLOSE
: return "Bad Close";
2932 case CCAE_BAD_LLP_CLOSE
: return "Bad LLP Close";
2933 case CCAE_INVALID_MSN_RANGE
: return "Invalid MSN Range";
2934 case CCAE_INVALID_MSN_GAP
: return "Invalid MSN Gap";
2935 case CCAE_IRRQ_OVERFLOW
: return "IRRQ Overflow";
2936 case CCAE_IRRQ_MSN_GAP
: return "IRRQ MSG Gap";
2937 case CCAE_IRRQ_MSN_RANGE
: return "IRRQ MSN Range";
2938 case CCAE_IRRQ_INVALID_STAG
: return "IRRQ Invalid STag";
2939 case CCAE_IRRQ_BASE_BOUNDS_VIOLATION
: return "IRRQ Base Bounds Violation";
2940 case CCAE_IRRQ_ACCESS_RIGHTS_VIOLATION
: return "IRRQ Access Rights Violation";
2941 case CCAE_IRRQ_INVALID_PD
: return "IRRQ Invalid PD";
2942 case CCAE_IRRQ_WRAP_ERROR
: return "IRRQ Wrap Error";
2943 case CCAE_CQ_SQ_COMPLETION_OVERFLOW
: return "CQ SQ Completion Overflow";
2944 case CCAE_CQ_RQ_COMPLETION_ERROR
: return "CQ RQ Completion Overflow";
2945 case CCAE_QP_SRQ_WQE_ERROR
: return "QP SRQ WQE Error";
2946 case CCAE_QP_LOCAL_CATASTROPHIC_ERROR
: return "QP Local Catastrophic Error";
2947 case CCAE_CQ_OVERFLOW
: return "CQ Overflow";
2948 case CCAE_CQ_OPERATION_ERROR
: return "CQ Operation Error";
2949 case CCAE_SRQ_LIMIT_REACHED
: return "SRQ Limit Reached";
2950 case CCAE_QP_RQ_LIMIT_REACHED
: return "QP RQ Limit Reached";
2951 case CCAE_SRQ_CATASTROPHIC_ERROR
: return "SRQ Catastrophic Error";
2952 case CCAE_RNIC_CATASTROPHIC_ERROR
: return "RNIC Catastrophic Error";
2953 default: return "Unknown Event ID";
2957 char* cc_connect_status_to_string(cc_connect_status_t status
) {
2960 case CC_CONN_STATUS_SUCCESS
: return "Success";
2961 case CC_CONN_STATUS_TIMEDOUT
: return "Timedout";
2962 case CC_CONN_STATUS_REFUSED
: return "Refused";
2963 case CC_CONN_STATUS_NETUNREACH
: return "Network Unreachable";
2964 default: return "Unknown";