From 858296b2cfb60fa5eb85e8885280f89b33b8adf4 Mon Sep 17 00:00:00 2001 From: Gengbin Zheng Date: Tue, 14 Feb 2012 01:00:11 -0600 Subject: [PATCH] implement urgent send queue, default off --- src/arch/gemini_gni/machine.c | 178 +++++++++++++++++++++++------------- src/arch/util/machine-common-core.c | 9 +- 2 files changed, 119 insertions(+), 68 deletions(-) diff --git a/src/arch/gemini_gni/machine.c b/src/arch/gemini_gni/machine.c index 9853550e5d..9c1bbde2b0 100644 --- a/src/arch/gemini_gni/machine.c +++ b/src/arch/gemini_gni/machine.c @@ -6,6 +6,14 @@ Gengbin Zheng * Date: 07-01-2011 * + * Flow control by mem pool using environment variables: + + export CHARM_UGNI_MEMPOOL_SIZE=8M + export CHARM_UGNI_MEMPOOL_MAX=20M + export CHARM_UGNI_SEND_MAX=10M + + CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes + CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX */ /*@{*/ @@ -22,6 +30,8 @@ #include "converse.h" #include +#define USE_OOB 0 + #define PRINT_SYH 0 // Trace communication thread @@ -331,13 +341,25 @@ typedef struct msg_list_index MSG_LIST *tail; } MSG_LIST_INDEX; #endif + /* reuse PendingMsg memory */ static CONTROL_MSG *control_freelist=0; static MSG_LIST *msglist_freelist=0; + +typedef struct smsg_queue +{ + MSG_LIST_INDEX *smsg_msglist_index; + int smsg_head_index; +} SMSG_QUEUE; + +SMSG_QUEUE smsg_queue; +SMSG_QUEUE smsg_oob_queue; +/* static int smsg_head_index = -1; static MSG_LIST_INDEX *smsg_msglist_index= 0; -static MSG_LIST *smsg_free_head=0; -static MSG_LIST *smsg_free_tail=0; +static int smsg_oob_head_index = -1; +static MSG_LIST_INDEX *smsg_oob_msglist_index= 0; // out of band +*/ /* #define FreeMsgList(msg_head, msg_tail, free_head, free_tail) \ @@ -714,7 +736,7 @@ static gni_return_t registerMempool(void *msg) } inline -static void buffer_small_msgs(void *msg, int size, int destNode, uint8_t tag) +static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag) { MSG_LIST *msg_tmp; MallocMsgList(msg_tmp); @@ -725,17 +747,17 @@ static void buffer_small_msgs(void *msg, int size, int destNode, uint8_t tag) //msg_tmp->next = 0; #if !CMK_SMP - if (smsg_msglist_index[destNode].sendSmsgBuf == 0 ) { - smsg_msglist_index[destNode].next = smsg_head_index; - smsg_head_index = destNode; - smsg_msglist_index[destNode].tail = smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp; + if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) { + queue->smsg_msglist_index[destNode].next = queue->smsg_head_index; + queue->smsg_head_index = destNode; + queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp; }else { - smsg_msglist_index[destNode].tail->next = msg_tmp; - smsg_msglist_index[destNode].tail = msg_tmp; + queue->smsg_msglist_index[destNode].tail->next = msg_tmp; + queue->smsg_msglist_index[destNode].tail = msg_tmp; } #else - PCQueuePush(smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp); + PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp); #endif #if PRINT_SYH buffered_smsg_counter++; @@ -880,7 +902,7 @@ static int connect_to(int destNode) } //inline static gni_return_t send_smsg_message(int destNode, void *header, int size_header, void *msg, int size, uint8_t tag, int inbuff ) -inline static gni_return_t send_smsg_message(int destNode, void *msg, int size, uint8_t tag, int inbuff ) +inline static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff ) { unsigned int remote_address; uint32_t remote_id; @@ -897,15 +919,15 @@ inline static gni_return_t send_smsg_message(int destNode, void *msg, int size, case 1: /* pending connection, do nothing */ status = GNI_RC_NOT_DONE; if(inbuff ==0) - buffer_small_msgs(msg, size, destNode, tag); + buffer_small_msgs(queue, msg, size, destNode, tag); return status; } } #if CMK_SMP - if(PCQueueEmpty(smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1) + if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1) { #else - if(smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1) + if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1) { #endif #if CMK_SMP && !COMM_THREAD_SEND @@ -937,7 +959,7 @@ inline static gni_return_t send_smsg_message(int destNode, void *msg, int size, } } if(inbuff ==0) - buffer_small_msgs(msg, size, destNode, tag); + buffer_small_msgs(queue, msg, size, destNode, tag); return status; } @@ -971,7 +993,7 @@ inline static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t se // Large message, send control to receiver, receiver register memory and do a GET, // return 1 - send no success inline -static int send_large_messages(int destNode, CONTROL_MSG *control_msg_tmp, int inbuff) +static int send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG *control_msg_tmp, int inbuff) { gni_return_t status = GNI_RC_NOT_DONE; uint32_t vmdh_index = -1; @@ -988,7 +1010,7 @@ static int send_large_messages(int destNode, CONTROL_MSG *control_msg_tmp, int if(buffered_send_msg >= MAX_BUFF_SEND) { if(!inbuff) - buffer_small_msgs(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG); + buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG); return status; } if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others @@ -1020,7 +1042,7 @@ static int send_large_messages(int destNode, CONTROL_MSG *control_msg_tmp, int if(status == GNI_RC_SUCCESS) { - status = send_smsg_message( destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, inbuff); + status = send_smsg_message(queue, destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, inbuff); if(status == GNI_RC_SUCCESS) { buffered_send_msg += register_size; @@ -1041,14 +1063,14 @@ static int send_large_messages(int destNode, CONTROL_MSG *control_msg_tmp, int }else { if(!inbuff) - buffer_small_msgs(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG); + buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG); } return status; #else status = MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh); if(status == GNI_RC_SUCCESS) { - status = send_smsg_message( destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0); + status = send_smsg_message(queue, destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0); if(status == GNI_RC_SUCCESS) { FreeControlMsg(control_msg_tmp); @@ -1058,7 +1080,7 @@ static int send_large_messages(int destNode, CONTROL_MSG *control_msg_tmp, int CmiAbort("Memory registor for large msg\n"); }else { - buffer_small_msgs(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG); + buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG); } return status; #endif @@ -1074,6 +1096,14 @@ CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode) gni_return_t status = GNI_RC_SUCCESS; uint8_t tag; CONTROL_MSG *control_msg_tmp; + int oob = ( mode & OUT_OF_BAND); + SMSG_QUEUE *queue; + +#if USE_OOB + queue = oob? &smsg_oob_queue : &smsg_queue; +#else + queue = &smsg_queue; +#endif LrtsPrepareEnvelope(msg, size); @@ -1082,35 +1112,35 @@ CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode) #endif #if CMK_SMP && COMM_THREAD_SEND if(size <= SMSG_MAX_MSG) - buffer_small_msgs(msg, size, destNode, SMALL_DATA_TAG); + buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG); else if (size < BIG_MSG) { control_msg_tmp = construct_control_msg(size, msg, 0); - buffer_small_msgs(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG); + buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG); } else { CmiSetMsgSeq(msg, 0); control_msg_tmp = construct_control_msg(size, msg, 1); - buffer_small_msgs(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG); + buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG); } #else //non-smp, smp(worker sending) if(size <= SMSG_MAX_MSG) { - status = send_smsg_message( destNode, msg, size, SMALL_DATA_TAG, 0); + status = send_smsg_message(queue, destNode, msg, size, SMALL_DATA_TAG, 0); if(status == GNI_RC_SUCCESS) CmiFree(msg); } else if (size < BIG_MSG) { control_msg_tmp = construct_control_msg(size, msg, 0); - send_large_messages(destNode, control_msg_tmp, 0); + send_large_messages(queue, destNode, control_msg_tmp, 0); } else { #if USE_LRTS_MEMPOOL CmiSetMsgSeq(msg, 0); control_msg_tmp = construct_control_msg(size, msg, 1); - send_large_messages(destNode, control_msg_tmp, 0); + send_large_messages(queue, destNode, control_msg_tmp, 0); #else control_msg_tmp = construct_control_msg(size, msg, 0); - send_large_messages(destNode, control_msg_tmp, 0); + send_large_messages(queue, destNode, control_msg_tmp, 0); #endif } #endif @@ -1240,6 +1270,8 @@ static void PumpNetworkSmsg() int init_flag; CONTROL_MSG *control_msg_tmp, *header_tmp; uint64_t source_addr; + SMSG_QUEUE *queue = &smsg_queue; + #if CMK_SMP && !COMM_THREAD_SEND while(1) { @@ -1348,7 +1380,7 @@ static void PumpNetworkSmsg() if (control_msg_tmp->length >= ONE_SEG) control_msg_tmp->length = ONE_SEG; control_msg_tmp->seq_id = cur_seq+1+1; //send next seg - send_large_messages(inst_id, control_msg_tmp, 0); + send_large_messages(queue, inst_id, control_msg_tmp, 0); // pipelining if (header_tmp->seq_id == 1) { int i; @@ -1357,7 +1389,7 @@ static void PumpNetworkSmsg() CmiSetMsgSeq(header_tmp->source_addr, seq-1); control_msg_tmp = construct_control_msg(header_tmp->total_length, (char *)header_tmp->source_addr, seq); control_msg_tmp->dest_addr = header_tmp->dest_addr; - send_large_messages(inst_id, control_msg_tmp, 0); + send_large_messages(queue, inst_id, control_msg_tmp, 0); if (header_tmp->total_length <= ONE_SEG*seq) break; } } @@ -1625,6 +1657,8 @@ static void PumpLocalRdmaTransactions() #ifdef CMK_DIRECT CMK_DIRECT_HEADER *cmk_direct_done_msg; #endif + SMSG_QUEUE *queue = &smsg_queue; + #if CMK_SMP && !COMM_THREAD_SEND while(1) { CmiLock(tx_cq_lock); @@ -1717,10 +1751,10 @@ static void PumpLocalRdmaTransactions() } #if CMK_DIRECT if(tmp_pd->amo_cmd == 1) - status = send_smsg_message(inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); + status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); else #endif - status = send_smsg_message(inst_id, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0); + status = send_smsg_message(queue, inst_id, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0); if(status == GNI_RC_SUCCESS) { #if CMK_DIRECT @@ -1895,7 +1929,7 @@ static void SendRdmaMsg() } // return 1 if all messages are sent -static int SendBufferMsg() +static int SendBufferMsg(SMSG_QUEUE *queue) { MSG_LIST *ptr, *tmp_ptr, *pre=0, *current_head; CONTROL_MSG *control_msg_tmp; @@ -1904,13 +1938,13 @@ static int SendBufferMsg() int register_size; void *register_addr; int index_previous = -1; - int index = smsg_head_index; + int index = queue->smsg_head_index; #if CMI_EXERT_SEND_CAP int sent_cnt = 0; #endif #if ! CMK_SMP - index = smsg_head_index; + index = queue->smsg_head_index; #else index = 0; #endif @@ -1920,15 +1954,15 @@ static int SendBufferMsg() #if CMK_SMP while(index smsg_msglist_index[index].sendSmsgBuf); for (i=0; ismsg_msglist_index[index].sendSmsgBuf); if (ptr == NULL) break; #else while(index != -1) { - ptr = smsg_msglist_index[index].sendSmsgBuf; + ptr = queue->smsg_msglist_index[index].sendSmsgBuf; pre = 0; while(ptr != 0) { @@ -1945,7 +1979,7 @@ static int SendBufferMsg() switch(ptr->tag) { case SMALL_DATA_TAG: - status = send_smsg_message( ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1); + status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1); if(status == GNI_RC_SUCCESS) { CmiFree(ptr->msg); @@ -1953,13 +1987,13 @@ static int SendBufferMsg() break; case LMSG_INIT_TAG: control_msg_tmp = (CONTROL_MSG*)ptr->msg; - status = send_large_messages( ptr->destNode, control_msg_tmp, 1); + status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1); if(status != GNI_RC_SUCCESS) done = 0; break; case ACK_TAG: case BIG_MSG_TAG: - status = send_smsg_message( ptr->destNode, ptr->msg, sizeof(CONTROL_MSG), ptr->tag, 1); + status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CONTROL_MSG), ptr->tag, 1); if(status == GNI_RC_SUCCESS) { FreeControlMsg((CONTROL_MSG*)ptr->msg); @@ -1968,7 +2002,7 @@ static int SendBufferMsg() break; #ifdef CMK_DIRECT case DIRECT_PUT_DONE_TAG: - status = send_smsg_message( ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1); + status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1); if(status == GNI_RC_SUCCESS) { free((CMK_DIRECT_HEADER*)ptr->msg); @@ -1989,7 +2023,7 @@ static int SendBufferMsg() ptr = pre ->next = ptr->next; }else { - ptr = smsg_msglist_index[index].sendSmsgBuf = smsg_msglist_index[index].sendSmsgBuf->next; + ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next; } FreeMsgList(tmp_ptr); #else @@ -2006,7 +2040,7 @@ static int SendBufferMsg() #endif }else { #if CMK_SMP - PCQueuePush(smsg_msglist_index[index].sendSmsgBuf, (char*)ptr); + PCQueuePush(queue->smsg_msglist_index[index].sendSmsgBuf, (char*)ptr); #else pre = ptr; ptr=ptr->next; @@ -2015,18 +2049,18 @@ static int SendBufferMsg() } } //end while #if !CMK_SMP - smsg_msglist_index[index].tail = pre; - if(smsg_msglist_index[index].sendSmsgBuf == 0) + queue->smsg_msglist_index[index].tail = pre; + if(queue->smsg_msglist_index[index].sendSmsgBuf == 0) { if(index_previous != -1) - smsg_msglist_index[index_previous].next = smsg_msglist_index[index].next; + queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next; else - smsg_head_index = smsg_msglist_index[index].next; + queue->smsg_head_index = queue->smsg_msglist_index[index].next; }else { index_previous = index; } - index = smsg_msglist_index[index].next; + index = queue->smsg_msglist_index[index].next; #else index++; #endif @@ -2092,7 +2126,10 @@ void LrtsAdvanceCommunication(int whileidle) #if CMK_SMP_TRACE_COMMTHREAD startT = CmiWallTimer(); #endif - SendBufferMsg(); +#if USE_OOB + SendBufferMsg(&smsg_oob_queue); +#endif + SendBufferMsg(&smsg_queue); #if DEBUG_POOL MACHSTATE(8, "after SendBufferMsg\n") ; #endif @@ -2275,10 +2312,26 @@ static void _init_static_smsg() } inline -static void _init_smsg() +static void _init_send_queue(SMSG_QUEUE *queue) { - int i; + int i; + queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX)); + for(i =0; ismsg_msglist_index[i].next = -1; +#if CMK_SMP + queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate(); +#else + queue->smsg_msglist_index[i].sendSmsgBuf = 0; +#endif + + } + queue->smsg_head_index = -1; +} +inline +static void _init_smsg() +{ if(mysize > 1) { if (useDynamicSMSG) _init_dynamic_smsg(); @@ -2286,18 +2339,10 @@ static void _init_smsg() _init_static_smsg(); } - smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX)); - for(i =0; i