AMPI: fix mismatched free/delete[] identified by valgrind
[charm.git] / src / arch / net / machine-ammasso.c
blobbf7970db7513caf78f2eaeb8af3d3dd9db87e4c1
1 /**
2 ** Ammasso implementation of Converse NET version
3 ** Contains Ammasso specific
4 ** code for:
5 ** CmiMachineInit()
6 ** CmiCommunicationInit()
7 ** CmiNotifyIdle()
8 ** DeliverViaNetwork()
9 ** CommunicationServer()
10 ** CmiMachineExit()
12 ** Written By: Isaac Dooley idooley2@uiuc.edu
13 ** 03/12/05 Esteban Pauli etpauli2@uiuc.edu
14 ** Filippo Gioachin gioachin@uiuc.edu
15 ** David Kunzman kunzman2@uiuc.edu
17 ** Change Log:
18 ** 03/12/05 : DMK : Initial Version
19 ** 04/30/05 : Filippo : Revised Version
21 ** Todo List:
23 **/
#ifndef ALIGN8
/* Round x up to the next multiple of 8; the result is narrowed to int. */
#define ALIGN8(x) (int)(((x)+7)&(~7))
#endif
/* DYNAMIC ALLOCATOR: ceiling for the amount of pinned memory */
#define MAX_PINNED_MEMORY 100000000
/* DYNAMIC ALLOCATOR END */

/* Number of decrement steps CC_POST_CHECK spins through between two post attempts */
#define WASTE_TIME 600
/* CC_POST_CHECK(routine, args, nodeTo)
 *   Evaluates "routine args" and passes the resulting status to
 *   ammasso_check_post_err(); the call is repeated for as long as that
 *   function answers 1.  Between two attempts the macro spins through
 *   WASTE_TIME+1 decrements, leaving the retry budget one lower than before.
 *
 *   The expansion refers to a variable literally named "nWR", so the caller
 *   must have one in scope and must pass &nWR as the last argument of
 *   cc_qp_post_sq.
 */
#define CC_POST_CHECK(routine,args,nodeTo) {\
    int retry=10000000; \
    while (ammasso_check_post_err(routine args, #routine, __LINE__, &nWR, nodeTo, retry) == 1) { \
      int i = 0; \
      retry += WASTE_TIME; \
      while (i <= WASTE_TIME) { --retry; ++i; } \
    } \
  }
/* CC_CHECK(routine, args)
 *   Evaluates "routine args" and hands the status, the routine name and the
 *   current line to ammasso_check_err().  The expansion already ends with ';'.
 */
#define CC_CHECK(routine,args) ammasso_check_err(routine args, #routine, __LINE__);
47 static void ammasso_check_err(cc_status_t returnCode,const char *routine,int line) {
48 if (returnCode!=CC_OK) {
49 char buf[128];
50 char *errMsg = cc_status_to_string(returnCode);
52 // Attempt to close the RNIC
53 cc_rnic_close(contextBlock->rnic);
55 // Let the user know what happened and bail
56 MACHSTATE3(5,"Fatal CC error while executing %s at %s:%d\n", routine, __FILE__, line);
57 MACHSTATE2(5," Description: %d, %s\n",returnCode,errMsg);
58 sprintf(buf,"Fatal CC error while executing %s at %s:%d\n"
59 " Description: %d, %s\n", routine, __FILE__, line,returnCode,errMsg);
60 CmiAbort(buf);
64 // We pass the pointer used in the cc_qp_post_sq call as last parameter, since
65 // when this function is called, cc_pq_post_sq has already been called
66 static int ammasso_check_post_err(cc_status_t returnCode,const char *routine,int line, int *nWR, int nodeTo, int retry) {
67 if (returnCode == CCERR_TOO_MANY_WRS_POSTED && *nWR != 1 && retry>0) {
68 cc_wc_t wc;
69 // drain the send completion queue and retry
70 while (cc_cq_poll(contextBlock->rnic, nodes[nodeTo].send_cq, &wc) == CC_OK) {
71 MACHSTATE1(5, "Error posting send request - INFO: Send completed with node %d... now waiting for acknowledge...", nodeTo);
73 MACHSTATE(5, "Error posting send request - Retrying...");
74 return 1;
77 if (returnCode != CC_OK || *nWR != 1) {
78 char buf[128];
79 char *errMsg = cc_status_to_string(returnCode);
81 // Attempt to close the RNIC
82 cc_rnic_close(contextBlock->rnic);
84 // Let the user know what happened and bail
85 MACHSTATE3(5,"Fatal CC error while executing %s at %s:%d\n", routine, __FILE__, line);
86 MACHSTATE3(5," Description: %d, %s (nWR = %d)\n",returnCode,errMsg,nWR);
87 sprintf(buf,"Fatal CC error while executing %s at %s:%d\n"
88 " Description: %d, %s (nWR = %d)\n", routine, __FILE__, line,returnCode,errMsg,*nWR);
89 CmiAbort(buf);
91 return 0;
95 ////////////////////////////////////////////////////////////////////////////////////////////////////
96 // Function ProtoTypes /////////////////////////////////////////////////////////////////////////////
98 void CmiMachineInit(char **argv);
100 void CmiAmmassoNodeAddressesStoreHandler(int pe, struct sockaddr_in *addr, int port);
102 void AmmassoDoIdle(void);
103 void CmiNotifyIdle(void);
104 static CmiIdleState* CmiNotifyGetState(void);
105 static void CmiNotifyBeginIdle(CmiIdleState *s);
106 static void CmiNotifyStillIdle(CmiIdleState *s);
108 void sendAck(OtherNode node);
109 AmmassoToken *getQPSendToken(OtherNode node);
110 int sendDataOnQP(char* data, int len, OtherNode node, char flags);
111 void DeliverViaNetwork(OutgoingMsg msg, OtherNode otherNode, int rank, unsigned int broot, int copy);
112 static void CommunicationServer(int withDelayMs, int where);
113 void CmiMachineExit(void);
115 void AsynchronousEventHandler(cc_rnic_handle_t rnic, cc_event_record_t *eventRecord, void *cb);
116 void CheckRecvBufForMessage(OtherNode node);
117 //void CompletionEventHandler(cc_rnic_handle_t rnic, cc_cq_handle_t cq, void *cb);
118 //void CompletionEventHandlerWithAckFlag(cc_rnic_handle_t rnic, cc_cq_handle_t cq, void *cb, int breakOnAck);
120 void CmiAmmassoOpenQueuePairs(void);
122 void processAmmassoControlMessage(char* msg, int len, Tailer *tail, OtherNode from);
123 int ProcessMessage(char* msg, int len, Tailer *tail, OtherNode from);
125 OtherNode getNodeFromQPId(cc_qp_id_t qp_id);
126 OtherNode getNodeFromQPHandle(cc_qp_handle_t qp);
128 void establishQPConnection(OtherNode node, int reuseQPFlag);
129 void reestablishQPConnection(OtherNode node);
130 void closeQPConnection(OtherNode node, int destroyQPFlag);
132 void BufferAlloc(int n);
133 void TokenAlloc(int n);
134 void RequestTokens(OtherNode node, int n);
135 void GrantTokens(OtherNode node, int n);
136 void RequestReleaseTokens(OtherNode node, int n);
137 void ReleaseTokens(OtherNode node, int n);
139 ////////////////////////////////////////////////////////////////////////////////////////////////////
140 // Function Bodies /////////////////////////////////////////////////////////////////////////////////
142 /* Callbacks used by the DYNAMIC ALLOCATOR */
143 void AllocatorCheck (void) {
144 int i, limit;
145 char buf[24];
146 for (i=0; i<contextBlock->numNodes; ++i) {
147 if (i==contextBlock->myNode) continue;
148 limit = nodes[i].num_sendTokens - nodes[i].max_used_tokens - 10;
149 CmiPrintf("[%d] AllocatorCheck called: node %d, limit %d\n",CmiMyPe(),i,limit);
150 if (limit > 0) {
151 ReleaseTokens(&nodes[i], limit);
152 CmiPrintf("[%d] Releasing %d tokens to %d\n", CmiMyPe(), limit, i);
153 nodes[i].max_used_tokens = 0;
157 /* DYNAMIC ALLOCATOR END */
159 void BufferAlloc(int n) {
160 int i;
161 char buf[128];
162 cc_stag_index_t newStagIndex;
163 AmmassoBuffer *newBuffers;
165 MACHSTATE1(3, "Allocating %d new Receive Buffers",n);
167 // Try to allocate the memory for n receiving buffers
168 newBuffers = (AmmassoBuffer*) CmiAlloc(n*sizeof(AmmassoBuffer));
170 if (newBuffers == NULL) {
172 // Attempt to close the RNIC
173 cc_rnic_close(contextBlock->rnic);
175 // Let the user know what happened and bail
176 MACHSTATE(5, "BufferAlloc() - ERROR: Unable to allocate memory for RECV buffers");
177 sprintf(buf, "BufferAlloc() - ERROR: Unable to allocate memory for RECV buffers");
178 CmiAbort(buf);
181 contextBlock->pinnedMemory += n*sizeof(AmmassoBuffer);
182 CC_CHECK(cc_nsmr_register_virt,(contextBlock->rnic,
183 CC_ADDR_TYPE_VA_BASED,
184 (cc_byte_t*)newBuffers,
185 n*sizeof(AmmassoBuffer),
186 contextBlock->pd_id,
187 0, 0,
188 CC_ACF_LOCAL_READ | CC_ACF_LOCAL_WRITE | CC_ACF_REMOTE_WRITE,
189 &newStagIndex)
192 for (i=0; i<n; ++i) {
193 newBuffers[i].tail.length = 0;
194 newBuffers[i].next = &(newBuffers[i+1]);
195 newBuffers[i].stag = newStagIndex;
197 newBuffers[n-1].next = NULL;
198 if (contextBlock->freeRecvBuffers == NULL) {
199 contextBlock->freeRecvBuffers = newBuffers;
200 } else {
201 contextBlock->last_freeRecvBuffers->next = newBuffers;
203 contextBlock->last_freeRecvBuffers = &newBuffers[n-1];
204 contextBlock->num_freeRecvBuffers += n;
207 void TokenAlloc(int n) {
208 int i;
209 char buf[128];
210 cc_stag_index_t newStagIndex;
211 AmmassoToken *sendToken, *tokenScanner;
212 cc_data_addr_t *sendSgl;
213 AmmassoBuffer *sendBuffer;
215 MACHSTATE1(3, "Allocating %d new Tokens",n);
217 // Try to allocate the memory for n sending buffers
218 sendBuffer = (AmmassoBuffer*) CmiAlloc(n*sizeof(AmmassoBuffer));
220 if (sendBuffer == NULL) {
222 // Attempt to close the RNIC
223 cc_rnic_close(contextBlock->rnic);
225 // Let the user know what happened and bail
226 MACHSTATE(5, "TokenAlloc() - ERROR: Unable to allocate memory for SEND buffers");
227 sprintf(buf, "TokenAlloc() - ERROR: Unable to allocate memory for SEND buffers");
228 CmiAbort(buf);
231 contextBlock->pinnedMemory += n*sizeof(AmmassoBuffer);
232 CC_CHECK(cc_nsmr_register_virt,(contextBlock->rnic,
233 CC_ADDR_TYPE_VA_BASED,
234 (cc_byte_t*)sendBuffer,
235 n*sizeof(AmmassoBuffer),
236 contextBlock->pd_id,
237 0, 0,
238 CC_ACF_LOCAL_READ | CC_ACF_LOCAL_WRITE,
239 &newStagIndex)
242 // Allocate the send tokens
243 sendToken = (AmmassoToken*) CmiAlloc(n*ALIGN8(sizeof(AmmassoToken)));
245 if (sendToken == NULL) {
247 // Attempt to close the RNIC
248 cc_rnic_close(contextBlock->rnic);
250 // Let the user know what happened and bail
251 MACHSTATE(5, "TokenAlloc() - ERROR: Unable to allocate memory for send TOKEN buffers");
252 sprintf(buf, "TokenAlloc() - ERROR: Unable to allocate memory for send TOKEN buffers");
253 CmiAbort(buf);
256 sendSgl = (cc_data_addr_t*) CmiAlloc(n*ALIGN8(sizeof(cc_data_addr_t)));
258 if (sendSgl == NULL) {
260 // Attempt to close the RNIC
261 cc_rnic_close(contextBlock->rnic);
263 // Let the user know what happened and bail
264 MACHSTATE(5, "TokenAlloc() - ERROR: Unable to allocate memory for send SGL buffers");
265 sprintf(buf, "TokenAlloc() - ERROR: Unable to allocate memory for send SGL buffers");
266 CmiAbort(buf);
269 tokenScanner = sendToken;
270 for (i=0; i<n; ++i) {
271 sendSgl->stag = newStagIndex;
272 sendSgl->length = AMMASSO_BUFSIZE + sizeof(Tailer);
273 sendSgl->to = (unsigned long)&(sendBuffer[i]);
274 tokenScanner->wr.wr_id = (unsigned long)tokenScanner;
275 tokenScanner->wr.wr_type = CC_WR_TYPE_RDMA_WRITE;
276 tokenScanner->wr.wr_u.rdma_write.local_sgl.sge_count = 1;
277 tokenScanner->wr.wr_u.rdma_write.local_sgl.sge_list = sendSgl;
278 tokenScanner->wr.signaled = 1;
279 tokenScanner->localBuf = (AmmassoBuffer*)&(sendBuffer[i]);
280 LIST_ENQUEUE(contextBlock->,freeTokens,tokenScanner);
281 sendSgl = (cc_data_addr_t*)(((char*)sendSgl)+ALIGN8(sizeof(cc_data_addr_t)));
282 tokenScanner = (AmmassoToken*)(((char*)tokenScanner)+ALIGN8(sizeof(AmmassoToken)));
286 void RequestTokens(OtherNode node, int n) {
287 char buf[24];
288 *((int*)buf) = n;
289 sendDataOnQP(buf, sizeof(int), node, AMMASSO_MOREBUFFERS);
292 void GrantTokens(OtherNode node, int n) {
293 int i;
294 char *buf;
295 AmmassoBuffer *buffer;
296 AmmassoBuffer *prebuffer;
297 AmmassoTokenDescription *tokenDesc;
298 if (node->pending != NULL) return;
299 if (n*sizeof(AmmassoTokenDescription) + sizeof(int) > AMMASSO_BUFSIZE) {
300 n = (AMMASSO_BUFSIZE-sizeof(int)) / sizeof(AmmassoTokenDescription);
302 if (contextBlock->num_freeRecvBuffers < n) {
303 int quantity = (n - contextBlock->num_freeRecvBuffers + 1023) & (~1023);
304 BufferAlloc(quantity);
306 buf = (char*) CmiAlloc(n*sizeof(AmmassoTokenDescription) + sizeof(int));
307 *((int*)buf) = n;
308 tokenDesc = (AmmassoTokenDescription*)(buf+sizeof(int));
309 buffer = contextBlock->freeRecvBuffers;
310 for (i=0; i<n; ++i) {
311 tokenDesc[i].stag = buffer->stag;
312 tokenDesc[i].to = (unsigned long)buffer;
313 prebuffer = buffer;
314 buffer = buffer->next;
316 node->pending = contextBlock->freeRecvBuffers;
317 node->last_pending = prebuffer;
318 node->num_pending = n;
319 prebuffer->next = NULL;
320 contextBlock->num_freeRecvBuffers -= n;
321 contextBlock->freeRecvBuffers = buffer;
322 sendDataOnQP(buf, n*sizeof(AmmassoTokenDescription) + sizeof(int), node, AMMASSO_ALLOCATE);
323 CmiFree(buf);
326 void RequestReleaseTokens(OtherNode node, int n) {
327 char buf[24];
328 *((int*)buf) = n;
329 sendDataOnQP(buf, sizeof(int), node, AMMASSO_RELEASE);
332 void ReleaseTokens(OtherNode node, int n) {
333 int i, nWR;
334 AmmassoToken *token;
335 AmmassoBuffer *tokenBuf;
336 cc_data_addr_t *tokenSgl;
338 if (node->num_sendTokens < n) n = node->num_sendTokens - 1;
339 if (n <= 0) return;
340 token = node->sendTokens;
341 tokenBuf = token->localBuf;
343 tokenBuf->tail.length = 1;
344 tokenBuf->tail.ack = 0; // do not send any ACK with this message
345 tokenBuf->tail.flags = AMMASSO_RELEASED;
347 // Setup the local SGL
348 tokenSgl = token->wr.wr_u.rdma_write.local_sgl.sge_list;
349 tokenSgl->length = sizeof(Tailer);
350 tokenSgl->to = (unsigned long)&tokenBuf->tail;
351 token->wr.wr_u.rdma_write.remote_to = (unsigned long)&token->remoteBuf->tail;
353 CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, &token->wr, 1, &nWR),node->myNode);
355 if (contextBlock->freeTokens == NULL) {
356 contextBlock->freeTokens = node->sendTokens;
357 } else {
358 contextBlock->last_freeTokens->next = node->sendTokens;
360 for (i=1; i<n; ++i) token = token->next;
361 contextBlock->last_freeTokens = token;
362 node->sendTokens = token->next;
363 token->next = NULL;
364 contextBlock->num_freeTokens += n;
365 node->num_sendTokens -= n;
368 /* CmiMachineInit()
369 * This is called as the node is starting up. It does some initialization of the machine layer.
371 void CmiMachineInit(char **argv) {
373 char buf[128];
374 cc_status_t rtn;
376 AMMASSO_STATS_INIT
378 AMMASSO_STATS_START(MachineInit)
380 MACHSTATE(2, "CmiMachineInit() - INFO: (***** Ammasso Specific*****) - Called... Initializing RNIC...");
381 MACHSTATE1(1, "CmiMachineInit() - INFO: Cmi_charmrun_pid = %d", Cmi_charmrun_pid);
384 //CcdCallOnConditionKeep(CcdPERIODIC, (CcdVoidFn)periodicFunc, NULL);
387 // Allocate a context block that will be used throughout this machine layer
388 if (contextBlock != NULL) {
389 MACHSTATE(5, "CmiMachineInit() - ERROR: contextBlock != NULL");
390 sprintf(buf, "CmiMachineInit() - ERROR: contextBlock != NULL");
391 CmiAbort(buf);
393 contextBlock = (mycb_t*)malloc(sizeof(mycb_t));
394 if (contextBlock == NULL) {
395 MACHSTATE(5, "CmiMachineInit() - ERROR: Unable to malloc memory for contextBlock");
396 sprintf(buf, "CmiMachineInit() - ERROR: Unable to malloc memory for contextBlock");
397 CmiAbort(buf);
400 // Initialize the contextBlock by zero-ing everything out and then setting anything special
401 memset(contextBlock, 0, sizeof(mycb_t));
402 contextBlock->rnic = -1;
404 MACHSTATE(1, "CmiMachineInit() - INFO: (PRE-OPEN_RNIC)");
406 // Check to see if in stand-alone mode
407 if (Cmi_charmrun_pid != 0) {
409 // Try to Open the RNIC
410 // TODO : Look-up the difference between CC_PBL_PAGE_MODE and CC_PBL_BLOCK_MODE
411 // TODO : Would a call to cc_rnic_enum or cc_rnic_query do any good here?
412 rtn = cc_rnic_open(0, CC_PBL_PAGE_MODE, contextBlock, &(contextBlock->rnic));
413 if (rtn != CC_OK) {
414 MACHSTATE2(5, "CmiMachineInit() - ERROR: Unable to open RNIC: %d, \"%s\"", rtn, cc_status_to_string(rtn));
415 sprintf(buf, "CmiMachineInit() - ERROR: Unable to open RNIC: %d, \"%s\"", rtn, cc_status_to_string(rtn));
416 CmiAbort(buf);
419 MACHSTATE(1, "CmiMachineInit() - INFO: (PRE-SET-ASYNC-HANDLER)");
421 // Set the asynchronous event handler function
422 CC_CHECK(cc_eh_set_async_handler,(contextBlock->rnic, AsynchronousEventHandler, contextBlock));
425 MACHSTATE(3, "CmiMachineInit() - INFO: (PRE-SET-CE-HANDLER)");
427 // Set the Completion Event Handler
428 contextBlock->eh_id = 0;
429 CC_CHECK(cc_eh_set_ce_handler,(contextBlock->rnic, CompletionEventHandler, &(contextBlock->eh_id)));
432 MACHSTATE(1, "CmiMachineInit() - INFO: (PRE-PD-ALLOC)");
434 // Allocate the Protection Domain
435 CC_CHECK(cc_pd_alloc,(contextBlock->rnic, &(contextBlock->pd_id)));
437 MACHSTATE(1, "CmiMachineInit() - INFO: RNIC Open For Business!!!");
439 } else { // Otherwise, not in stand-alone mode
441 // Flag the rnic variable as invalid
442 contextBlock->rnic = -1;
445 MACHSTATE(2, "CmiMachineInit() - INFO: Completed Successfully !!!");
447 AMMASSO_STATS_END(MachineInit)
/* CmiCommunicationInit()
 *   NOTE(review): no statements are visible for this function in this view;
 *   it is reproduced as an empty body - confirm against the repository.
 */
void CmiCommunicationInit(char **argv)
{
}
/* CmiAmmassoNodeAddressesStoreHandler()
 *   Logs the processor number, the socket address fields (family, port and
 *   the four address bytes, lowest byte first) and the port it is given.
 *   It stores nothing.
 */
void CmiAmmassoNodeAddressesStoreHandler(int pe, struct sockaddr_in *addr, int port) {

  // DMK : NOTE : The hope is that this can be used to request the RDMA addresses of the other nodes after the
  //              initial addresses from charmrun are given to the node.  Get the address here, use that to request
  //              the RDMA address, use the RDMA address to create the QP connection (in establishQPConnection(), which
  //              only subtracts one from the address at the moment... the way our cluster is setup).

  MACHSTATE1(2, "CmiNodeAddressesStoreHandler() - INFO: pe = %d", pe);
  MACHSTATE1(1, " addr = { sin_family = %d,", addr->sin_family);
  MACHSTATE1(1, " sin_port = %d,", addr->sin_port);
  MACHSTATE4(1, " sin_addr.s_addr = %d.%d.%d.%d }",
             (addr->sin_addr.s_addr & 0xFF),
             ((addr->sin_addr.s_addr >> 8) & 0xFF),
             ((addr->sin_addr.s_addr >> 16) & 0xFF),
             ((addr->sin_addr.s_addr >> 24) & 0xFF));
  MACHSTATE1(1, " port = %d", port);
}
469 void AmmassoDoIdle(void) {
471 int i;
472 cc_wc_t wc;
474 AMMASSO_STATS_START(AmmassoDoIdle)
476 /* DYNAMIC ALLOCATOR: Callbacks */
477 /*if (contextBlock->conditionRegistered == 0) {
478 CcdCallOnConditionKeep(CcdPERIODIC_1s, (CcdVoidFn) AllocatorCheck, NULL);
479 //CcdCallFnAfter((CcdVoidFn) AllocatorCheck, NULL, 100);
480 contextBlock->conditionRegistered = 1;
482 /* DYNAMIC ALLOCATOR END */
484 for (i = 0; i < contextBlock->numNodes; i++) {
485 if (i == contextBlock->myNode) continue;
486 CheckRecvBufForMessage(&(nodes[i]));
487 while (cc_cq_poll(contextBlock->rnic, nodes[i].send_cq, &wc) == CC_OK) {
488 MACHSTATE1(3, "AmmassoDoIdle() - INFO: Send completed with node %d... now waiting for acknowledge...", i);
492 AMMASSO_STATS_END(AmmassoDoIdle)
/* CmiNotifyIdle(): runs one AmmassoDoIdle() pass. */
void CmiNotifyIdle(void)
{
  AmmassoDoIdle();
}
499 static CmiIdleState* CmiNotifyGetState(void) {
500 return NULL;
503 static void CmiNotifyBeginIdle(CmiIdleState *s) {
504 AmmassoDoIdle();
507 static void CmiNotifyStillIdle(CmiIdleState *s) {
508 AmmassoDoIdle();
511 /* NOTE: if the ack overflows, we cannot use this method of sending, but we need
512 to send a special message for the purpose */
513 void sendAck(OtherNode node) {
515 int nWR;
517 AMMASSO_STATS_START(sendAck)
519 MACHSTATE2(3, "sendAck() - Ammasso - INFO: Called... sending ACK %d to node %d", *node->remoteAck, node->myNode);
521 if (*node->remoteAck < ACK_MASK) {
523 // Send an ACK message to the specified QP/Connection/Node
524 CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, node->ack_sq_wr, 1, &nWR),node->myNode);
526 } else {
527 // Rare case: happens only after days of run! In this case, do not update
528 // directly the ack, but zero it on the other side, wait one second to be
529 // safe, and then send a regular message with the ACK
530 AmmassoToken *token;
531 AmmassoBuffer *tokenBuf;
532 int tmp_ack = *node->remoteAck;
533 *node->remoteAck = 0;
534 CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, node->ack_sq_wr, 1, &nWR),node->myNode);
536 sleep(1);
538 token = getQPSendToken(node);
539 tokenBuf = token->localBuf;
540 tokenBuf->tail.ack = tmp_ack;
541 tokenBuf->tail.flags = ACK_WRAPPING;
542 tokenBuf->tail.length = 1; // So it will be seen by the receiver
543 token->wr.wr_u.rdma_write.local_sgl.sge_list->length = sizeof(Tailer);
544 token->wr.wr_u.rdma_write.local_sgl.sge_list->to = (unsigned long)&tokenBuf->tail;
545 token->wr.wr_u.rdma_write.remote_to = (unsigned long)&token->remoteBuf->tail;
546 CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, &token->wr, 1, &nWR),node->myNode);
547 LIST_ENQUEUE(node->,usedTokens,token);
548 node->max_used_tokens = (node->num_usedTokens>node->max_used_tokens)?node->num_usedTokens:node->max_used_tokens;
549 *node->remoteAck = tmp_ack & ACK_MASK;
552 node->messagesNotYetAcknowledged = 0;
554 AMMASSO_STATS_END(sendAck)
557 /* NOTE, even in SMP versions, only the communication server should be sending
558 messages out, thus no locking should be necessary */
559 /* This function returns the token usable for next communication. If no token is
560 available, it blocks until one becomes available */
561 AmmassoToken *getQPSendToken(OtherNode node) {
562 AmmassoToken *token;
563 int i;
564 cc_wc_t wc;
565 ammasso_ack_t newAck;
566 while (node->connectionState != QP_CONN_STATE_CONNECTED ||
567 node->sendTokens == NULL) {
568 // Try to see if an ACK has been sent directly, so we free some tokens The
569 // direct token will never be greater than ACK_MASK (by protocol
570 // definition), so we do not need to wrap around
571 MACHSTATE(3, "getQPSendBuffer() - INFO: No tokens available");
572 if (*node->directAck > node->localAck) {
573 newAck = *node->directAck;
574 for (i=node->localAck; i<newAck; ++i) {
575 LIST_DEQUEUE(node->,usedTokens,token);
576 LIST_ENQUEUE(node->,sendTokens,token);
578 node->localAck = newAck;
581 CheckRecvBufForMessage(node);
583 while (cc_cq_poll(contextBlock->rnic, node->send_cq, &wc) == CC_OK) {
584 MACHSTATE1(3, "getQPSendBuffer() - INFO: Send completed with node %d... now waiting for acknowledge...", node->myNode);
587 LIST_DEQUEUE(node->,sendTokens,token);
588 return token;
592 int getQPSendBuffer(OtherNode node, char force) {
594 int rtnBufIndex, i;
595 cc_wc_t wc;
597 AMMASSO_STATS_START(getQPSendBuffer)
599 MACHSTATE1(3, "getQPSendBuffer() - Ammasso - Called (send to node %d)...", node->myNode);
601 while (1) {
603 AMMASSO_STATS_START(getQPSendBuffer_loop)
605 rtnBufIndex = -1;
607 AMMASSO_STATS_START(getQPSendBuffer_lock)
609 MACHSTATE(3, "getQPSendBuffer() - INFO: Pre-sendBufLock");
610 #if CMK_SHARED_VARS_UNAVAILABLE
611 while (node->sendBufLock != 0) { usleep(1); } // Since CmiLock() is not really a lock, actually wait
612 #endif
613 CmiLock(node->sendBufLock);
615 AMMASSO_STATS_END(getQPSendBuffer_lock)
617 // If force is set, let the message use any of the available send buffers. Otherwise, there can only
618 // be AMMASSO_NUMMSGBUFS_PER_QP message outstanding (haven't gotten an ACK for) so wait for that.
619 // VERY IMPORTANT !!! Only use force for sending ACKs !!!!!!!! If this is done, it is ensured that there
620 // will always be at least one buffer available when the force code is executed
621 if (node->connectionState == QP_CONN_STATE_CONNECTED) {
622 if (force) {
623 rtnBufIndex = node->send_UseIndex;
624 node->send_UseIndex++;
625 if (node->send_UseIndex >= AMMASSO_NUMMSGBUFS_PER_QP * 2)
626 node->send_UseIndex = 0;
627 } else {
628 if (node->send_InUseCounter < AMMASSO_NUMMSGBUFS_PER_QP) {
629 rtnBufIndex = node->send_UseIndex;
630 node->send_InUseCounter++;
631 node->send_UseIndex++;
632 if (node->send_UseIndex >= AMMASSO_NUMMSGBUFS_PER_QP * 2)
633 node->send_UseIndex = 0;
638 CmiUnlock(node->sendBufLock);
639 MACHSTATE3(3, "getQPSendBuffer() - INFO: Post-sendBufLock - rtnBufIndex = %d, node->connectionState = %d, node->send_UseIndex = %d", rtnBufIndex, node->connectionState, node->send_UseIndex);
641 if (rtnBufIndex >= 0) {
643 AMMASSO_STATS_END(getQPSendBuffer_loop)
644 break;
646 } else {
648 //usleep(1);
650 AMMASSO_STATS_START(getQPSendBuffer_CEH)
652 CheckRecvBufForMessage(node);
653 //CompletionEventHandlerWithAckFlag(contextBlock->rnic, node->recv_cq, node, 1);
655 //CompletionEventHandler(contextBlock->rnic, node->send_cq, node);
656 while (cc_cq_poll(contextBlock->rnic, node->send_cq, &wc) == CC_OK) {
657 MACHSTATE1(3, "getQPSendBuffer() - INFO: Send completed with node %d... now waiting for acknowledge...", nextNode);
661 AMMASSO_STATS_END(getQPSendBuffer_CEH)
663 AMMASSO_STATS_END(getQPSendBuffer_loop)
667 //// Increment the send_UseIndex counter so another buffer will be used next time
668 //node->send_UseIndex++;
669 //if (node->send_UseIndex >= AMMASSO_NUMMSGBUFS_PER_QP * 2)
670 // node->send_UseIndex = 0;
672 MACHSTATE1(3, "getQPSendBuffer() - Ammasso - Finished (returning buffer index: %d)", rtnBufIndex);
674 AMMASSO_STATS_END(getQPSendBuffer)
676 return rtnBufIndex;
681 // NOTE: The force parameter can be thought of as an "is ACK" control message flag (see comments in getQPSendBuffer())
682 int sendDataOnQP(char* data, int len, OtherNode node, char flags) {
684 AmmassoToken *sendBufToken;
685 AmmassoBuffer *tokenBuf;
686 cc_data_addr_t *tokenSgl;
687 int toSendLength;
688 cc_wc_t wc;
689 cc_uint32_t nWR;
690 char isFirst = 1;
691 char *origMsgStart = data;
692 char *sendBufBegin;
694 int origSize = len;
696 if (origSize <= 1024) {
697 AMMASSO_STATS_START(sendDataOnQP_1024)
698 } else if (origSize <= 2048) {
699 AMMASSO_STATS_START(sendDataOnQP_2048)
700 } else if (origSize <= 4096) {
701 AMMASSO_STATS_START(sendDataOnQP_4096)
702 } else if (origSize <= 16384) {
703 AMMASSO_STATS_START(sendDataOnQP_16384)
704 } else {
705 AMMASSO_STATS_START(sendDataOnQP_over)
708 AMMASSO_STATS_START(sendDataOnQP)
710 //CompletionEventHandler(contextBlock->rnic, node->recv_cq, node);
711 //CompletionEventHandler(contextBlock->rnic, node->send_cq, node);
712 while (cc_cq_poll(contextBlock->rnic, node->send_cq, &wc) == CC_OK) {
713 MACHSTATE1(3, "sendDataOnQP() - INFO: Send completed with node %d... now waiting for acknowledge...", node->myNode);
717 MACHSTATE2(2, "sendDataOnQP() - Ammasso - INFO: Called (send to node %d, len = %d)...", node->myNode, len);
719 // Assert that control messages will not be fragmented
720 CmiAssert(flags==0 || len<=AMMASSO_BUFSIZE);
722 // DMK : For each message that is fragmented, attach another DGRAM header to
723 // it, (keeping in mind that the control messages are no where near large
724 // enough for this to occur).
726 while (len > 0) {
728 AMMASSO_STATS_START(sendDataOnQP_pre_send)
730 // Get a free send buffer (NOTE: This call will block until a send buffer is free)
731 sendBufToken = getQPSendToken(node);
732 tokenBuf = sendBufToken->localBuf;
733 // Enqueue the token to the used queue immediately, so it is safe to be
734 // interrupted by other calls
735 LIST_ENQUEUE(node->,usedTokens,sendBufToken);
736 node->max_used_tokens = (node->num_usedTokens>node->max_used_tokens)?node->num_usedTokens:node->max_used_tokens;
738 // Copy the contents (up to AMMASSO_BUFSIZE worth) of data into the send buffer
740 // The toSendLength includes the DGRAM header size. If the chunk sent is not
741 // the first, the initial DGRAM_HEADER_SIZE bytes need to be contructed from
742 // the DGRAM header of the original message (instead of just being copied
743 // together with the message itself)
745 if (isFirst) {
747 toSendLength = len > AMMASSO_BUFSIZE ? AMMASSO_BUFSIZE : len; // MIN of len and AMMASSO_BUFSIZE
748 sendBufBegin = tokenBuf->buf + AMMASSO_BUFSIZE - ALIGN8(toSendLength);
750 memcpy(sendBufBegin, data, toSendLength);
752 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: Sending 1st Fragment - toSendLength = %d...", toSendLength);
754 } else {
756 toSendLength = len > (AMMASSO_BUFSIZE - DGRAM_HEADER_SIZE) ? AMMASSO_BUFSIZE : (len+DGRAM_HEADER_SIZE); // MIN of len and AMMASSO_BUFSIZE
757 sendBufBegin = tokenBuf->buf + AMMASSO_BUFSIZE - ALIGN8(toSendLength);
759 memcpy(sendBufBegin+DGRAM_HEADER_SIZE, data, toSendLength-DGRAM_HEADER_SIZE);
761 // This dgram header is the same of the original message, except for the
762 // sequence number, so copy the original and just modify the sequence
763 // number.
765 // NOTE: If the message is large enough that fragmentation needs to
766 // happen, the send_next_lock is already owned by the thread executing
767 // this code.
768 memcpy(sendBufBegin, origMsgStart, DGRAM_HEADER_SIZE);
770 ((DgramHeader*)sendBufBegin)->seqno = node->send_next;
771 node->send_next = ((node->send_next+1) & DGRAM_SEQNO_MASK); // Increase the sequence number
773 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: Sending Continuation Fragment - toSendLength = %d...", toSendLength);
776 // Write the size of the message at the end of the buffer, with the ack and
777 // flags
778 tokenBuf->tail.length = toSendLength;
779 node->messagesNotYetAcknowledged = 0;
780 tokenBuf->tail.ack = *node->remoteAck;
781 if (*node->remoteAck > ACK_MASK) {
782 // Rare case of ACK wrapping
783 tokenBuf->tail.ack = 0;
784 // in the rare case that we didn't send an ack with this message because
785 // the ack just wrapped around (rare case), send a full ACK message. It is
786 // safe to send it now, since the queues are consistent between sender and
787 // receiver. The fact that this ack is effectively sent before the regular
788 // message is not important, since on the other side it will be discovered
789 // only after this one is received
790 sendAck(node);
792 tokenBuf->tail.flags = flags;
794 // Setup the local SGL
795 tokenSgl = sendBufToken->wr.wr_u.rdma_write.local_sgl.sge_list;
796 tokenSgl->length = ALIGN8(toSendLength) + sizeof(Tailer);
797 tokenSgl->to = (unsigned long)sendBufBegin;
798 sendBufToken->wr.wr_u.rdma_write.remote_to = (unsigned long)(((char*)sendBufToken->remoteBuf)+AMMASSO_BUFSIZE-ALIGN8(toSendLength));
800 // The remote_to and remote_stag are already fixed part of the token
802 AMMASSO_STATS_END(sendDataOnQP_pre_send)
803 AMMASSO_STATS_START(sendDataOnQP_send)
805 MACHSTATE(3, "sendDataOnQP() - Ammasso - INFO: Enqueuing RDMA Write WR...");
807 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: tokenSgl->to = %p", tokenSgl->to);
808 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: sendBufToken->wr.wr_u.rdma_write.remote_to = %p", sendBufToken->wr.wr_u.rdma_write.remote_to);
809 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: tail.ack = %d", tokenBuf->tail.ack);
810 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: tail.flags = %d", tokenBuf->tail.flags);
812 CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, &sendBufToken->wr, 1, &nWR),node->myNode);
814 MACHSTATE(1, "sendDataOnQP() - Ammasso - INFO: RDMA Write WR Enqueue Completed");
816 AMMASSO_STATS_END(sendDataOnQP_send)
817 AMMASSO_STATS_START(sendDataOnQP_post_send)
819 // Update the data and len variables for the next while (if fragmenting is needed)
820 data += toSendLength;
821 len -= toSendLength;
822 if (isFirst == 0) {
823 data -= DGRAM_HEADER_SIZE;
824 len += DGRAM_HEADER_SIZE;
826 isFirst = 0;
828 AMMASSO_STATS_END(sendDataOnQP_post_send)
831 AMMASSO_STATS_END(sendDataOnQP)
833 if (origSize <= 1024) {
834 AMMASSO_STATS_END(sendDataOnQP_1024)
835 } else if (origSize <= 2048) {
836 AMMASSO_STATS_END(sendDataOnQP_2048)
837 } else if (origSize <= 4096) {
838 AMMASSO_STATS_END(sendDataOnQP_4096)
839 } else if (origSize <= 16384) {
840 AMMASSO_STATS_END(sendDataOnQP_16384)
841 } else {
842 AMMASSO_STATS_END(sendDataOnQP_over)
848 /* DeliverViaNetwork()
// DeliverViaNetwork()
//   Sends one outgoing message to a remote node.  The datagram header is
//   written directly into msg->data, the destination's send sequence number
//   (otherNode->send_next) is advanced and masked with DGRAM_SEQNO_MASK, and
//   the whole buffer (msg->data, msg->size) is handed to sendDataOnQP() with
//   a final argument of 0.
//
//   msg       - outgoing message; its data buffer is modified in place
//   otherNode - state of the destination node
//   rank      - destination rank, stored in the datagram header
//   broot     - broadcast root, stored in the datagram header
//   copy      - not referenced anywhere in this body
//
//   NOTE(review): rtn, stag, sgl, wr and WRsPosted are referenced only by the
//   disabled on-the-fly registration example further down.
851 void DeliverViaNetwork(OutgoingMsg msg, OtherNode otherNode, int rank, unsigned int broot, int copy) {
853 cc_status_t rtn;
854 cc_stag_index_t stag;
855 cc_data_addr_t sgl;
856 cc_sq_wr_t wr;
857 cc_uint32_t WRsPosted;
859 AMMASSO_STATS_START(DeliverViaNetwork)
861 MACHSTATE(2, "DeliverViaNetwork() - Ammasso - INFO: Called...");
863 // We don't need to do this since the message data is being copied into the
864 // send_buf, the OutgoingMsg can be free'd ASAP
866 // The lock will be already held by the calling function in machine.c
867 // (CommLock)
869 AMMASSO_STATS_START(DeliverViaNetwork_post_lock)
// The header must be stamped before the sequence number is advanced: it
// records the current value of otherNode->send_next.
871 DgramHeaderMake(msg->data, rank, msg->src, Cmi_charmrun_pid, otherNode->send_next, broot); // Set DGram Header Fields In-Place
872 otherNode->send_next = ((otherNode->send_next+1) & DGRAM_SEQNO_MASK); // Increase the sequence number
874 MACHSTATE1(1, "DeliverViaNetwork() - INFO: Sending message to node %d", otherNode->myNode);
875 MACHSTATE1(1, "DeliverViaNetwork() - INFO: rank %d", rank);
876 MACHSTATE1(1, "DeliverViaNetwork() - INFO: broot %d", broot);
878 AMMASSO_STATS_START(DeliverViaNetwork_send)
// The complete buffer, header included, is passed on for transmission.
880 sendDataOnQP(msg->data, msg->size, otherNode, 0);
882 AMMASSO_STATS_END(DeliverViaNetwork_send)
884 //CmiUnlock(otherNode->send_next_lock);
885 MACHSTATE(1, "DeliverViaNetwork() - INFO: Post-send_next_lock");
888 // DMK : NOTE : I left this in as an example of how to register the memory with the RNIC on the fly. Since the ccil
889 // library has a bug which causes it not to de-pin memory that we unregister, it will probably be a better
890 // idea to fragment a message that is too large for a single buffer from the buffer pool.
891 /***************************************************************
892 // DMK : TODO : This is an important optimization area. This is registering the memory where the outgoing message
893 // is located with the RNIC. One balancing act that we will need to do is the cost of copying messages
894 // into memory regions already allocated VS. the cost of registering the memory. The cost of registering
895 // memory might be constant as the memory doesn't have to be traversed. If the cost of doing a memcpy on a
896 // small message, since memcpy traverses the memory range, is less than the cost of registering the
897 // memory with the RNIC, it would be better to copy the message into a pre-registered memory location.
899 // Start by registering the memory of the outgoing message with the RNIC
900 rtn = cc_nsmr_register_virt(contextBlock->rnic,
901 CC_ADDR_TYPE_VA_BASED,
902 msg->data + DGRAM_HEADER_SIZE,
903 msg->size - DGRAM_HEADER_SIZE,
904 contextBlock->pd_id,
905 0, 0,
906 CC_ACF_LOCAL_READ | CC_ACF_LOCAL_WRITE,
907 &stag
909 if (rtn != CC_OK) {
910 // Let the user know what happened
911 MACHSTATE2(3, "DeliverViaNetwork() - Ammasso - ERROR - Unable to register OutgoingMsg memory with RNIC: %d, \"%s\"", rtn, cc_status_to_string(rtn));
912 return;
915 // Setup the Scatter/Gather List
916 sgl.stag = stag;
917 sgl.to = (cc_uint64_t)(unsigned long)(msg->data + DGRAM_HEADER_SIZE);
918 sgl.length = msg->size - DGRAM_HEADER_SIZE;
920 // Create the Work Request
921 wr.wr_id = (cc_uint64_t)(unsigned long)&(wr);
922 wr.wr_type = CC_WR_TYPE_SEND;
923 wr.wr_u.send.local_sgl.sge_count = 1;
924 wr.wr_u.send.local_sgl.sge_list = &sgl;
925 wr.signaled = 1;
927 // Post the Work Request to the Send Queue
928 // DMK : TODO : Update this code to handle CCERR_QP_TOO_MANY_WRS_POSTED errors (pause briefly and retry)
929 rtn = cc_qp_post_sq(contextBlock->rnic, otherNode->qp, &(wr), 1, &(WRsPosted));
930 if (rtn != CC_OK || WRsPosted != 1) {
931 // Let the user know what happened
932 MACHSTATE2(3, "DeliverViaNetwork() - Ammasso - ERROR - Unable post Work Request to Send Queue: %d, \"%s\"", rtn, cc_status_to_string(rtn));
933 displayQueueQuery(otherNode->qp, &(otherNode->qp_attrs));
935 ***************************************************************/
937 MACHSTATE(2, "DeliverViaNetwork() - Ammasso - INFO: Completed.");
939 AMMASSO_STATS_END(DeliverViaNetwork_post_lock)
940 AMMASSO_STATS_END(DeliverViaNetwork)
944 /****************************************************************************
946 * CheckSocketsReady
948 * Checks both sockets to see which are readable and which are writeable.
949 * We check all these things at the same time since this can be done for
950 * free with ``select.'' The result is stored in global variables, since
951 * this is essentially global state information and several routines need it.
953 ***************************************************************************/
955 int CheckSocketsReady(int withDelayMs)
957 int nreadable;
958 CMK_PIPE_DECL(withDelayMs);
960 CmiStdoutAdd(CMK_PIPE_SUB);
961 if (Cmi_charmrun_fd!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd);
963 nreadable=CMK_PIPE_CALL();
964 ctrlskt_ready_read = 0;
965 dataskt_ready_read = 0;
966 dataskt_ready_write = 0;
968 if (nreadable == 0) {
969 MACHSTATE(2, "} CheckSocketsReady (nothing readable)");
970 return nreadable;
972 if (nreadable==-1) {
973 CMK_PIPE_CHECKERR();
974 MACHSTATE(3, "} CheckSocketsReady (INTERRUPTED!)");
975 return CheckSocketsReady(0);
977 CmiStdoutCheck(CMK_PIPE_SUB);
978 if (Cmi_charmrun_fd!=-1)
979 ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
980 MACHSTATE(3, "} CheckSocketsReady");
981 return nreadable;
984 /***********************************************************************
985 * CommunicationServer()
987 * This function does the scheduling of the tasks related to the
988 * message sends and receives.
989 * It first checks the charmrun port for messages, and then polls the gm event
990 * for send completions and incoming messages.
992 ***********************************************************************/
994 // NOTE: Always called from interrupt
995 static void ServiceCharmrun_nolock(void) {
997 int again = 1;
999 MACHSTATE(2, "ServiceCharmrun_nolock begin {");
1001 while (again) {
1002 again = 0;
1004 CheckSocketsReady(0);
1005 if (ctrlskt_ready_read) { ctrl_getone(); again=1; }
1006 if (CmiStdoutNeedsService()) { CmiStdoutService(); }
1009 MACHSTATE(2, "} ServiceCharmrun_nolock end");
1012 void processAmmassoControlMessage(char* msg, int len, Tailer *tail, OtherNode from) {
1014 int nodeIndex, ctrlType, i, n, nWR;
1015 AmmassoToken *token, *pretoken;
1016 AmmassoBuffer *tokenBuf;
1017 cc_data_addr_t *tokenSgl;
1018 OtherNode node;
1019 AmmassoTokenDescription *tokenDesc;
1021 AMMASSO_STATS_START(processAmmassoControlMessage)
1023 // Do not check the message, the flags field was set, and this is enough
1025 // Perform an action based on the control message type
1026 switch (tail->flags) {
1028 case AMMASSO_READY:
1030 // Decrement the node ready count by one
1031 contextBlock->nodeReadyCount--;
1032 MACHSTATE1(3, "processAmmassoControlMessage() - Ammasso - INFO: Received READY packet... still waiting for %d more...", contextBlock->nodeReadyCount);
1034 break;
1036 case AMMASSO_ALLOCATE: // Sent by the receiver to allocate more tokens
1038 token = getQPSendToken(from);
1039 tokenBuf = token->localBuf;
1041 tokenBuf->tail.length = 1;
1042 tokenBuf->tail.ack = 0; // do not send any ACK with this message
1043 tokenBuf->tail.flags = AMMASSO_ALLOCATED;
1045 // Setup the local SGL
1046 tokenSgl = token->wr.wr_u.rdma_write.local_sgl.sge_list;
1047 tokenSgl->length = sizeof(Tailer);
1048 tokenSgl->to = (unsigned long)&tokenBuf->tail;
1049 token->wr.wr_u.rdma_write.remote_to = (unsigned long)&token->remoteBuf->tail;
1051 CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, &token->wr, 1, &nWR),node->myNode);
1053 LIST_ENQUEUE(from->,usedTokens,token);
1054 from->max_used_tokens = (from->num_usedTokens>from->max_used_tokens)?from->num_usedTokens:from->max_used_tokens;
1055 // add the new tokens at the end of usedTokens (right after the one which
1056 // which we are sending back the confirmation)
1057 n = *((int*)msg);
1058 if (contextBlock->num_freeTokens < n) {
1059 int quantity = (n - contextBlock->num_freeTokens + 1023) & (~1023);
1060 BufferAlloc(quantity);
1062 token = contextBlock->freeTokens;
1063 tokenDesc = (AmmassoTokenDescription*)(msg+sizeof(int));
1064 for (i=0; i<n; ++i) {
1065 token->remoteBuf = (AmmassoBuffer*)tokenDesc[i].to;
1066 token->wr.wr_u.rdma_write.remote_stag = tokenDesc[i].stag;
1067 pretoken = token;
1068 token = token->next;
1070 from->last_usedTokens->next = contextBlock->freeTokens;
1071 from->last_usedTokens = pretoken;
1072 pretoken->next = NULL;
1073 contextBlock->freeTokens = token;
1074 from->num_usedTokens += n;
1075 contextBlock->num_freeTokens -= n;
1077 break;
1079 case AMMASSO_ALLOCATED: // Sent by the sender to conferm the token allocation
1081 // link the pending buffers at the end of those allocated (right after the
1082 // one with which this message came)
1083 from->last_recv_buf->next = from->pending;
1084 from->last_recv_buf = from->last_pending;
1085 from->last_pending = NULL;
1086 from->num_recv_buf += from->num_pending;
1087 *from->remoteAck += from->num_pending;
1088 from->num_pending = 0;
1090 break;
1092 case AMMASSO_MOREBUFFERS: // Sent by the sender to ask for more tokens
1094 /* DYNAMIC ALLOCATOR: Grant what requested */
1095 n = *((int*)msg);
1096 GrantTokens(from, n);
1097 /* DYNAMIC ALLOCATOR END */
1099 break;
1101 case AMMASSO_RELEASE: // Sent by the receiver to request back tokens
1103 /* DYNAMIC ALLOCATOR: Ignore the request of the receiver to return buffers */
1104 /* DYNAMIC ALLOCATOR END */
1106 break;
1108 case AMMASSO_RELEASED: // Sent by the sender to release tokens
1110 // This secondLastRecvBuf is set in CheckRecvBufForMessage
1111 from->secondLastRecvBuf->next = NULL;
1112 tokenBuf = from->last_recv_buf;
1113 from->last_recv_buf = from->secondLastRecvBuf;
1114 from->num_recv_buf--;
1115 LIST_ENQUEUE(contextBlock->,freeRecvBuffers,tokenBuf);
1116 n = *((int*)msg);
1117 for (i=1; i<n; ++i) {
1118 LIST_DEQUEUE(from->,recv_buf,tokenBuf);
1119 LIST_ENQUEUE(contextBlock->,freeRecvBuffers,tokenBuf);
1122 break;
1124 case ACK_WRAPPING:
1126 // The ACK has already been accounted in ProcessMessage, just mask it
1127 from->localAck &= ACK_MASK;
1129 break;
1131 default:
1132 MACHSTATE1(5, "processAmmassoControlMessage() - Ammasso -INFO: Received control message with invalid flags: %d", tail->flags);
1133 CmiAbort("Invalid control message received");
1136 AMMASSO_STATS_END(processAmmassoControlMessage)
// ProcessMessage()
//   Handles one buffer received from node 'from'.
//
//   msg  - start of the received data (datagram header first for data messages)
//   len  - number of bytes received
//   tail - tailer of the receive buffer; its ack and flags fields are read
//   from - node the buffer was received from
//
//   Returns needAck: 1 when twice from->messagesNotYetAcknowledged exceeds
//   from->num_recv_buf, 0 otherwise.  The value is decided before anything
//   else and is returned on every path.
//
//   Steps, in order:
//     1. apply the acknowledgement carried in tail->ack;
//     2. if tail->flags is non-zero, pass the buffer to
//        processAmmassoControlMessage() and return;
//     3. split the datagram header and validate it (checksum when
//        CMK_USE_CHECKSUM is defined, magic cookie otherwise), then the
//        sequence number; a failed check prints an error and returns;
//     4. append the data to the message being assembled for this node and,
//        once asm_fill reaches asm_total, push it to the destination
//        PE(s)/node queue;
//     5. forward broadcast packets when a spanning-tree or hypercube
//        broadcast is configured.
//
//   NOTE(review): every early return leaves without AMMASSO_STATS_END(ProcessMessage).
1139 int ProcessMessage(char* msg, int len, Tailer *tail, OtherNode from) {
1141 int rank, srcPE, seqNum, magicCookie, size, i;
1142 unsigned int broot;
1143 unsigned char checksum;
1144 OtherNode fromNode;
1145 char *newMsg;
1146 int needAck;
1148 AMMASSO_STATS_START(ProcessMessage)
1150 MACHSTATE(2, "ProcessMessage() - INFO: Called...");
1151 MACHSTATE2(1, "ProcessMessage() - INFO: tail - ack=%d, flags=%d", tail->ack, tail->flags);
1153 // Decide whether a direct ACK will be needed or not, based on how many
1154 // messages the other side has sent us, and we haven't acknowledged yet
1155 if (2*from->messagesNotYetAcknowledged > from->num_recv_buf) {
1156 needAck = 1;
1157 } else {
1158 needAck = 0;
1161 // This message contains an ACK as all messages, parse it. Do not worry about
1162 // wrap around of the ACK, since when this happen a special control message
1163 // ACK_WRAPPING is received
// One token per newly acknowledged message moves from usedTokens back to
// sendTokens.
1164 if (tail->ack > from->localAck) {
1165 AmmassoToken *token;
1166 for (i=from->localAck; i<tail->ack; ++i) {
1167 LIST_DEQUEUE(from->,usedTokens,token);
1168 LIST_ENQUEUE(from->,sendTokens,token);
1170 from->localAck = tail->ack;
// Dump of the first DGRAM_HEADER_SIZE + 24 bytes of the buffer, one MACHSTATE line per byte.
1174 MACHSTATE1(1, "ProcessMessage() - INFO: msg = %p", msg);
1175 int j;
1176 for (j = 0; j < DGRAM_HEADER_SIZE + 24; j++) {
1177 MACHSTATE2(1, "ProcessMessage() - INFO: msg[%d] = %02x", j, msg[j]);
// Non-zero flags: hand the buffer to the control-message handler and stop here.
1181 if (tail->flags != 0) {
1182 processAmmassoControlMessage(msg, len, tail, from);
1183 return needAck;
1186 // Get the header fields of the message
1187 DgramHeaderBreak(msg, rank, srcPE, magicCookie, seqNum, broot);
1189 MACHSTATE(1, "ProcessMessage() - INFO: Message Contents:");
1190 MACHSTATE1(1, " rank = %d", rank);
1191 MACHSTATE1(1, " srcPE = %d", srcPE);
1192 MACHSTATE1(1, " magicCookie = %d", magicCookie);
1193 MACHSTATE1(1, " seqNum = %d", seqNum);
1194 MACHSTATE1(1, " broot = %d", broot);
1196 #ifdef CMK_USE_CHECKSUM
1198 // Check the checksum
// A non-zero result from computeCheckSum() is treated as corruption.
1199 checksum = computeCheckSum(msg, len);
1200 if (checksum != 0) {
1201 MACHSTATE1(5, "ProcessMessage() - Ammasso - ERROR: Received message with bad checksum (%d)... ignoring...", checksum);
1202 CmiPrintf("[%d] ProcessMessage() - Ammasso - ERROR: Received message with bad checksum (%d)... ignoring...\n", CmiMyPe(), checksum);
1203 return needAck;
1206 #else
1208 // Check the magic cookie for correctness
1209 if (magicCookie != (Cmi_charmrun_pid & DGRAM_MAGIC_MASK)) {
1210 MACHSTATE(5, "ProcessMessage() - Ammasso - ERROR: Received message with a bad magic cookie... ignoring...");
1211 CmiPrintf("[%d] ProcessMessage() - Ammasso - ERROR: Received message with a bad magic cookie... ignoring...\n", CmiMyPe());
// NOTE(review): this dump indexes nodes_by_pe with srcPE taken from a header that has
// just failed validation.
1214 CmiPrintf("ProcessMessage() - INFO: Cmi_charmrun_pid = %d\n", Cmi_charmrun_pid);
1215 CmiPrintf("ProcessMessage() - INFO: node->recv_UseIndex = %d\n", nodes_by_pe[srcPE]->recv_UseIndex);
1216 CmiPrintf("ProcessMessage() - INFO: msg = %p\n", msg);
1217 int j;
1218 for (j = 0; j < DGRAM_HEADER_SIZE + 24; j++) {
1219 CmiPrintf("ProcessMessage() - INFO: msg[%d] = %02x\n", j, msg[j]);
1222 return needAck;
1225 #endif
1227 // Get the OtherNode structure for the node this message was sent from
1228 fromNode = nodes_by_pe[srcPE];
// The node derived from the header must be the node whose buffer was read.
1230 CmiAssert(fromNode == from);
1232 MACHSTATE1(1, "ProcessMessage() - INFO: Message from node %d...", fromNode->myNode);
1234 //MACHSTATE(3, "ProcessMessage() - INFO: Pre-recv_expect_lock");
1235 //#if CMK_SHARED_VARS_UNAVAILABLE
1236 // while (fromNode->recv_expect_lock != 0) { usleep(1); } // Since CmiLock() is not really a lock, actually wait
1237 //#endif
1238 //CmiLock(fromNode->recv_expect_lock);
1240 // Check the sequence number of the message
1241 if (seqNum == (fromNode->recv_expect)) {
1242 // The expected sequence number was received so setup the next one
1243 fromNode->recv_expect = ((seqNum+1) & DGRAM_SEQNO_MASK);
1244 } else {
// NOTE(review): the user-visible text below reads "witha"; it is a runtime string and is left as is.
1245 MACHSTATE(5, "ProcessMessage() - Ammasso - ERROR: Received a message with a bad sequence number... ignoring...");
1246 CmiPrintf("[%d] ProcessMessage() - Ammasso - ERROR: Received a message witha bad sequence number... ignoring...\n", CmiMyPe());
1247 return needAck;
1250 //CmiUnlock(fromNode->recv_expect_lock);
1251 //MACHSTATE(3, "ProcessMessage() - INFO: Post-recv_expect_lock");
// A NULL asm_msg means no message is currently being assembled for this node.
1253 newMsg = fromNode->asm_msg;
1255 // Check to see if this is the start of the message (i.e. - if the message was
1256 // fragmented, if this is the first packet of the message) or the entire
1257 // message. Only the first packet's header information will be copied into the
1258 // asm_buf buffer.
1259 if (newMsg == NULL) {
1261 // Allocate memory to hold the new message
// The total size comes from the message header and covers all packets.
1262 size = CmiMsgHeaderGetLength(msg);
1263 newMsg = (char*)CmiAlloc(size);
1264 _MEMCHECK(newMsg);
1266 // Verify the message size
1267 if (len > size) {
1268 MACHSTATE2(5, "ProcessMessage() - Ammasso - ERROR: Message size mismatch (size: %d != len: %d)", size, len);
1269 CmiPrintf("[%d] ProcessMessage() - Ammasso - ERROR: Message size mismatch (size: %d != len: %d)\n", CmiMyPe(), size, len);
1270 CmiAbort("Message Size Mismatch");
1273 // Copy the message into the memory location and setup the fromNode structure accordingly
1274 //memcpy(newMsg, msg + DGRAM_HEADER_SIZE, size);
// The first packet is copied whole (len bytes starting at msg, header included).
1275 memcpy(newMsg, msg, len);
1276 fromNode->asm_rank = rank;
1277 fromNode->asm_total = size;
1278 fromNode->asm_fill = len;
1279 fromNode->asm_msg = newMsg;
1281 // Otherwise, this packet is a continuation of the overall message so append it to the last packet
1282 } else {
// Continuation packets contribute only the bytes after their datagram header.
1284 size = len - DGRAM_HEADER_SIZE;
1286 // Make sure there is enough room in the asm_msg buffer (this should always be true because of the alloc in the true
1287 // portion of this if statement).
1288 if (fromNode->asm_fill + size > fromNode->asm_total) {
1289 MACHSTATE(5, "ProcessMessage() - Ammasso - ERROR: Message size mismatch");
1290 CmiPrintf("[%d] ProcessMessage() - Ammasso - ERROR: Message size mismatch", CmiMyPe());
1291 CmiAbort("Message Size Mismatch");
1294 // Copy the message into the asm_msg buffer
1295 memcpy(newMsg + fromNode->asm_fill, msg + DGRAM_HEADER_SIZE, size);
1296 fromNode->asm_fill += size;
1299 MACHSTATE2(1, "ProcessMessage() - Ammasso - INFO: Message copied into asm_buf (asm_fill = %d, asm_total = %d)...", fromNode->asm_fill, fromNode->asm_total);
1301 // Check to see if a full packet has been received
1302 if (fromNode->asm_fill == fromNode->asm_total) {
1304 MACHSTATE(1, "ProcessMessage() - Ammasso - INFO: Pushing message...");
1306 // React to the message based on its rank
// NOTE(review): this switches on the rank of the current packet, not on the asm_rank
// saved from the first packet.
1307 switch (rank) {
1309 case DGRAM_BROADCAST:
1311 MACHSTATE1(3, "ProcessMessage() - Ammasso - INFO: Broadcast - _Cmi_mynodesize = %d", _Cmi_mynodesize);
1313 // Make a copy of the message for all the PEs on this node except for zero, zero gets the original
1314 for (i = 1; i < _Cmi_mynodesize; i++)
1315 CmiPushPE(i, CopyMsg(newMsg, fromNode->asm_total));
1316 CmiPushPE(0, newMsg);
1317 break;
1319 #if CMK_NODE_QUEUE_AVAILABLE
1320 case DGRAM_NODEBROADCAST:
1321 case DGRAM_NODEMESSAGE:
1322 CmiPushNode(newMsg);
1323 break;
1324 #endif
1326 default:
1327 CmiPushPE(rank, newMsg);
1330 MACHSTATE(1, "ProcessMessage() - Ammasso - INFO: NULLing asm_msg...");
1332 // Clear the message buffer
// Only the pointer is cleared; the buffer itself was handed to CmiPushPE/CmiPushNode above.
1333 fromNode->asm_msg = NULL;
1336 MACHSTATE(1, "ProcessMessage() - Ammasso - INFO: Checking for re-broadcast");
1338 // If this packet is part of a broadcast, pass it on to the next nodes
// Forwarding uses the raw packet (msg, len), not the assembled message.
1339 #if CMK_BROADCAST_SPANNING_TREE
1341 if (rank == DGRAM_BROADCAST
1342 #if CMK_NODE_QUEUE_AVAILABLE
1343 || rank == DGRAM_NODEBROADCAST
1344 #endif
1345 ) SendSpanningChildren(NULL, 0, len, msg, broot, rank);
1347 #elif CMK_BROADCAST_HYPERCUBE
1349 if (rank == DGRAM_BROADCAST
1350 #if CMK_NODE_QUEUE_AVAILABLE
1351 || rank == DGRAM_NODEBROADCAST
1352 #endif
1354 MACHSTATE(3, "ProcessMessage() - INFO: Calling SendHypercube()...");
1355 SendHypercube(NULL, 0, len, msg, broot, rank);
1357 #endif
1359 AMMASSO_STATS_END(ProcessMessage)
1360 return needAck;
1363 // DMK : NOTE : I attempted to put this in a single function to be called from many places but it was getting
1364 // way too messy... I'll leave the code here for now in hopes of being able to do this in the future.
1365 /*************************************************
1366 // NOTE: Return non-zero if a message was received, zero otherwise
1367 int PollForMessage(cc_cq_handle_t cq) {
1369 cc_status_t rtn;
1370 cc_wc_t wc;
1371 int tmp, fromNode;
1372 cc_qp_query_attrs_t qpAttr;
1374 // Poll the Completion Queue for Work Completions
1375 rtn = cc_cq_poll(contextBlock->rnic, cq, &wc);
1377 // Just return 0 if there are no Work Completions
1378 if (rtn == CCERR_CQ_EMPTY) return 0;
1380 // Let the user know if there was an error
1381 if (rtn != CC_OK) {
1382 MACHSTATE2(3, "PollForMessage() - Ammasso - ERROR: Unable to poll Completion Queue: %d, \"%s\"", rtn, cc_status_to_string(rtn));
1385 // Let the user know if there was an error
1386 if (wc.status != CC_OK) {
1387 MACHSTATE2(3, "PollForMessage() - Ammasso - ERROR: Work Completion Status: %d, \"%s\"", wc.status, cc_status_to_string(wc.status));
1390 // Depending on the WC.type, react accordingly
1391 switch (wc.wr_type) {
1393 // wc.wr_type == CC_WR_TYPE_RDMA_READ
1394 case CC_WR_TYPE_RDMA_READ:
1396 // DMK : TODO : Fill this in
1398 break;
1400 // wc.wr_type == CC_WR_TYPE_RDMA_WRITE
1401 case CC_WR_TYPE_RDMA_WRITE:
1403 // DMK : TODO : Fill this in
1405 break;
1407 // wc.wr_type == CC_WR_TYPE_SEND (Message was sent)
1408 case CC_WR_TYPE_SEND:
1410 // This will be called as a result of a cc_qp_post_sq()'s Send Work Request in DeliverViaNetwork finishing
1411 // Free the STag that was created
1412 rtn = cc_stag_dealloc(contextBlock->rnic, wc.stag);
1414 break;
1416 // wc.wr_type == CC_WR_TYPE_RECV (Message was received)
1417 case CC_WR_TYPE_RECV:
1419 // Check for CCERR_FLUSHED which means "we're shutting down" according to the example code
1420 if (wc.status == CCERR_FLUSHED)
1421 break;
1423 //// Make sure that node is defined
1424 //if (node == NULL) {
1425 // MACHSTATE(3, "PollForMessage() - WARNING: Received message but node == NULL... ignoring...");
1426 // break;
1429 // HORRIBLE HACK!!! -- In the OtherNode structure, a myNode was placed directly after the rq_wr structure. This
1430 // was done so this function could tell from which node the received message came.
1431 fromNode = *((int*)((char*)wc.wr_id + sizeof(cc_rq_wr_t)));
1432 //fromNode = node->myNode;
1435 int j;
1436 MACHSTATE1(3, "PollForMessage() - INFO: fromNode = %d", fromNode);
1437 MACHSTATE1(3, "PollForMessage() - INFO: wc.status = %d", wc.status);
1438 MACHSTATE1(3, "PollForMessage() - INFO: wc.bytes_rcvd = %d", wc.bytes_rcvd);
1439 MACHSTATE1(3, "PollForMessage() - INFO: wc.wr_id = %d", wc.wr_id);
1440 //MACHSTATE1(3, "PollForMessage() - INFO: &(nodes[0].rq_wr) = %p", &(nodes[0].rq_wr));
1441 //MACHSTATE1(3, "PollForMessage() - INFO: &(nodes[1].rq_wr) = %p", &(nodes[1].rq_wr));
1442 //MACHSTATE(3, "PollForMessage() - INFO: Raw Message Data:");
1443 //for (j = 0; j < wc.bytes_rcvd; j++) {
1444 // MACHSTATE1(3, " [%02x]", nodes[fromNode].recv_buf[j]);
1448 // Process the Message
1449 ProcessMessage(nodes[fromNode].recv_buf, wc.bytes_rcvd);
1451 // Re-Post the receive buffer
1452 tmp = 0;
1453 rtn = cc_qp_post_rq(contextBlock->rnic, nodes[fromNode].qp, &(nodes[fromNode].rq_wr), 1, &tmp);
1454 if (rtn != CC_OK) {
1455 // Let the user know what happened
1456 MACHSTATE2(3, "PollForMessage() - Ammasso - ERROR: Unable to Post Work Request to Queue Pair: %d, \"%s\"", rtn, cc_status_to_string(rtn));
1459 break;
1461 // default
1462 default:
1463 MACHSTATE1(3, "PollForMessage() - Ammasso - WARNING - Unknown WC.wr_type: %d", wc.wr_type);
1464 break;
1466 } // end switch (wc.wr_type)
1468 return 1;
1470 ****************************************************/
1473 static void CommunicationServer_nolock(int withDelayMs) {
1475 int i;
1477 MACHSTATE(2, "CommunicationServer_nolock start {");
1479 // DMK : TODO : In spare time (here), check for messages and/or completions and/or errors
1480 //while(PollForMessage(contextBlock->send_cq)); // Clear all the completed sends
1481 //while(PollForMessage(contextBlock->recv_cq)); // Keep looping while there are still messages that have been received
1483 for (i = 0; i < contextBlock->numNodes; i++) {
1484 if (i == contextBlock->myNode) continue;
1485 //CompletionEventHandlerWithAckFlag(contextBlock->rnic, nodes[i].recv_cq, &(nodes[i]), 1);
1486 //CompletionEventHandler(contextBlock->rnic, nodes[i].send_cq, &(nodes[i]));
1487 CheckRecvBufForMessage(&(nodes[i]));
1490 MACHSTATE(2, "}CommunicationServer_nolock end");
1494 /* CommunicationServer()
1495 * where:
1496 * 0: from smp thread
1497 * 1: from interrupt
1498 * 2: from worker thread
1500 static void CommunicationServer(int withDelayMs, int where) {
1502 //// Check to see if we are running in standalone mode... if so, do nothing
1503 //if (Cmi_charmrun_pid == 0)
1504 // return;
1506 AMMASSO_STATS_START(CommunicationServer)
1508 MACHSTATE2(2, "CommunicationServer(%d) from %d {",withDelayMs, where);
1510 // Check to see if this call is from an interrupt
1511 if (where == 1) {
1513 // Don't service charmrum if converse exits, this fixed a hang bug
1514 if (!machine_initiated_shutdown)
1515 ServiceCharmrun_nolock();
1517 // Don't process any further for interrupts
1518 return;
1521 CmiCommLock();
1522 inProgress[CmiMyRank()] += 1;
1523 CommunicationServer_nolock(withDelayMs);
1524 CmiCommUnlock();
1525 inProgress[CmiMyRank()] -= 1;
1527 #if CMK_IMMEDIATE_MSG
1528 if (where == 0)
1529 CmiHandleImmediate();
1530 #endif
1532 MACHSTATE(2,"} CommunicationServer");
1534 AMMASSO_STATS_END(CommunicationServer)
1539 /* CmiMachineExit()
1542 void CmiMachineExit(void) {
1544 char buf[128];
1545 cc_status_t rtn;
1546 int i;
1548 MACHSTATE(2, "CmiMachineExit() - INFO: Called...");
1550 // DMK - This is a sleep to help keep the output from the stat displays below separated in the program output
1551 if (contextBlock->myNode)
1552 usleep(10000*contextBlock->myNode);
1554 AMMASSO_STATS_DISPLAY(MachineInit)
1556 AMMASSO_STATS_DISPLAY(AmmassoDoIdle)
1558 AMMASSO_STATS_DISPLAY(DeliverViaNetwork)
1559 AMMASSO_STATS_DISPLAY(DeliverViaNetwork_pre_lock)
1560 AMMASSO_STATS_DISPLAY(DeliverViaNetwork_lock)
1561 AMMASSO_STATS_DISPLAY(DeliverViaNetwork_post_lock)
1562 AMMASSO_STATS_DISPLAY(DeliverViaNetwork_send)
1564 AMMASSO_STATS_DISPLAY(getQPSendBuffer)
1565 AMMASSO_STATS_DISPLAY(getQPSendBuffer_lock)
1566 AMMASSO_STATS_DISPLAY(getQPSendBuffer_CEH)
1567 AMMASSO_STATS_DISPLAY(getQPSendBuffer_loop)
1569 AMMASSO_STATS_DISPLAY(sendDataOnQP)
1570 AMMASSO_STATS_DISPLAY(sendDataOnQP_pre_send)
1571 AMMASSO_STATS_DISPLAY(sendDataOnQP_send)
1572 AMMASSO_STATS_DISPLAY(sendDataOnQP_post_send)
1574 AMMASSO_STATS_DISPLAY(sendDataOnQP_1024)
1575 AMMASSO_STATS_DISPLAY(sendDataOnQP_2048)
1576 AMMASSO_STATS_DISPLAY(sendDataOnQP_4096)
1577 AMMASSO_STATS_DISPLAY(sendDataOnQP_16384)
1578 AMMASSO_STATS_DISPLAY(sendDataOnQP_over)
1580 AMMASSO_STATS_DISPLAY(AsynchronousEventHandler)
1581 AMMASSO_STATS_DISPLAY(CompletionEventHandler)
1582 AMMASSO_STATS_DISPLAY(ProcessMessage)
1583 AMMASSO_STATS_DISPLAY(processAmmassoControlMessage)
1585 AMMASSO_STATS_DISPLAY(sendAck)
1587 // TODO: It would probably be a good idea to make a "closing connection" control message so all of the nodes
1588 // can agree to close the connections in a graceful way when they are all done. Similar to the READY packet
1589 // but for closing so the closing is graceful (and the other node does not try to reconnect). This barrier
1590 // would go here.
1592 // Check to see if in stand-alone mode
1593 if (Cmi_charmrun_pid != 0 && contextBlock->rnic != -1) {
1595 // Do Clean-up for each of the OtherNode structures (the connections and related data)
1596 for (i = 0; i < contextBlock->numNodes; i++) {
1598 // Close all the connections and destroy all the QPs (and related data structures)
1599 closeQPConnection(nodes + i, 1); // The program is closing so destroy the QPs
1601 // Destroy the locks
1602 CmiDestroyLock(nodes[i].sendBufLock);
1603 CmiDestroyLock(nodes[i].send_next_lock);
1604 CmiDestroyLock(nodes[i].recv_expect_lock);
1607 // DMK : TODO : Clean up the buffer pool here (unregister the memory from the RNIC and free it)
1609 //// Close the RNIC interface
1610 rtn = cc_rnic_close(contextBlock->rnic);
1611 if (rtn != CC_OK) {
1612 MACHSTATE2(5, "CmiMachineExit() - ERROR: Unable to close the RNIC: %d, \"%s\"", rtn, cc_status_to_string(rtn));
1613 sprintf(buf, "CmiMachineExit() - ERROR: Unable to close the RNIC: %d, \"%s\"", rtn, cc_status_to_string(rtn));
1614 CmiAbort(buf);
1617 MACHSTATE(2, "CmiMachineExit() - INFO: RNIC Closed.");
1623 OtherNode getNodeFromQPId(cc_qp_id_t qp_id) {
1625 int i;
1627 // DMK : FIXME : This is a horrible linear search through the nodes table to find the node structure that has this
1628 // qp_id but currently this is the fastest way that I know how to do this... In the future to speed it up.
1629 for (i = 0; i < contextBlock->numNodes; i++)
1630 if (nodes[i].qp_id == qp_id)
1631 return (nodes + i);
1633 return NULL;
1636 OtherNode getNodeFromQPHandle(cc_qp_handle_t qp) {
1638 int i;
1640 // DMK : FIXME : This is a horrible linear search through the nodes table to find the node structure that has this
1641 // qp_id but currently this is the fastest way that I know how to do this... In the future to speed it up.
1642 for (i = 0; i < contextBlock->numNodes; i++)
1643 if (nodes[i].qp == qp)
1644 return (nodes + i);
1646 return NULL;
1650 void AsynchronousEventHandler(cc_rnic_handle_t rnic, cc_event_record_t *er, void *cb) {
1652 int nodeNumber, i;
1653 OtherNode node;
1654 cc_ep_handle_t connReqEP;
1655 cc_status_t rtn;
1656 char buf[64];
1657 cc_qp_modify_attrs_t modAttrs;
1658 AmmassoPrivateData *priv;
1659 AmmassoToken *token;
1661 AMMASSO_STATS_START(AsynchronousEventHandler)
1663 MACHSTATE2(2, "AsynchronousEventHandler() - INFO: Called... event_id = %d, \"%s\"", er->event_id, cc_event_id_to_string(er->event_id));
1665 // Do a couple of checks... the reasons for these stem from some example code
1666 if (er->rnic_handle != contextBlock->rnic) {
1667 MACHSTATE(5, "AsynchronousEventHandler() - WARNING: er->rnic_handle != contextBlock->rnic");
1669 if (er->rnic_user_context != contextBlock) {
1670 MACHSTATE(5, "AsynchronousEventHandler() - WARNING: er->rnic_user_context != contextBlock");
1673 // Based on the er->event_id, do something about it
1674 switch (er->event_id) {
1676 // er->event_id == CCAE_LLP_CLOSE_COMPLETE
1677 case CCAE_LLP_CLOSE_COMPLETE:
1678 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Connection Closed.");
1680 // Get the OtherNode structure for the other node
1681 MACHSTATE2(1, "AsynchronousEventHandler() - INFO: er->resource_indicator = %d (CC_RES_IND_QP: %d)", er->resource_indicator, CC_RES_IND_QP);
1682 node = getNodeFromQPId(er->resource_id.qp_id);
1683 if (node == NULL) {
1684 MACHSTATE(5, "AsynchronousEventHandler() - ERROR: Unable to find QP from QP ID... Unable to create/recover connection");
1685 break;
1688 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Pre-sendBufLock");
1689 #if CMK_SHARED_VARS_UNAVAILABLE
1690 while (node->sendBufLock != 0) { usleep(1); } // Since CmiLock() is not really a lock, actually wait
1691 #endif
1692 CmiLock(node->sendBufLock);
1694 // Set the state of the connection to closed
1695 node->connectionState = QP_CONN_STATE_CONNECTION_CLOSED;
1697 CmiUnlock(node->sendBufLock);
1698 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Post-sendBufLock");
1700 break;
1702 // er->event_id == CCAE_CONNECTION_REQUEST
1703 case CCAE_CONNECTION_REQUEST:
1705 MACHSTATE2(1, "AsynchronousEventHandler() - INFO: Incomming Connection Request -> %s:%d",
1706 inet_ntoa(*(struct in_addr*) &(er->event_data.connection_request.laddr)),
1707 ntohs(er->event_data.connection_request.lport)
1710 connReqEP = er->event_data.connection_request.cr_handle;
1712 priv = (AmmassoPrivateData*)er->event_data.connection_request.private_data;
1714 nodeNumber = priv->node;
1716 MACHSTATE1(3, "AsynchronousEventHandler() - INFO: Connection Request from node %d", nodeNumber);
1718 if (nodeNumber < 0 || nodeNumber >= contextBlock->numNodes) {
1720 // Refuse the connection and log the rejection
1721 MACHSTATE1(1, "AsynchronousEventHandler() - WARNING: Unknown entity attempting to connect (node %d)... rejecting connection.", nodeNumber);
1722 cc_cr_reject(contextBlock->rnic, connReqEP);
1724 } else {
1727 { int j;
1728 for (j = 0; j < 16; j++) {
1729 MACHSTATE2(3, " private_data[%d] = %02X", j, ((char*)er->event_data.connection_request.private_data)[j]);
1733 // Grab the remote stag and to, by the protocol, all the tokens are
1734 // consecutive in memory
1735 for (i=0; i<(AMMASSO_INITIAL_BUFFERS/(contextBlock->numNodes-1)); ++i) {
1736 LIST_DEQUEUE(contextBlock->,freeTokens,token);
1737 token->wr.wr_u.rdma_write.remote_stag = priv->stag;
1738 token->wr.wr_u.rdma_write.remote_to = priv->to + (i * sizeof(AmmassoBuffer));
1739 token->remoteBuf = (AmmassoBuffer*)(priv->to + (i * sizeof(AmmassoBuffer)));
1740 LIST_ENQUEUE(nodes[nodeNumber].,sendTokens,token);
1743 nodes[nodeNumber].ack_sq_wr->wr_u.rdma_write.remote_to = priv->ack_to;
1744 nodes[nodeNumber].ack_sq_wr->wr_u.rdma_write.remote_stag = priv->stag;
1746 MACHSTATE2(1, "AsynchronousEventHandler() - INFO: tokens starting from %p, stag = %d",priv->to,priv->stag);
1748 // Keep a copy of the end point handle
1749 nodes[nodeNumber].cr = connReqEP;
1751 // Accept the connection
1752 priv = (AmmassoPrivateData*)buf;
1753 priv->node = contextBlock->myNode;
1754 priv->stag = nodes[nodeNumber].recv_buf->stag;
1755 priv->to = (cc_uint64_t)nodes[nodeNumber].recv_buf;
1756 priv->ack_to = (cc_uint64_t)nodes[nodeNumber].directAck;
1758 MACHSTATE1(1, " node = %d", priv->node);
1759 MACHSTATE1(1, " stag = %d", priv->stag);
1760 MACHSTATE1(1, " to = %p", priv->to);
1762 { int j;
1763 MACHSTATE2(3, " buf = %p, priv = %p", buf, priv);
1764 for (j = 0; j < 16; j++) {
1765 MACHSTATE2(3, " ((char*)priv)[%d] = %02X", j, ((char*)priv)[j]);
1769 MACHSTATE(1, "AsynchronousEventHandler() - Ammasso - INFO: Accepting Connection...");
1771 rtn = cc_cr_accept(contextBlock->rnic, connReqEP, nodes[nodeNumber].qp, sizeof(AmmassoPrivateData), (cc_uint8_t*)priv);
1772 if (rtn != CC_OK) {
1774 // Let the user know what happened
1775 MACHSTATE1(3, "AsynchronousEventHandler() - Ammasso - WARNING: Unable to accept connection from node %d", nodeNumber);
1777 } else {
1779 MACHSTATE1(3, "AsynchronousEventHandler() - Ammasso - INFO: Accepted Connection from node %d", nodeNumber);
1781 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Pre-sendBufLock");
1782 #if CMK_SHARED_VARS_UNAVAILABLE
1783 while (nodes[nodeNumber].sendBufLock != 0) { usleep(1); } // Since CmiLock() is not really a lock, actually wait
1784 #endif
1785 CmiLock(nodes[nodeNumber].sendBufLock);
1787 // Indicate that this connection has been made only if is it the first time the connection was made (don't count re-connects)
1788 if (nodes[nodeNumber].connectionState == QP_CONN_STATE_PRE_CONNECT)
1789 (contextBlock->outstandingConnectionCount)--;
1790 nodes[nodeNumber].connectionState = QP_CONN_STATE_CONNECTED;
1792 CmiUnlock(nodes[nodeNumber].sendBufLock);
1793 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Post-sendBufLock");
1795 MACHSTATE1(1, "AsynchronousEventHandler() - Connected to node %d", nodes[nodeNumber].myNode);
1799 break;
1801 // er->event_id == CCAE_ACTIVE_CONNECT_RESULTS
1802 case CCAE_ACTIVE_CONNECT_RESULTS:
1803 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Connection Results");
1805 // Get the OtherNode structure for the other node
1806 MACHSTATE2(1, "AsynchronousEventHandler() - INFO: er->resource_indicator = %d (CC_RES_IND_QP: %d)", er->resource_indicator, CC_RES_IND_QP);
1807 node = getNodeFromQPId(er->resource_id.qp_id);
1808 if (node == NULL) {
1809 MACHSTATE(5, "AsynchronousEventHandler() - ERROR: Unable to find QP from QP ID... Unable to create/recover connection");
1810 break;
1813 // Check to see if the connection was established or not
1814 if (er->event_data.active_connect_results.status != CC_CONN_STATUS_SUCCESS) {
1816 MACHSTATE(5, " Connection Failed.");
1817 MACHSTATE1(5, " - status: \"%s\"", cc_connect_status_to_string(er->event_data.active_connect_results.status));
1818 MACHSTATE1(5, " - private_data_length = %d", er->event_data.active_connect_results.private_data_length);
1819 displayQueueQuery(node->qp, &(node->qp_attrs));
1821 // Attempt to reconnect (try again... don't give up... you can do it!)
1822 reestablishQPConnection(node);
1824 } else { // Connection was a success
1826 MACHSTATE(3, " Connection Success...");
1827 MACHSTATE2(1, " l -> %s:%d", inet_ntoa(*(struct in_addr*) &(er->event_data.active_connect_results.laddr)), ntohs(er->event_data.active_connect_results.lport));
1828 MACHSTATE2(1, " r -> %s:%d", inet_ntoa(*(struct in_addr*) &(er->event_data.active_connect_results.raddr)), ntohs(er->event_data.active_connect_results.rport));
1829 MACHSTATE4(1, " private_data_length = %d (%d, %d, %d)", er->event_data.active_connect_results.private_data_length, sizeof(int), sizeof(cc_stag_t), sizeof(cc_uint64_t));
1831 priv = (AmmassoPrivateData*)((char*)er->event_data.active_connect_results.private_data);
1833 { int j;
1834 MACHSTATE2(1, " private_data = %p, priv = %p", er->event_data.active_connect_results.private_data, priv);
1835 for (j = 0; j < 16; j++) {
1836 MACHSTATE3(1, " private_data[%d] = %02X (priv:%02X)", j, ((char*)(er->event_data.active_connect_results.private_data))[j], ((char*)priv)[j]);
1840 // Grab the remote stag and to, by the protocol, all the tokens are
1841 // consecutive in memory
1842 for (i=0; i<(AMMASSO_INITIAL_BUFFERS/(contextBlock->numNodes-1)); ++i) {
1843 LIST_DEQUEUE(contextBlock->,freeTokens,token);
1844 token->wr.wr_u.rdma_write.remote_stag = priv->stag;
1845 token->wr.wr_u.rdma_write.remote_to = priv->to + (i * sizeof(AmmassoBuffer));
1846 token->remoteBuf = (AmmassoBuffer*)(priv->to + (i * sizeof(AmmassoBuffer)));
1847 LIST_ENQUEUE(node->,sendTokens,token);
1850 node->ack_sq_wr->wr_u.rdma_write.remote_to = priv->ack_to;
1851 node->ack_sq_wr->wr_u.rdma_write.remote_stag = priv->stag;
1853 MACHSTATE1(3, " node = %d", priv->node);
1854 MACHSTATE1(3, " stag = %d", priv->stag);
1855 MACHSTATE1(3, " to = %p", priv->to);
1857 MACHSTATE2(1, "AsynchronousEventHandler() - INFO: tokens from %p, stag = %d",priv->to,priv->stag);
1859 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Pre-sendBufLock");
1860 #if CMK_SHARED_VARS_UNAVAILABLE
1861 while (node->sendBufLock != 0) { usleep(1); } // Since CmiLock() is not really a lock, actually wait
1862 #endif
1863 CmiLock(node->sendBufLock);
1865 // Indicate that this connection has been made only if it is the first time the connection was made (don't count re-connects)
1866 if (node->connectionState == QP_CONN_STATE_PRE_CONNECT)
1867 (contextBlock->outstandingConnectionCount)--;
1868 node->connectionState = QP_CONN_STATE_CONNECTED;
1870 CmiUnlock(node->sendBufLock);
1871 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Post-sendBufLock");
1873 MACHSTATE1(1, "AsynchronousEventHandler() - Connected to node %d", node->myNode);
1876 break;
1878 // er->event_id is set to a value that indicates the QP will transition into the TERMINATE state
1879 // Remotely Detected Errors
1880 case CCAE_TERMINATE_MESSAGE_RECEIVED:
1881 // LLP Errors
1882 case CCAE_LLP_SEGMENT_SIZE_INVALID:
1883 case CCAE_LLP_INVALID_CRC:
1884 case CCAE_LLP_BAD_FPDU:
1885 // Remote Operation Errors
1886 case CCAE_INVALID_DDP_VERSION:
1887 case CCAE_INVALID_RDMA_VERSION:
1888 case CCAE_UNEXPECTED_OPCODE:
1889 case CCAE_INVALID_DDP_QUEUE_NUMBER:
1890 case CCAE_RDMA_READ_NOT_ENABLED:
1891 case CCAE_NO_L_BIT:
1892 // Remote Protection Errors (not associated with RQ)
1893 case CCAE_TAGGED_INVALID_STAG:
1894 case CCAE_TAGGED_BASE_BOUNDS_VIOLATION:
1895 case CCAE_TAGGED_ACCESS_RIGHTS_VIOLATION:
1896 case CCAE_TAGGED_INVALID_PD:
1897 case CCAE_WRAP_ERROR:
1898 // Remote Closing Error
1899 case CCAE_BAD_LLP_CLOSE:
1900 // Remote Protection Errors (associated with RQ)
1901 case CCAE_INVALID_MSN_RANGE:
1902 case CCAE_INVALID_MSN_GAP:
1903 // IRRQ Proection Errors
1904 case CCAE_IRRQ_INVALID_STAG:
1905 case CCAE_IRRQ_BASE_BOUNDS_VIOLATION:
1906 case CCAE_IRRQ_ACCESS_RIGHTS_VIOLATION:
1907 case CCAE_IRRQ_INVALID_PD:
1908 case CCAE_IRRQ_WRAP_ERROR: // For these, the VERBS
1909 case CCAE_IRRQ_OVERFLOW: // spec is 100% clear as to
1910 case CCAE_IRRQ_MSN_GAP: // what to do... but I think
1911 case CCAE_IRRQ_MSN_RANGE: // this is correct... DMK
1912 // Local Errors - ??? Not 100% sure about these (they are written differently in cc_ae.h than in the verbs spec... but I think they go here)
1913 case CCAE_CQ_SQ_COMPLETION_OVERFLOW:
1914 case CCAE_CQ_RQ_COMPLETION_ERROR:
1915 case CCAE_QP_SRQ_WQE_ERROR:
1917 // er->event_id is set to a value that indicates the QP will transition into the ERROR state
1918 // LLP Errors
1919 case CCAE_LLP_CONNECTION_LOST:
1920 case CCAE_LLP_CONNECTION_RESET:
1921 // Remote Closing Error
1922 case CCAE_BAD_CLOSE:
1923 // Local Errors
1924 case CCAE_QP_LOCAL_CATASTROPHIC_ERROR:
1925 MACHSTATE3(5, "AsynchronousEventHandler() - WARNING: Connection Error \"%s\" - er->resource_indicator = %d (CC_RES_IND_QP: %d)", cc_event_id_to_string(er->event_id), er->resource_indicator, CC_RES_IND_QP);
1926 CmiPrintf("AsynchronousEventHandler() - WARNING: Connection Error \"%s\" - er->resource_indicator = %d (CC_RES_IND_QP: %d)\n", cc_event_id_to_string(er->event_id), er->resource_indicator, CC_RES_IND_QP);
1928 // Figure out which QP went down
1929 node = getNodeFromQPId(er->resource_id.qp_id);
1930 if (node == NULL) {
1931 MACHSTATE(5, "AsynchronousEventHandler() - ERROR: Unable to find QP from QP ID... Unable to recover connection");
1932 break;
1935 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Pre-sendBufLock");
1936 #if CMK_SHARED_VARS_UNAVAILABLE
1937 while (node->sendBufLock != 0) { usleep(1); } // Since CmiLock() is not really a lock, actually wait
1938 #endif
1939 CmiLock(node->sendBufLock);
1941 // Indicate that the connection was lost or will be lost in the very near future (depending on the er->event_id)
1942 node->connectionState = QP_CONN_STATE_CONNECTION_LOST;
1944 CmiUnlock(node->sendBufLock);
1945 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Post-sendBufLock");
1947 MACHSTATE1(1, "AsynchronousEventHandler() - Connection ERROR Occured - node %d", node->myNode);
1948 displayQueueQuery(node->qp, &(node->qp_attrs));
1950 // Attempt to bring the connection back to life
1951 reestablishQPConnection(node);
1953 break;
1955 // er->event_id == ???
1956 default:
1957 MACHSTATE1(5, "AsynchronousEventHandler() - WARNING - Unknown/Unexpected Asynchronous Event: er->event_id = %d", er->event_id);
1958 break;
1960 } // end switch (er->event_id)
1962 AMMASSO_STATS_END(AsynchronousEventHandler)
1965 void CheckRecvBufForMessage(OtherNode node) {
1967 int needAck;
1968 unsigned int len;
1969 AmmassoBuffer *current;
1971 MACHSTATE1(2, "CheckRecvBufForMessage() - INFO: Called... (node->recv_buf = %p)...", node->recv_buf);
1973 // Process all messages, identified by a length not zero
1974 while ((len = node->recv_buf->tail.length) != 0) {
1976 MACHSTATE1(2, " (len = %d)...", len);
1978 // Start by zero-ing out the length of the message so it isn't picked up again
1979 node->recv_buf->tail.length = 0;
1980 (*node->remoteAck) ++;
1981 node->messagesNotYetAcknowledged ++;
1983 current = node->recv_buf;
1984 // Move the recv_buf back at the end of the receiving queue. This works also
1985 // if the queue if formed by a single element
1986 node->last_recv_buf->next = node->recv_buf;
1987 node->last_recv_buf = node->recv_buf;
1988 node->recv_buf = node->recv_buf->next;
1989 node->last_recv_buf->next = NULL;
1991 // Process the messsage, NOTE that the message start is aligned to 8 bytes!
1992 needAck = ProcessMessage(&(current->buf[AMMASSO_BUFSIZE - ALIGN8(len)]), len, &current->tail, node);
1994 // If an ACK is needed in response to this message, send one
1995 if (needAck) sendAck(node);
// Thin wrapper: forwards rnic, cq and cb unchanged to
// CompletionEventHandlerWithAckFlag() with breakOnAck = 0, so the callee's
// "return when no ACK is needed" early exit is never taken.
2001 void CompletionEventHandler(cc_rnic_handle_t rnic, cc_cq_handle_t cq, void *cb) {
2002 CompletionEventHandlerWithAckFlag(rnic, cq, cb, 0);
// Polls the completion queue `cq` until cc_cq_poll() returns anything other
// than CC_OK, reacting to each work completion according to wc.wr_type.
//   rnic       : not referenced in the visible body (contextBlock->rnic is used instead)
//   cq         : completion queue to poll
//   cb         : user context; cast to OtherNode and used as the peer node
//   breakOnAck : when non-zero, return from the RECV case as soon as a
//                processed message does not require an ACK
// NOTE(review): ProcessMessage() is called here with three arguments, while
// CheckRecvBufForMessage() calls it with four - confirm whether this handler
// is still compiled/used.
2005 void CompletionEventHandlerWithAckFlag(cc_rnic_handle_t rnic, cc_cq_handle_t cq, void *cb, int breakOnAck) {
2007 OtherNode node = (OtherNode)cb;
2008 cc_status_t rtn;
2009 cc_wc_t wc;
2010 int tmp;
2011 cc_rq_wr_t *rq_wr;
2012 char* recvBuf;
2013 char needAck;
2015 AMMASSO_STATS_START(CompletionEventHandler)
2017 //MACHSTATE(3, "CompletionEventHandler() - Called...");
2019 //// Reset the request notification type
2020 //rtn = cc_cq_request_notification(contextBlock->rnic, cq, CC_CQ_NOTIFICATION_TYPE_NEXT);
2021 //if (rtn != CC_OK) {
2022 // // Let the user know what happened
2023 // MACHSTATE2(3, "CompletionEventHandler() - Ammasso - WARNING - Unable to reset CQ request notification type: %d, \"%s\"", rtn, cc_status_to_string(rtn));
2026 // Keep polling the Completion Queue until it is empty of Work Completions
2027 while (1) {
2029 //if (PollForMessage(cq) == 0) break;
2031 // Poll the Completion Queue
2032 rtn = cc_cq_poll(contextBlock->rnic, cq, &wc);
2033 //if (rtn == CCERR_CQ_EMPTY) break;
// Any non-CC_OK result (not only "queue empty") ends the polling loop
2034 if (rtn != CC_OK) break;
2036 // Let the user know if there was an error
// NOTE(review): a failed work completion appears to be only logged here; the
// switch below presumably still runs for it - confirm.
2037 if (wc.status != CC_OK) {
2038 MACHSTATE2(3, "CompletionEventHandler() - Ammasso - WARNING - WC status not CC_OK: %d, \"%s\"", wc.status, cc_status_to_string(wc.status));
2040 // DMK : TODO : Add code here that will recover from a Work Request Gone Wrong... i.e. - if a connection is lost, the
2041 // WRs that are still pending in the queues will complete in failure with a FLUSHED error status... these
2042 // WRs will have to be reposed after the QP is back into the IDLE state in reestablishQPConnection (or later).
2045 // Depending on the WC.type, react accordingly
2046 switch (wc.wr_type) {
2048 // wc.wr_type == CC_WR_TYPE_RDMA_READ
2049 case CC_WR_TYPE_RDMA_READ:
2050 // DMK : TODO : Fill this in
2051 break;
2053 // wc.wr_type == CC_WR_TYPE_RDMA_WRITE
2054 case CC_WR_TYPE_RDMA_WRITE:
2055 // DMK : TODO : Fill this in
2056 break;
2058 // wc.wr_type == CC_WR_TYPE_SEND
2059 case CC_WR_TYPE_SEND:
2061 // This will be called as a result of a cc_qp_post_sq()'s Send Work Request in DeliverViaNetwork finishing
2063 // DMK : NOTE : I left this here as an example (if message memory is registered with the RNIC on the fly
2064 // in DeliverViaNetwork(), the memory would have to be un-registered/un-pinned here).
2065 // Free the STag that was created
2066 //rtn = cc_stag_dealloc(contextBlock->rnic, wc.stag);
2068 // Unlock the send_buf so it can be used again
2069 // DMK : TODO : Make this a real lock
2070 //MACHSTATE1(3, "CompletionEventHandler() - INFO: Send completed... clearing sendBufLock for next send (%p)", node);
2071 //CmiUnlock(node->sendBufLock);
// NOTE(review): node is dereferenced here without a NULL check
2073 MACHSTATE1(3, "CompletionEventHandler() - INFO: Send completed with node %d... still waiting for acknowledge...", node->myNode);
2075 break;
2077 // wc.wr_type == CC_WR_TYPE_RECV
2078 case CC_WR_TYPE_RECV:
2080 // Check for CCERR_FLUSHED which mean "we're shutting down" according to the example code
// NOTE(review): node->qp is read here before the node == NULL test below
2081 if (wc.status == CCERR_FLUSHED) {
2082 displayQueueQuery(node->qp, &(node->qp_attrs));
2083 break;
2086 // Make sure that node is defined
2087 if (node == NULL) {
2088 MACHSTATE(3, "CompletionEventHandler() - WARNING: Received message but node == NULL... ignoring...");
2089 break;
2092 displayQueueQuery(node->qp, &(node->qp_attrs));
2095 int j;
2096 MACHSTATE1(3, "CompletionEventHandler() - INFO: fromNode = %d", node->myNode);
2097 MACHSTATE1(3, "CompletionEventHandler() - INFO: wc.status = %d", wc.status);
2098 MACHSTATE1(3, "CompletionEventHandler() - INFO: wc.bytes_rcvd = %d", wc.bytes_rcvd);
2099 MACHSTATE1(3, "CompletionEventHandler() - INFO: node = %p", node);
2100 //MACHSTATE1(3, "CompletionEventHandler() - INFO: &(nodes[0]) = %p", &(nodes[0]));
2101 //MACHSTATE1(3, "CompletionEventHandler() - INFO: &(nodes[1]) = %p", &(nodes[1]));
2102 //MACHSTATE1(3, "CompletionEventHandler() - INFO: nodes[0].recv_buf = %p", nodes[0].recv_buf);
2103 //MACHSTATE1(3, "CompletionEventHandler() - INFO: nodes[1].recv_buf = %p", nodes[1].recv_buf);
2104 //MACHSTATE(3, "CompletionEventHandler() - INFO: Raw Message Data:");
2105 //for (j = 0; j < DGRAM_HEADER_SIZE; j++) {
2106 // //MACHSTATE1(3, " [%02x]", node->recv_buf[j]);
2107 // MACHSTATE2(3, " [0:%02x, 1:%02x]", nodes[0].recv_buf[j], nodes[1].recv_buf[j]);
2111 // Get the address of the receive buffer used
// The work request id carries the pointer to the receive work request itself
2112 rq_wr = (cc_rq_wr_t*)wc.wr_id;
2114 // Process the Message
2115 ProcessMessage((char*)(rq_wr->local_sgl.sge_list[0].to), wc.bytes_rcvd, &needAck);
2116 //ProcessMessage(node->recv_buf, wc.bytes_rcvd);
2118 // Re-Post the receive buffer
2119 // DMK : NOTE : I'm doing this after processing the message because I'm not 100% sure if the RNIC is "smart" enought
2120 // to not reuse the buffer if still in this event handler.
2121 tmp = 0;
2122 rtn = cc_qp_post_rq(contextBlock->rnic, node->qp, rq_wr, 1, &tmp);
// A failed re-post is only logged; execution continues with the ACK handling
2123 if (rtn != CC_OK || tmp != 1) {
2124 // Let the user know what happened
2125 MACHSTATE2(3, "CompletionEventHandler() - Ammasso - ERROR: Unable to Post Work Request to Queue Pair: %d, \"%s\"", rtn, cc_status_to_string(rtn));
2128 // Send and ACK to indicate this node is ready to receive another message
2129 if (needAck)
2130 sendAck(node);
2132 // Check to see if the function should return (stop polling for messages)
// NOTE(review): this return leaves the function without reaching
// AMMASSO_STATS_END(CompletionEventHandler) below
2133 if (breakOnAck && (!needAck)) // NOTE: Should only not need an ACK if ACK arrived or error
2134 return;
2136 break;
2138 // default
2139 default:
2140 MACHSTATE1(3, "CompletionEventHandler() - Ammasso - WARNING - Unknown WC.wr_type: %d", wc.wr_type);
2141 break;
2143 } // end switch (wc.wr_type)
2144 } // end while (1)
2146 AMMASSO_STATS_END(CompletionEventHandler)
2150 // NOTE: DMK: The code here follows from open_tcp_sockets() in machine-tcp.c.
2151 void CmiAmmassoOpenQueuePairs(void) {
2153 char buf[128];
2154 int i, myNode, numNodes, keepWaiting;
2155 int buffersPerNode;
2156 cc_qp_create_attrs_t qpCreateAttrs;
2157 cc_status_t rtn;
2158 cc_inet_addr_t address;
2159 cc_inet_port_t port;
2160 AmmassoBuffer *sendBuffer;
2161 AmmassoBuffer *bufferScanner;
2162 AmmassoToken *newTokens, *tokenScanner;
2163 cc_data_addr_t *newSgls;
2164 cc_stag_index_t newStagIndex;
2165 ammasso_ack_t *ack_location;
2168 MACHSTATE1(2, "CmiAmmassoOpenQueuePairs() - INFO: Called... (Cmi_charmrun_pid = %d)", Cmi_charmrun_pid);
2170 // Check for stand-alone mode... no connections needed
2171 if (Cmi_charmrun_pid == 0) return;
2173 if (nodes == NULL) {
2174 MACHSTATE(5, "CmiAmmassoOpenQueuePairs() - WARNING: nodes = NULL");
2175 return;
2177 MACHSTATE1(1, "CmiAmmassoOpenQueuePairs() - INFO: nodes = %p (remove this line)", nodes);
2179 // DMK : FIXME : At this point, CmiMyNode() seems to be returning 0 on any node while _Cmi_mynode is
2180 // !!!!!!!!!!! returning the correct value. However, _Cmi_mynode and _Cmi_numnodes may not work with
2181 // !!!!!!!!!!! SMP. Resolve this issue and fix this code. For now, using _Cmi_mynode and _Cmi_numnodes.
2182 //myNode = CmiMyNode();
2183 //numNodes = CmiNumNodes();
2184 contextBlock->myNode = myNode = _Cmi_mynode;
2185 contextBlock->numNodes = numNodes = _Cmi_numnodes;
2186 contextBlock->outstandingConnectionCount = contextBlock->numNodes - 1; // No connection with self
2187 contextBlock->nodeReadyCount = contextBlock->numNodes - 1; // No ready packet from self
2188 contextBlock->conditionRegistered = 0;
2190 MACHSTATE2(1, "CmiAmmassoOpenQueuePairs() - INFO: myNode = %d, numNodes = %d", myNode, numNodes);
2192 CmiAssert(sizeof(AmmassoBuffer) == (sizeof(AmmassoBuffer)&(~63)));
2194 // Try to allocate the memory for the receiving buffers
2195 contextBlock->freeRecvBuffers = (AmmassoBuffer*) CmiAlloc(AMMASSO_INITIAL_BUFFERS*sizeof(AmmassoBuffer) + (contextBlock->numNodes-1)*sizeof(ammasso_ack_t));
2197 ack_location = (ammasso_ack_t*)&(contextBlock->freeRecvBuffers[AMMASSO_INITIAL_BUFFERS]);
2198 if (contextBlock->freeRecvBuffers == NULL) {
2200 // Attempt to close the RNIC
2201 cc_rnic_close(contextBlock->rnic);
2203 // Let the user know what happened and bail
2204 MACHSTATE(5, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for RECV buffers");
2205 sprintf(buf, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for RECV buffers");
2206 CmiAbort(buf);
2209 contextBlock->pinnedMemory = AMMASSO_INITIAL_BUFFERS*sizeof(AmmassoBuffer) + (contextBlock->numNodes-1)*sizeof(ammasso_ack_t);
2210 CC_CHECK(cc_nsmr_register_virt,(contextBlock->rnic,
2211 CC_ADDR_TYPE_VA_BASED,
2212 (cc_byte_t*)contextBlock->freeRecvBuffers,
2213 AMMASSO_INITIAL_BUFFERS*sizeof(AmmassoBuffer) + (contextBlock->numNodes-1)*sizeof(ammasso_ack_t),
2214 contextBlock->pd_id,
2215 0, 0,
2216 CC_ACF_LOCAL_READ | CC_ACF_LOCAL_WRITE | CC_ACF_REMOTE_WRITE,
2217 &newStagIndex)
2220 for (i=0; i<AMMASSO_INITIAL_BUFFERS; ++i) {
2221 contextBlock->freeRecvBuffers[i].tail.length = 0;
2222 contextBlock->freeRecvBuffers[i].next = &(contextBlock->freeRecvBuffers[i+1]);
2223 contextBlock->freeRecvBuffers[i].stag = newStagIndex;
2225 contextBlock->freeRecvBuffers[AMMASSO_INITIAL_BUFFERS-1].next = NULL;
2226 contextBlock->last_freeRecvBuffers = &contextBlock->freeRecvBuffers[AMMASSO_INITIAL_BUFFERS-1];
2228 buffersPerNode = AMMASSO_INITIAL_BUFFERS / (contextBlock->numNodes-1);
2230 // distribute all the buffers allocated to the different processors, together
2231 // the the buffer where to receive the directly sent ACK
2232 bufferScanner = contextBlock->freeRecvBuffers;
2233 contextBlock->freeRecvBuffers = contextBlock->freeRecvBuffers[(contextBlock->numNodes-1)*buffersPerNode-1].next;
2234 contextBlock->num_freeRecvBuffers = AMMASSO_INITIAL_BUFFERS - (contextBlock->numNodes-1)*buffersPerNode;
2235 for (i=0; i<contextBlock->numNodes; ++i) {
2236 if (i == contextBlock->myNode) continue;
2237 nodes[i].num_recv_buf = buffersPerNode;
2238 nodes[i].recv_buf = bufferScanner;
2239 bufferScanner[buffersPerNode-1].next = NULL;
2240 nodes[i].last_recv_buf = &(bufferScanner[buffersPerNode-1]);
2241 nodes[i].pending = NULL;
2242 bufferScanner += buffersPerNode; // move forward of buffersPerNode buffers (of size sizeof(AmmassoBuffer))
2243 nodes[i].directAck = ack_location;
2244 ack_location++;
2247 // Try to allocate the memory for the sending buffers, together with the
2248 // buffers from where the direct ACK will be sent
2249 sendBuffer = (AmmassoBuffer*) CmiAlloc(AMMASSO_INITIAL_BUFFERS*sizeof(AmmassoBuffer) + (contextBlock->numNodes-1)*sizeof(ammasso_ack_t));
2251 if (sendBuffer == NULL) {
2253 // Attempt to close the RNIC
2254 cc_rnic_close(contextBlock->rnic);
2256 // Let the user know what happened and bail
2257 MACHSTATE(5, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for SEND buffers");
2258 sprintf(buf, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for SEND buffers");
2259 CmiAbort(buf);
2262 contextBlock->pinnedMemory += AMMASSO_INITIAL_BUFFERS*sizeof(AmmassoBuffer) + (contextBlock->numNodes-1)*sizeof(ammasso_ack_t);
2263 CC_CHECK(cc_nsmr_register_virt,(contextBlock->rnic,
2264 CC_ADDR_TYPE_VA_BASED,
2265 (cc_byte_t*)sendBuffer,
2266 AMMASSO_INITIAL_BUFFERS*sizeof(AmmassoBuffer) + (contextBlock->numNodes-1)*sizeof(ammasso_ack_t),
2267 contextBlock->pd_id,
2268 0, 0,
2269 CC_ACF_LOCAL_READ | CC_ACF_LOCAL_WRITE,
2270 &newStagIndex)
2273 contextBlock->freeTokens = NULL;
2274 contextBlock->num_freeTokens = 0;
2276 // Allocate the send tokens, together with the tokens for the ACK buffers
2277 newTokens = (AmmassoToken*) CmiAlloc((AMMASSO_INITIAL_BUFFERS+contextBlock->numNodes-1)*ALIGN8(sizeof(AmmassoToken)));
2279 if (newTokens == NULL) {
2281 // Attempt to close the RNIC
2282 cc_rnic_close(contextBlock->rnic);
2284 // Let the user know what happened and bail
2285 MACHSTATE(5, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for SEND buffers");
2286 sprintf(buf, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for SEND buffers");
2287 CmiAbort(buf);
2290 newSgls = (cc_data_addr_t*) CmiAlloc((AMMASSO_INITIAL_BUFFERS+contextBlock->numNodes-1)*ALIGN8(sizeof(cc_data_addr_t)));
2292 if (newSgls == NULL) {
2294 // Attempt to close the RNIC
2295 cc_rnic_close(contextBlock->rnic);
2297 // Let the user know what happened and bail
2298 MACHSTATE(5, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for SEND buffers");
2299 sprintf(buf, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for SEND buffers");
2300 CmiAbort(buf);
2303 contextBlock->num_freeTokens = 0;
2304 contextBlock->last_freeTokens = NULL;
2305 contextBlock->freeTokens = NULL;
2306 tokenScanner = newTokens;
2307 for (i=0; i<AMMASSO_INITIAL_BUFFERS; ++i) {
2308 newSgls->stag = newStagIndex;
2309 newSgls->length = AMMASSO_BUFSIZE + sizeof(Tailer);
2310 newSgls->to = (unsigned long)&(sendBuffer[i]);
2311 tokenScanner->wr.wr_id = (unsigned long)tokenScanner;
2312 tokenScanner->wr.wr_type = CC_WR_TYPE_RDMA_WRITE;
2313 tokenScanner->wr.wr_u.rdma_write.local_sgl.sge_count = 1;
2314 tokenScanner->wr.wr_u.rdma_write.local_sgl.sge_list = newSgls;
2315 tokenScanner->wr.signaled = 1;
2316 tokenScanner->localBuf = (AmmassoBuffer*)&(sendBuffer[i]);
2317 LIST_ENQUEUE(contextBlock->,freeTokens,tokenScanner);
2318 newSgls = (cc_data_addr_t*)(((char*)newSgls)+ALIGN8(sizeof(cc_data_addr_t)));
2319 tokenScanner = (AmmassoToken*)(((char*)tokenScanner)+ALIGN8(sizeof(AmmassoToken)));
2322 /* At this point, newSgls, tokenScanner, and ack_location point to the first
2323 element to be used for the ACK buffers */
2325 // Setup the ack_sq_wr for all nodes
2326 for (i=0; i<contextBlock->numNodes; ++i) {
2327 if (i == contextBlock->myNode) continue;
2328 newSgls->stag = newStagIndex;
2329 newSgls->length = sizeof(ammasso_ack_t);
2330 newSgls->to = (unsigned long)ack_location;
2331 nodes[i].remoteAck = ack_location;
2332 tokenScanner->wr.wr_id = (unsigned long)tokenScanner;
2333 tokenScanner->wr.wr_type = CC_WR_TYPE_RDMA_WRITE;
2334 tokenScanner->wr.wr_u.rdma_write.local_sgl.sge_count = 1;
2335 tokenScanner->wr.wr_u.rdma_write.local_sgl.sge_list = newSgls;
2336 tokenScanner->wr.signaled = 1;
2337 nodes[i].ack_sq_wr = &tokenScanner->wr;
2338 newSgls = (cc_data_addr_t*)(((char*)newSgls)+ALIGN8(sizeof(cc_data_addr_t)));
2339 tokenScanner = (AmmassoToken*)(((char*)tokenScanner)+ALIGN8(sizeof(AmmassoToken)));
2340 ack_location++;
2343 // Loop through all of the PEs
2344 // Begin setting up the Queue Pairs (common code for both "server" and "client")
2345 // For nodes with a lower PE, set up a "server" connection (accept)
2346 // For nodes with a higher PE, connect as "client" (connect)
2348 // Loop through all the nodes in the setup
2349 for (i = 0; i < numNodes; i++) {
2351 // Setup any members of this nodes OtherNode structure that need setting up
2352 nodes[i].myNode = i;
2353 nodes[i].connectionState = QP_CONN_STATE_PRE_CONNECT;
2354 nodes[i].messagesNotYetAcknowledged = 0;
2355 nodes[i].usedTokens = NULL;
2356 nodes[i].last_usedTokens = NULL;
2357 nodes[i].num_usedTokens = 0;
2358 nodes[i].localAck = 0;
2359 nodes[i].sendBufLock = CmiCreateLock();
2360 nodes[i].send_next_lock = CmiCreateLock();
2361 nodes[i].recv_expect_lock = CmiCreateLock();
2362 nodes[i].max_used_tokens = 0;
2363 //nodes[i].send_UseIndex = 0;
2364 //nodes[i].send_InUseCounter = 0;
2365 //nodes[i].recv_UseIndex = 0;
2367 // If you walk around talking to yourself people will look at you all funny-like. Try not to do that.
2368 if (i == myNode) continue;
2370 *nodes[i].remoteAck = 0;
2372 // Establish the Connection
2373 establishQPConnection(nodes + i, 0); // Don't reuse the QP (there isn't one yet)
2375 } // end for (i < numNodes)
2378 // Need to block here until all the connections for this node are made
2379 MACHSTATE(2, "CmiAmmassoOpenQueuePairs() - INFO: Waiting for all connections to be established...");
2380 while (contextBlock->outstandingConnectionCount > 0) {
2382 usleep(1000);
2384 for (i = 0; i < contextBlock->numNodes; i++) {
2385 if (i == contextBlock->myNode) continue;
2386 //CompletionEventHandler(contextBlock->rnic, nodes[i].recv_cq, &(nodes[i]));
2387 //CompletionEventHandler(contextBlock->rnic, nodes[i].send_cq, &(nodes[i]));
2388 CheckRecvBufForMessage(&(nodes[i]));
2391 MACHSTATE(1, "CmiAmmassoOpenQueuePairs() - INFO: All Connections have been established... Continuing");
2393 // Pause a little so both ends of the connection have time to receive and process the asynchronous events
2394 //usleep(800000); // 800ms
2395 sleep(1);
2397 MACHSTATE(1, "CmiAmmassoOpenQueuePairs() - INFO: Sending ready to all neighboors...");
2399 // Send all the ready packets
2400 for (i = 0; i < numNodes; i++) {
2401 int tmp;
2402 char buf[24];
2404 if (i == myNode) continue; // Skip self
2406 MACHSTATE1(1, "CmiAmmassoOpenQueuePairs() - INFO: Sending READY to node %d", i);
2408 // Send a READY control message to the node, give a non-null length
2409 sendDataOnQP(buf, 1, &(nodes[i]), AMMASSO_READY);
2412 // Set the sendBufLock
2413 CmiLock(nodes[i].sendBufLock);
2415 // Setup the message
2416 *( (int*)(nodes[i].send_buf )) = Cmi_charmrun_pid; // Send the charmrun PID
2417 *((short*)(nodes[i].send_buf + 4)) = myNode; // Send this node's number
2419 // Post the send
2420 nodes[i].send_sgl.length = 6; // DMK : TODO : FIXME : Change this later when multiple buffers are supported
2421 rtn = cc_qp_post_sq(contextBlock->rnic, nodes[i].qp, &(nodes[i].sq_wr), 1, &tmp);
2422 if (rtn != CC_OK || tmp != 1) {
2423 // Free the sendBufLock
2424 CmiUnlock(nodes[i].sendBufLock);
2426 // Let the user know what happened
2427 MACHSTATE1(3, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to send READY packet to node %d.", i);
2431 // Note: The code in the completion event handler will unlock sendBufLock again after the packet has actually been sent
2434 // DMK : NOTE : I don't think this is really needed... (leaving for now just incase I find out it really is)
2435 //// Wait until all the ready packets have been sent
2436 //while (keepWaiting) {
2437 // int j;
2439 // // Assume we won't find a lock that is set
2440 // keepWaiting = 0;
2442 // // Check all the locks, if one is set, sleep and try again
2443 // for (j = 0; i < numNodes; j++)
2444 // if (nodes[j].sendBufLock) {
2445 // keepWaiting = 1;
2446 // usleep(10000); // sleep 10ms
2447 // break;
2448 // }
2451 MACHSTATE(1, "CmiAmmassoOpenQueuePairs() - INFO: All ready packets sent to neighboors...");
2453 // Need to block here until all of the ready packets have been received
2454 // NOTE : Because this is a fully connection graph of connections between the nodes, this will block all the nodes
2455 // until all the nodes are ready (and all the PEs since there is a node barrier in the run pe function that
2456 // all the threads execute... the thread executing this is one of those so it has to reach that node barrier
2457 // before any of the other can start doing much of anything).
2458 MACHSTATE(2, "CmiAmmassoOpenQueuePairs() - INFO: Waiting for all neighboors to be ready...");
2459 while (contextBlock->nodeReadyCount > 0) {
2460 usleep(10000); // Sleep 10ms
2462 for (i = 0; i < contextBlock->numNodes; i++) {
2463 if (i == contextBlock->myNode) continue;
2464 //CompletionEventHandler(contextBlock->rnic, nodes[i].recv_cq, &(nodes[i]));
2465 //CompletionEventHandler(contextBlock->rnic, nodes[i].send_cq, &(nodes[i]));
2466 CheckRecvBufForMessage(&(nodes[i]));
2469 MACHSTATE(1, "CmiAmmassoOpenQueuePairs() - INFO: All neighboors ready...");
2471 MACHSTATE(2, "CmiAmmassoOpenQueuePairs() - INFO: Finished.");
2476 // NOTE: When reestablishing a connection, the QP can be reused so don't recreate a new one (reuseQPFlag = 1).
2477 // When opening the connection for the first time, there is no QP so create one (reuseQPFlag = 0).
2478 // DMK : TODO : Fix the comment and parameter (I've been playing with what reuseQPFlag actually does and got
2479 // tired of updating comments)... update the comment when this is finished).
2480 void establishQPConnection(OtherNode node, int reuseQPFlag) {
2482 cc_qp_create_attrs_t qpCreateAttrs;
2483 cc_status_t rtn;
2484 int i;
2485 cc_uint32_t numWRsPosted;
2487 MACHSTATE1(2, "establishQPConnection() - INFO: Called for node %d...", node->myNode);
2489 ///// Shared "Client" and "Server" Code /////
2491 MACHSTATE(1, "establishQPConnection() - INFO: (PRE-RECV-CQ-CREATE)");
2493 // Create the Completion Queue, just create a fake one since with rdma writes it is not used
2494 node->recv_cq_depth = 1;
2495 CC_CHECK(cc_cq_create,(contextBlock->rnic, &(node->recv_cq_depth), contextBlock->eh_id, node, &(node->recv_cq)));
2497 MACHSTATE(1, "establishQPConnection() - INFO: (PRE-SEND-CQ-CREATE)");
2499 // Create the Completion Queue
2500 //node->send_cq_depth = AMMASSO_NUMMSGBUFS_PER_QP * 4;
2501 node->send_cq_depth = AMMASSO_BUFFERS_INFLY;
2502 CC_CHECK(cc_cq_create,(contextBlock->rnic, &(node->send_cq_depth), contextBlock->eh_id, node, &(node->send_cq)));
2504 MACHSTATE(1, "establishQPConnection() - INFO: (PRE-QP-CREATE)");
2506 // Create the Queue Pair
2507 // Set some initial Create Queue Pair Attributes that will be reused for all Queue Pairs Created
2508 qpCreateAttrs.sq_cq = node->send_cq; // Set the Send Queue's Completion Queue
2509 qpCreateAttrs.rq_cq = node->recv_cq; // Set the Request Queue's Completion Queue
2510 qpCreateAttrs.sq_depth = node->send_cq_depth;
2511 qpCreateAttrs.rq_depth = node->recv_cq_depth;
2512 qpCreateAttrs.srq = 0;
2513 qpCreateAttrs.rdma_read_enabled = 1;
2514 qpCreateAttrs.rdma_write_enabled = 1;
2515 qpCreateAttrs.rdma_read_response_enabled = 1;
2516 qpCreateAttrs.mw_bind_enabled = 0;
2517 qpCreateAttrs.zero_stag_enabled = 0;
2518 qpCreateAttrs.send_sgl_depth = 1;
2519 qpCreateAttrs.recv_sgl_depth = 1;
2520 qpCreateAttrs.rdma_write_sgl_depth = 1;
2521 qpCreateAttrs.ord = 1;
2522 qpCreateAttrs.ird = 1;
2523 qpCreateAttrs.pdid = contextBlock->pd_id; // Set the Protection Domain
2524 qpCreateAttrs.user_context = node; // Set the User Context Block that will be passed into function calls
2526 CC_CHECK(cc_qp_create,(contextBlock->rnic, &qpCreateAttrs, &(node->qp), &(node->qp_id)));
2528 // Since the QP was just created (or re-created), reset the sequence number and any other variables that need reseting
2529 //node->send_InUseCounter = 0;
2530 //node->send_UseIndex = 0;
2531 node->sendBufLock = 0;
2532 node->send_next = 0;
2533 node->send_next_lock = CmiCreateLock();
2534 node->recv_expect = 0;
2535 node->recv_expect_lock = CmiCreateLock();
2536 node->recv_UseIndex = 0;
2538 if (!reuseQPFlag) {
2540 MACHSTATE(1, "establishQPConnection() - INFO: (PRE-NSMR-REGISTER-VIRT QP-QUERY-ATTRS)");
2542 // Attempt to register the qp_attrs member of the OtherNode structure with the RNIC so the Queue Pair's state can be queried
2543 contextBlock->pinnedMemory += sizeof(cc_qp_query_attrs_t);
2544 CC_CHECK(cc_nsmr_register_virt,(contextBlock->rnic,
2545 CC_ADDR_TYPE_VA_BASED,
2546 (cc_byte_t*)(&(node->qp_attrs)),
2547 sizeof(cc_qp_query_attrs_t),
2548 contextBlock->pd_id,
2549 0, 0,
2550 CC_ACF_LOCAL_READ | CC_ACF_LOCAL_WRITE,
2551 &(node->qp_attrs_stag_index))
2555 ///// "Server" Specific /////
2556 if (node->myNode < contextBlock->myNode) {
2558 int count = 64;
2559 char value[64];
2560 int j;
2562 MACHSTATE(1, "establishQPConnection() - INFO: Starting \"Server\" Code...");
2564 // Setup the address
2565 CC_CHECK(cc_rnic_getconfig,(contextBlock->rnic, CC_GETCONFIG_ADDRS, &count, &value));
2567 //MACHSTATE1(3, "establishQPConnection() - count = %4d", count);
2568 //for (j = 0; j < count; j++) {
2569 // MACHSTATE2(3, "establishQPConnection() - value[%d] = %4d", j, (int)value[j]);
2572 // Setup the Address
2573 // DMK : TODO : FIXME : Fix this code so that it handles host-network/big-little endian ordering
2574 *(((char*)&(node->address)) + 0) = value[0];
2575 *(((char*)&(node->address)) + 1) = value[1];
2576 *(((char*)&(node->address)) + 2) = value[2];
2577 *(((char*)&(node->address)) + 3) = value[3];
2579 // Setup the Port
2580 node->port = htons(AMMASSO_PORT + node->myNode);
2582 MACHSTATE4(1, "establishQPConnection() - Using Address (Hex) 0x%02X 0x%02X 0x%02X 0x%02X", ((node->address >> 24) & 0xFF), ((node->address >> 16) & 0xFF), ((node->address >> 8) & 0xFF), (node->address & 0xFF));
2583 MACHSTATE4(1, " (Dec) %4d %4d %4d %4d", ((node->address >> 24) & 0xFF), ((node->address >> 16) & 0xFF), ((node->address >> 8) & 0xFF), (node->address & 0xFF));
2584 MACHSTATE2(1, " Port (Hex) 0x%02X 0x%02X", ((node->port >> 8) & 0xFF), (node->port & 0xFF));
2586 /* Listen for an incomming connection (NOTE: This call will return
2587 immediately; when a connection attempt is made by a "client", the
2588 asynchronous handler will be called.) */
2589 CC_CHECK(cc_ep_listen_create,(contextBlock->rnic, node->address, &(node->port), 3, contextBlock, &(node->ep)));
2591 MACHSTATE(1, "establishQPConnection() - Listening...");
2595 ///// "Client" Specific /////
2596 if (node->myNode > contextBlock->myNode) {
2597 int first;
2598 AmmassoPrivateData priv;
2600 // A one-time sleep that should give the passive side QPs time to post the listens before the active sides start trying to connect
2601 if (node->myNode == contextBlock->myNode + 1 || reuseQPFlag) // Only do once if all the connections are being made for the first time, do this for all
2602 usleep(400000); // Sleep 400ms // connections if reconnecting so the other RNIC has time to setup the listen
2604 MACHSTATE(1, "establishQPConnection() - INFO: Starting \"Client\" Code...");
2606 // Setup the Address
2607 // DMK : TODO : FIXME : Fix this code so that it handles host-network/big-little endian ordering
2608 *(((char*)&(node->address)) + 0) = *(((char*)&(node->addr.sin_addr.s_addr)) + 0);
2609 *(((char*)&(node->address)) + 1) = *(((char*)&(node->addr.sin_addr.s_addr)) + 1);
2610 *(((char*)&(node->address)) + 2) = *(((char*)&(node->addr.sin_addr.s_addr)) + 2);
2611 *(((char*)&(node->address)) + 3) = (*(((char*)&(node->addr.sin_addr.s_addr)) + 3)) - 1;
2613 // Setup the Port
2614 node->port = htons(AMMASSO_PORT + contextBlock->myNode);
2616 MACHSTATE4(1, "establishQPConnection() - Using Address (Hex) 0x%02X 0x%02X 0x%02X 0x%02X", ((node->address >> 24) & 0xFF), ((node->address >> 16) & 0xFF), ((node->address >> 8) & 0xFF), (node->address & 0xFF));
2617 MACHSTATE4(1, " (Dec) %4d %4d %4d %4d", ((node->address >> 24) & 0xFF), ((node->address >> 16) & 0xFF), ((node->address >> 8) & 0xFF), (node->address & 0xFF));
2618 MACHSTATE2(1, " Port (Hex) 0x%02X 0x%02X", ((node->port >> 8) & 0xFF), (node->port & 0xFF));
2620 /* Attempt to make a connection to a "server" (NOTE: This call will return
2621 immediately; when the connection to the "server" is established, the
2622 asynchronous handler will be called.) */
2624 // by allocation protocol, the stags will be the same
2625 priv.node = contextBlock->myNode;
2626 priv.stag = node->recv_buf->stag;
2627 priv.to = (cc_uint64_t)node->recv_buf;
2628 priv.ack_to = (cc_uint64_t)node->directAck;
2630 CC_CHECK(cc_qp_connect,(contextBlock->rnic, node->qp, node->address, node->port, sizeof(AmmassoPrivateData), (cc_uint8_t*)&priv));
2636 // NOTE: This will be called in the event of a connection error
2637 void reestablishQPConnection(OtherNode node) {
2639 cc_status_t rtn;
2640 char buf[16];
2641 cc_qp_modify_attrs_t modAttrs;
2642 cc_wc_t wc;
2644 MACHSTATE1(2, "reestablishQPConnection() - INFO: For node %d: Clearing Outstanding WRs...", node->myNode);
2646 // Drain the RECV completion Queue (if a connection is lost, all pending Work Requests are completed with a FLUSHED error)
2647 while (1) {
2648 // Pool for a message
2649 rtn = cc_cq_poll(contextBlock->rnic, node->recv_cq, &wc);
2650 if (rtn == CCERR_CQ_EMPTY) break;
2652 // DMK : TODO : FIXME : Something should be done with the WRs that are pulled off so they can be reissued
2655 // Drain the SEND completion Queue (if a connection is lost, all pending Work Requests are completed with a FLUSHED error)
2656 while (1) {
2657 // Pool for a message
2658 rtn = cc_cq_poll(contextBlock->rnic, node->send_cq, &wc);
2659 if (rtn == CCERR_CQ_EMPTY) break;
2661 // DMK : TODO : FIXME : Something should be done with the WRs that are pulled off so they can be reissued
2664 MACHSTATE1(1, "reestablishQPConnection() - INFO: For node %d: Waiting for QP to enter ERROR state...", node->myNode);
2666 do {
2668 // Query the QP's state
2669 rtn = cc_qp_query(contextBlock->rnic, node->qp, &(node->qp_attrs));
2670 if (rtn != CC_OK) {
2671 MACHSTATE2(5, "AsynchronousEventHandler() - ERROR: Unable to Query Queue Pair (l): %d, \"%s\"", rtn, cc_status_to_string(rtn));
2672 break;
2675 // Check to see if the state is ERROR, if so, break from the loop... otherwise, keep waiting, it will be soon
2676 if (node->qp_attrs.qp_state == CC_QP_STATE_ERROR)
2677 break;
2678 else
2679 usleep(1000); // 1ms
2681 } while (1);
2683 MACHSTATE2(1, "reestablishQPConnection() - INFO: Finished waiting node %d: QP state = \"%s\"...", node->myNode, cc_qp_state_to_string(node->qp_attrs.qp_state));
2684 MACHSTATE1(1, "reestablishQPConnection() - INFO: Attempting to transition QP into IDLE state for node %d", node->myNode);
2686 // Transition the Queue Pair from ERROR into IDLE state
2687 modAttrs.llp_ep = node->ep;
2688 modAttrs.next_qp_state = CC_QP_STATE_IDLE;
2689 modAttrs.ord = CC_QP_NO_ATTR_CHANGE;
2690 modAttrs.ird = CC_QP_NO_ATTR_CHANGE;
2691 modAttrs.sq_depth = CC_QP_NO_ATTR_CHANGE;
2692 modAttrs.rq_depth = CC_QP_NO_ATTR_CHANGE;
2693 modAttrs.stream_message_buffer = NULL;
2694 modAttrs.stream_message_length = 0;
2695 rtn = cc_qp_modify(contextBlock->rnic, node->qp, &modAttrs);
2696 if (rtn != CC_OK) {
2697 // Let the user know what happened
2698 MACHSTATE2(5, "reestablishQPConnection() - ERROR: Unable to Modify QP State: %d, \"%s\"", rtn, cc_status_to_string(rtn));
2701 rtn = cc_qp_query(contextBlock->rnic, node->qp, &(node->qp_attrs));
2702 if (rtn != CC_OK) {
2703 MACHSTATE2(5, "reestablishQPConnection() - ERROR: Unable to Query Queue Pair (1): %d, \"%s\"", rtn, cc_status_to_string(rtn));
2705 MACHSTATE2(1, "reestablishQPConnection() - INFO: Transition results for node %d: QP state = \"%s\"...", node->myNode, cc_qp_state_to_string(node->qp_attrs.qp_state));
2707 closeQPConnection(node, 0); // Close the connection but do not destroy the QP
2708 establishQPConnection(node, 1); // Reopen the connection and reuse the QP that has already been created
2712 // NOTE: When reestablishing a connection, the QP can be reused so don't destroy it and create a new one (destroyQPFlag = 0).
2713 // When closing the connection because the application is terminating, destroy the QP (destroyQPFlag != 0).
2714 // DMK : TODO : Fix the comment and parameter (I've been playing with what destroyQPFlag actually does and got
2715 // tired of updating comments)... update the comment when this is finished.
// NOTE(review): as the statements are listed below, cc_qp_destroy() appears ahead of the destroyQPFlag test, which
// does not match the description above - confirm the intended behavior before relying on either.
// Return codes of the cc_*_destroy() / cc_stag_dealloc() calls are not checked here.
2716 void closeQPConnection(OtherNode node, int destroyQPFlag) {
2718 MACHSTATE(2, "closeQPConnection() - INFO: Called...");
2721 // Destroy the Queue Pair and then its send and receive Completion Queues
2722 cc_qp_destroy(contextBlock->rnic, node->qp);
2723 cc_cq_destroy(contextBlock->rnic, node->send_cq);
2724 cc_cq_destroy(contextBlock->rnic, node->recv_cq);
2726 if (destroyQPFlag) {
2727 // De-Register Memory with the RNIC (STags for the QP attributes, receive and send regions)
2728 cc_stag_dealloc(contextBlock->rnic, node->qp_attrs_stag_index);
2729 cc_stag_dealloc(contextBlock->rnic, node->recv_stag_index);
2730 cc_stag_dealloc(contextBlock->rnic, node->send_stag_index);
2731 //cc_stag_dealloc(contextBlock->rnic, node->rdma_stag_index);
// Free the receive-side allocations held by this node
2733 CmiFree(node->recv_buf);
2734 CmiFree(node->rq_wr);
2735 CmiFree(node->recv_sgl);
// Free the send-side allocations held by this node
2737 CmiFree(node->send_buf);
2738 CmiFree(node->sq_wr);
2739 CmiFree(node->send_sgl);
2740 //CmiFree(node->send_bufFree);
2746 char* cc_status_to_string(cc_status_t errorCode) {
2748 switch (errorCode) {
2750 case CC_OK: return "OK";
2751 case CCERR_INSUFFICIENT_RESOURCES: return "Insufficient Resources";
2752 case CCERR_INVALID_MODIFIER: return "Invalid Modifier";
2753 case CCERR_INVALID_MODE: return "Invalid Mode";
2754 case CCERR_IN_USE: return "In Use";
2755 case CCERR_INVALID_RNIC: return "Invalid RNIC";
2756 case CCERR_INTERRUPTED_OPERATION: return "Interrupted Operation";
2757 case CCERR_INVALID_EH: return "Invalid EH";
2758 case CCERR_INVALID_CQ: return "Invalid CQ";
2759 case CCERR_CQ_EMPTY: return "CQ Empty";
2760 case CCERR_NOT_IMPLEMENTED: return "Not Implemented";
2761 case CCERR_CQ_DEPTH_TOO_SMALL: return "CQ Depth Too Small";
2762 case CCERR_PD_IN_USE: return "PD In Use";
2763 case CCERR_INVALID_PD: return "Invalid PD";
2764 case CCERR_INVALID_SRQ: return "Invalid SRQ";
2765 case CCERR_INVALID_ADDRESS: return "Invalid Address";
2766 case CCERR_INVALID_NETMASK: return "Invalid Netmask";
2767 case CCERR_INVALID_QP: return "Invalid QP";
2768 case CCERR_INVALID_QP_STATE: return "Invalid QP State";
2769 case CCERR_TOO_MANY_WRS_POSTED: return "Too Many WRs Posted";
2770 case CCERR_INVALID_WR_TYPE: return "Invalid WR Type";
2771 case CCERR_INVALID_SGL_LENGTH: return "Invalid SGL Length";
2772 case CCERR_INVALID_SQ_DEPTH: return "Invalid SQ Depth";
2773 case CCERR_INVALID_RQ_DEPTH: return "Invalid RQ Depth";
2774 case CCERR_INVALID_ORD: return "Invalid ORD";
2775 case CCERR_INVALID_IRD: return "Invalid IRD";
2776 case CCERR_QP_ATTR_CANNOT_CHANGE: return "QP_ATTR_CANNON_CHANGE";
2777 case CCERR_INVALID_STAG: return "Invalid STag";
2778 case CCERR_QP_IN_USE: return "QP In Use";
2779 case CCERR_OUTSTANDING_WRS: return "Outstanding WRs";
2780 // case CCERR_MR_IN_USE: NOTE : "CCERR_MR_IN_USE = CCERR_STAG_IN_USE" in "cc_status.h"
2781 case CCERR_STAG_IN_USE: return "STag In Use";
2782 case CCERR_INVALID_STAG_INDEX: return "Invalid STag Index";
2783 case CCERR_INVALID_SGL_FORMAT: return "Invalid SGL Format";
2784 case CCERR_ADAPTER_TIMEOUT: return "Adapter Timeout";
2785 case CCERR_INVALID_CQ_DEPTH: return "Invalid CQ Depth";
2786 case CCERR_INVALID_PRIVATE_DATA_LENGTH: return "Invalid Private Data Length";
2787 case CCERR_INVALID_EP: return "Invalid EP";
2788 case CCERR_FLUSHED: return "Flushed";
2789 case CCERR_INVALID_WQE: return "Invalid WQE";
2790 case CCERR_LOCAL_QP_CATASTROPHIC_ERROR: return "Local QP Catastrophic Error";
2791 case CCERR_REMOTE_TERMINATION_ERROR: return "Remote Termination Error";
2792 case CCERR_BASE_AND_BOUNDS_VIOLATION: return "Base and Bounds Violation";
2793 case CCERR_ACCESS_VIOLATION: return "Access Violation";
2794 case CCERR_INVALID_PD_ID: return "Invalid PD ID";
2795 case CCERR_WRAP_ERROR: return "Wrap Error";
2796 case CCERR_INV_STAG_ACCESS_ERROR: return "Invalid STag Access Error";
2797 case CCERR_ZERO_RDMA_READ_RESOURCES: return "Zero RDMA Read Resources";
2798 case CCERR_QP_NOT_PRIVILEGED: return "QP Not Privileged";
2799 case CCERR_STAG_STATE_NOT_INVALID: return "STag State Not Invalid"; // ???
2800 case CCERR_INVALID_PAGE_SIZE: return "Invalid Page Size";
2801 case CCERR_INVALID_BUFFER_SIZE: return "Invalid Buffer Size";
2802 case CCERR_INVALID_PBE: return "Invalid PBE";
2803 case CCERR_INVALID_FBO: return "Invalid FBO";
2804 case CCERR_INVALID_LENGTH: return "Invalid Length";
2805 case CCERR_INVALID_ACCESS_RIGHTS: return "Invalid Access Rights";
2806 case CCERR_PBL_TOO_BIG: return "PBL Too Big";
2807 case CCERR_INVALID_VA: return "Invalid VA";
2808 case CCERR_INVALID_REGION: return "Invalid Region";
2809 case CCERR_INVALID_WINDOW: return "Invalid Window";
2810 case CCERR_TOTAL_LENGTH_TOO_BIG: return "Total Length Too Big";
2811 case CCERR_INVALID_QP_ID: return "Invalid QP ID";
2812 case CCERR_ADDR_IN_USE: return "Address In Use";
2813 case CCERR_ADDR_NOT_AVAIL: return "Address Not Available";
2814 case CCERR_NET_DOWN: return "Network Down";
2815 case CCERR_NET_UNREACHABLE: return "Network Unreachable";
2816 case CCERR_CONN_ABORTED: return "Connection Aborted";
2817 case CCERR_CONN_RESET: return "Connection Reset";
2818 case CCERR_NO_BUFS: return "No Buffers";
2819 case CCERR_CONN_TIMEDOUT: return "Connection Timed-Out";
2820 case CCERR_CONN_REFUSED: return "Connection Refused";
2821 case CCERR_HOST_UNREACHABLE: return "Host Unreachable";
2822 case CCERR_INVALID_SEND_SGL_DEPTH: return "Invalid Send SGL Depth";
2823 case CCERR_INVALID_RECV_SGL_DEPTH: return "Invalid Receive SGL Depth";
2824 case CCERR_INVALID_RDMA_WRITE_SGL_DEPTH: return "Ivalid RDMA Write SGL Depth";
2825 case CCERR_INSUFFICIENT_PRIVILEGES: return "Insufficient Privileges";
2826 case CCERR_STACK_ERROR: return "Stack Error";
2827 case CCERR_INVALID_VERSION: return "Invalid Version";
2828 case CCERR_INVALID_MTU: return "Invalid MTU";
2829 case CCERR_INVALID_IMAGE: return "Invalid Image";
2830 case CCERR_PENDING: return "(PENDING: Internal to Adapter... Hopefully you aren't reading this...)"; /* not an error; user internally by adapter */
2831 case CCERR_DEFER: return "(DEFER: Internal to Adapter... Hopefully you aren't reading this...)"; /* not an error; used internally by adapter */
2832 case CCERR_FAILED_WRITE: return "Failed Write";
2833 case CCERR_FAILED_ERASE: return "Failed Erase";
2834 case CCERR_FAILED_VERIFICATION: return "Failed Verification";
2835 case CCERR_NOT_FOUND: return "Not Found";
2836 default: return "Unknown Error Code";
2840 // NOTE: Letting these string be separate incase different information should be
2841 // returned that what cc_status_to_string() would return
2842 char* cc_conn_error_to_string(cc_connect_status_t errorCode) {
2844 switch (errorCode) {
2845 case CC_CONN_STATUS_SUCCESS: return "Success";
2846 case CC_CONN_STATUS_NO_MEM: return "No Memory";
2847 case CC_CONN_STATUS_TIMEDOUT: return "Timed-Out";
2848 case CC_CONN_STATUS_REFUSED: return "Refused";
2849 case CC_CONN_STATUS_NETUNREACH: return "Network Unreachable";
2850 case CC_CONN_STATUS_HOSTUNREACH: return "Host Unreachable";
2851 case CC_CONN_STATUS_INVALID_RNIC: return "Invalid RNIC";
2852 case CC_CONN_STATUS_INVALID_QP: return "Invalid QP";
2853 case CC_CONN_STATUS_INVALID_QP_STATE: return "Invalid QP State";
2854 case CC_CONN_STATUS_REJECTED: return "Rejected";
2855 default: return (cc_status_to_string((cc_status_t)errorCode));
2859 void displayQueueQuery(cc_qp_handle_t qp, cc_qp_query_attrs_t *attrs) {
2861 //cc_qp_query_attrs_t attr;
2862 cc_status_t rtn;
2863 char buf[1024];
2865 OtherNode node = getNodeFromQPHandle(qp);
2866 if (node != NULL) {
2867 MACHSTATE1(2, "displayQueueQuery() - Called for node %d", node->myNode);
2868 } else {
2869 MACHSTATE(2, "displayQueueQuery() - Called for unknown node");
2872 // Query the Queue for its Attributes
2873 rtn = cc_qp_query(contextBlock->rnic, qp, attrs);
2874 if (rtn != CC_OK) {
2875 // Let the user know what happened
2876 MACHSTATE2(5, "displayQueueQuery() - ERROR: Unable to query queue: %d, \"%s\"", rtn, cc_status_to_string(rtn));
2877 return;
2880 // Output the results of the Query
2881 // DMK : TODO : For now I'm only putting in the ones that I care about... add more later or as needed
2882 MACHSTATE2(1, "displayQueueQuery() - qp_state = %d, \"%s\"", attrs->qp_state, cc_qp_state_to_string(attrs->qp_state));
2883 if (attrs->terminate_message_length > 0) {
2884 memcpy(buf, attrs->terminate_message, attrs->terminate_message_length);
2885 buf[attrs->terminate_message_length] = '\0';
2886 MACHSTATE1(1, "displayQueueQuery() - terminate_message = \"%s\"", buf);
2887 } else {
2888 MACHSTATE(1, "displayQueueQuery() - terminate_message = NULL");
2892 char* cc_qp_state_to_string(cc_qp_state_t qpState) {
2894 switch (qpState) {
2895 case CC_QP_STATE_IDLE: return "IDLE";
2896 case CC_QP_STATE_CONNECTING: return "CONNECTED";
2897 case CC_QP_STATE_RTS: return "RTS";
2898 case CC_QP_STATE_CLOSING: return "CLOSING";
2899 case CC_QP_STATE_TERMINATE: return "TERMINATE";
2900 case CC_QP_STATE_ERROR: return "ERROR";
2901 default: return "unknown";
2905 char* cc_event_id_to_string(cc_event_id_t id) {
2907 switch(id) {
2908 case CCAE_REMOTE_SHUTDOWN: return "Remote Shutdown";
2909 case CCAE_ACTIVE_CONNECT_RESULTS: return "Active Connect Results";
2910 case CCAE_CONNECTION_REQUEST: return "Connection Request";
2911 case CCAE_LLP_CLOSE_COMPLETE: return "LLP Close Complete";
2912 case CCAE_TERMINATE_MESSAGE_RECEIVED: return "Terminate Message Received";
2913 case CCAE_LLP_CONNECTION_RESET: return "LLP Connection Reset";
2914 case CCAE_LLP_CONNECTION_LOST: return "LLP Connection Lost";
2915 case CCAE_LLP_SEGMENT_SIZE_INVALID: return "Segment Size Invalid";
2916 case CCAE_LLP_INVALID_CRC: return "LLP Invalid CRC";
2917 case CCAE_LLP_BAD_FPDU: return "LLP Bad FPDU";
2918 case CCAE_INVALID_DDP_VERSION: return "Invalid DDP Version";
2919 case CCAE_INVALID_RDMA_VERSION: return "Invalid RMDA Version";
2920 case CCAE_UNEXPECTED_OPCODE: return "Unexpected Opcode";
2921 case CCAE_INVALID_DDP_QUEUE_NUMBER: return "Invalid DDP Queue Number";
2922 case CCAE_RDMA_READ_NOT_ENABLED: return "RDMA Read Not Enabled";
2923 case CCAE_RDMA_WRITE_NOT_ENABLED: return "RDMA Write Not Enabled";
2924 case CCAE_RDMA_READ_TOO_SMALL: return "RDMA Read Too Small";
2925 case CCAE_NO_L_BIT: return "No L Bit";
2926 case CCAE_TAGGED_INVALID_STAG: return "Tagged Invalid STag";
2927 case CCAE_TAGGED_BASE_BOUNDS_VIOLATION: return "Tagged Base Bounds Violation";
2928 case CCAE_TAGGED_ACCESS_RIGHTS_VIOLATION: return "Tagged Access Rights Violation";
2929 case CCAE_TAGGED_INVALID_PD: return "Tagged Invalid PD";
2930 case CCAE_WRAP_ERROR: return "Wrap Error";
2931 case CCAE_BAD_CLOSE: return "Bad Close";
2932 case CCAE_BAD_LLP_CLOSE: return "Bad LLP Close";
2933 case CCAE_INVALID_MSN_RANGE: return "Invalid MSN Range";
2934 case CCAE_INVALID_MSN_GAP: return "Invalid MSN Gap";
2935 case CCAE_IRRQ_OVERFLOW: return "IRRQ Overflow";
2936 case CCAE_IRRQ_MSN_GAP: return "IRRQ MSG Gap";
2937 case CCAE_IRRQ_MSN_RANGE: return "IRRQ MSN Range";
2938 case CCAE_IRRQ_INVALID_STAG: return "IRRQ Invalid STag";
2939 case CCAE_IRRQ_BASE_BOUNDS_VIOLATION: return "IRRQ Base Bounds Violation";
2940 case CCAE_IRRQ_ACCESS_RIGHTS_VIOLATION: return "IRRQ Access Rights Violation";
2941 case CCAE_IRRQ_INVALID_PD: return "IRRQ Invalid PD";
2942 case CCAE_IRRQ_WRAP_ERROR: return "IRRQ Wrap Error";
2943 case CCAE_CQ_SQ_COMPLETION_OVERFLOW: return "CQ SQ Completion Overflow";
2944 case CCAE_CQ_RQ_COMPLETION_ERROR: return "CQ RQ Completion Overflow";
2945 case CCAE_QP_SRQ_WQE_ERROR: return "QP SRQ WQE Error";
2946 case CCAE_QP_LOCAL_CATASTROPHIC_ERROR: return "QP Local Catastrophic Error";
2947 case CCAE_CQ_OVERFLOW: return "CQ Overflow";
2948 case CCAE_CQ_OPERATION_ERROR: return "CQ Operation Error";
2949 case CCAE_SRQ_LIMIT_REACHED: return "SRQ Limit Reached";
2950 case CCAE_QP_RQ_LIMIT_REACHED: return "QP RQ Limit Reached";
2951 case CCAE_SRQ_CATASTROPHIC_ERROR: return "SRQ Catastrophic Error";
2952 case CCAE_RNIC_CATASTROPHIC_ERROR: return "RNIC Catastrophic Error";
2953 default: return "Unknown Event ID";
2957 char* cc_connect_status_to_string(cc_connect_status_t status) {
2959 switch (status) {
2960 case CC_CONN_STATUS_SUCCESS: return "Success";
2961 case CC_CONN_STATUS_TIMEDOUT: return "Timedout";
2962 case CC_CONN_STATUS_REFUSED: return "Refused";
2963 case CC_CONN_STATUS_NETUNREACH: return "Network Unreachable";
2964 default: return "Unknown";