From 81f00d788030abd2a5df0e6d95057df41f658ed3 Mon Sep 17 00:00:00 2001 From: Bilge Acun Date: Tue, 17 Mar 2015 15:22:22 -0500 Subject: [PATCH] Bug #642: Use fine grain locking in smp mode in net layer Changed CmiCommlock to finer-grained locks: send_queue_lock, Cmi_freelist_mutex and Cmi_comm_var_mutex. send_queue_lock protects each of the outgoing msg queues, into which worker threads drop messages and from which the comm thread consumes them. Cmi_freelist_mutex is for the shared free list where the used datagrams are stored and reused. Cmi_comm_var_mutex is for protecting other variables that the worker and comm threads share, such as writeableDgrams. Change-Id: I08f8cf71846d91dd48afd28bd0ad83447f5ba0d4 --- src/arch/netlrts/machine-dgram.c | 26 +++++++++++++++++----- src/arch/netlrts/machine-eth.c | 47 +++++++++++++++++++++++++++++++--------- src/arch/netlrts/machine-tcp.c | 30 ++++++++++++++++--------- src/arch/netlrts/machine.c | 26 ++++------------------ 4 files changed, 81 insertions(+), 48 deletions(-) diff --git a/src/arch/netlrts/machine-dgram.c b/src/arch/netlrts/machine-dgram.c index b5dec6acfc..c8d7a5b256 100644 --- a/src/arch/netlrts/machine-dgram.c +++ b/src/arch/netlrts/machine-dgram.c @@ -97,7 +97,7 @@ static double Cmi_ack_delay; static int Cmi_dgram_max_data; static int Cmi_comm_periodic_delay; static int Cmi_comm_clock_delay; -static int writeableAcks,writeableDgrams;/*Write-queue counts (to know when to sleep)*/ +static int CMK_SMP_volatile writeableAcks,writeableDgrams;/*Write-queue counts (to know when to sleep)*/ static void setspeed_atm() { @@ -180,7 +180,7 @@ typedef struct OutgoingMsgStruct typedef struct ExplicitDgramStruct { - struct ExplicitDgramStruct *next; + struct ExplicitDgramStruct CMK_SMP_volatile *next; int srcpe, rank, seqno, broot; unsigned int len, dummy; /* dummy to fix bug in rs6k alignment */ double data[1]; @@ -189,7 +189,7 @@ typedef struct ExplicitDgramStruct typedef struct ImplicitDgramStruct { - struct ImplicitDgramStruct *next; + struct 
ImplicitDgramStruct CMK_SMP_volatile *next; struct OtherNodeStruct *dest; int srcpe, rank, seqno, broot; char *dataptr; @@ -215,12 +215,13 @@ typedef struct OtherNodeStruct #if CMK_USE_TCP SOCKET sock; /* for TCP */ #endif + CmiNodeLock send_queue_lock; unsigned int send_last; /* seqno of last dgram sent */ ImplicitDgram *send_window; /* datagrams sent, not acked */ - ImplicitDgram send_queue_h; /* head of send queue */ - ImplicitDgram send_queue_t; /* tail of send queue */ - unsigned int send_next; /* next seqno to go into queue */ + ImplicitDgram CMK_SMP_volatile send_queue_h; /* head of send queue */ + ImplicitDgram CMK_SMP_volatile send_queue_t; /* tail of send queue */ + unsigned int CMK_SMP_volatile send_next; /* next seqno to go into queue */ unsigned int send_good; /* last acknowledged seqno */ double send_primer; /* time to send retransmit */ unsigned int send_ack_seqno; /* next ack seqno to send */ @@ -270,6 +271,8 @@ static void OtherNode_init(OtherNode node) node->send_good=(unsigned int)(-1); node->send_ack_seqno=0; + node->send_queue_lock = CmiCreateLock(); + /* TODO: The initial values of the Ammasso related members will be set by the machine layer as the QPs are being created (along with any initial values). 
After all the details @@ -462,37 +465,46 @@ void printNetStatistics(void) /************** free list management *****************/ +static CmiNodeLock Cmi_freelist_mutex; static ExplicitDgram Cmi_freelist_explicit; static ImplicitDgram Cmi_freelist_implicit; /*static OutgoingMsg Cmi_freelist_outgoing;*/ #define FreeImplicitDgram(dg) {\ + CmiLock(Cmi_freelist_mutex);\ ImplicitDgram d=(dg);\ d->next = Cmi_freelist_implicit;\ Cmi_freelist_implicit = d;\ + CmiUnlock(Cmi_freelist_mutex);\ } #define MallocImplicitDgram(dg) {\ + CmiLock(Cmi_freelist_mutex);\ ImplicitDgram d = Cmi_freelist_implicit;\ if (d==0) {d = ((ImplicitDgram)malloc(sizeof(struct ImplicitDgramStruct)));\ _MEMCHECK(d);\ } else Cmi_freelist_implicit = d->next;\ dg = d;\ + CmiUnlock(Cmi_freelist_mutex);\ } #define FreeExplicitDgram(dg) {\ + CmiLock(Cmi_freelist_mutex);\ ExplicitDgram d=(dg);\ d->next = Cmi_freelist_explicit;\ Cmi_freelist_explicit = d;\ + CmiUnlock(Cmi_freelist_mutex);\ } #define MallocExplicitDgram(dg) {\ + CmiLock(Cmi_freelist_mutex);\ ExplicitDgram d = Cmi_freelist_explicit;\ if (d==0) { d = ((ExplicitDgram)malloc \ (sizeof(struct ExplicitDgramStruct) + Cmi_max_dgram_size));\ _MEMCHECK(d);\ } else Cmi_freelist_explicit = d->next;\ dg = d;\ + CmiUnlock(Cmi_freelist_mutex);\ } /* Careful with these next two, need concurrency control */ @@ -557,7 +569,9 @@ static void CommunicationsClock(void) MACHSTATE(2,"CommunicationsClock timing out acks"); Cmi_ack_last=Cmi_clock; writeableAcks=1; + CmiLock(Cmi_comm_var_mutex); writeableDgrams=1; + CmiUnlock(Cmi_comm_var_mutex); } if (Cmi_clock > Cmi_check_last + Cmi_check_delay) { diff --git a/src/arch/netlrts/machine-eth.c b/src/arch/netlrts/machine-eth.c index 4ebc91b124..3c909be47d 100644 --- a/src/arch/netlrts/machine-eth.c +++ b/src/arch/netlrts/machine-eth.c @@ -39,8 +39,11 @@ void LrtsBeginIdle() {} ***************************************************************************/ int CheckSocketsReady(int withDelayMs) -{ - int 
nreadable,dataWrite=writeableDgrams || writeableAcks; +{ + int nreadable,dataWrite; + CmiLock(Cmi_comm_var_mutex); + dataWrite = writeableDgrams || writeableAcks; + CmiUnlock(Cmi_comm_var_mutex); CMK_PIPE_DECL(withDelayMs); @@ -250,6 +253,7 @@ int TransmitDatagram() for (skip=0; skipsend_queue_lock); dg = node->send_queue_h; if (dg) { seqno = dg->seqno; @@ -257,10 +261,13 @@ int TransmitDatagram() if (node->send_window[slot] == 0) { node->send_queue_h = dg->next; node->send_window[slot] = dg; - TransmitImplicitDgram(dg); + CmiUnlock(node->send_queue_lock); + TransmitImplicitDgram(dg); /*Actual transmisson happens here*/ + CmiLock(node->send_queue_lock); if (seqno == ((node->send_last+1)&DGRAM_SEQNO_MASK)) node->send_last = seqno; node->send_primer = Cmi_clock + Cmi_delay_retransmit; + CmiUnlock(node->send_queue_lock); return 1; } } @@ -268,15 +275,21 @@ int TransmitDatagram() slot = (node->send_last % Cmi_window_size); for (count=0; countsend_window[slot]; - if (dg) break; + if (dg) { + break; + } slot = ((slot+Cmi_window_size-1) % Cmi_window_size); } if (dg) { - TransmitImplicitDgram1(node->send_window[slot]); + CmiUnlock(node->send_queue_lock); + TransmitImplicitDgram1(node->send_window[slot]); /*Actual transmisson happens here*/ + CmiLock(node->send_queue_lock); node->send_primer = Cmi_clock + Cmi_delay_retransmit; + CmiUnlock(node->send_queue_lock); return 1; } } + CmiUnlock(node->send_queue_lock); } return 0; } @@ -333,18 +346,22 @@ void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int b ogm->size,node->nodestart+rank); size = ogm->size - DGRAM_HEADER_SIZE; data = ogm->data + DGRAM_HEADER_SIZE; - writeableDgrams++; + + CmiLock(node->send_queue_lock); while (size > Cmi_dgram_max_data) { EnqueueOutgoingDgram(ogm, data, Cmi_dgram_max_data, node, rank, broot); data += Cmi_dgram_max_data; size -= Cmi_dgram_max_data; } EnqueueOutgoingDgram(ogm, data, size, node, rank, broot); + CmiUnlock(node->send_queue_lock); myNode->sent_msgs++; 
myNode->sent_bytes += ogm->size; /*Try to immediately send the packets off*/ + CmiLock(Cmi_comm_var_mutex); writeableDgrams=1; + CmiUnlock(Cmi_comm_var_mutex); } @@ -559,7 +576,9 @@ void IntegrateAckDatagram(ExplicitDgram dg) } /* higher ack so adjust */ node->recv_ack_seqno = ackseqno; + CmiLock(Cmi_comm_var_mutex); writeableDgrams=1; /* May have freed up some send slots */ + CmiUnlock(Cmi_comm_var_mutex); for (i=Cmi_window_size-1; i>=0; i--) { slot--; if (slot== ((unsigned int)-1)) slot+=Cmi_window_size; @@ -676,8 +695,7 @@ static void CommunicationServerNet(int sleepTime, int where) { unsigned int nTimes=0; /* Loop counter */ LOG(GetClock(), Cmi_nodestartGlobal, 'I', 0, 0); - MACHSTATE2(1,"CommunicationsServer(%d,%d)", - sleepTime,writeableAcks||writeableDgrams) + MACHSTATE1(1,"CommunicationsServer(%d)", sleepTime) #if !CMK_SHARED_VARS_UNAVAILABLE /*SMP mode: comm. lock is precious*/ if (sleepTime!=0) {/*Sleep *without* holding the comm. lock*/ MACHSTATE(1,"CommServer going to sleep (NO LOCK)"); @@ -708,8 +726,17 @@ static void CommunicationServerNet(int sleepTime, int where) if (dataskt_ready_write) { if (writeableAcks) if (0!=(writeableAcks=TransmitAcknowledgement())) again=1; - if (writeableDgrams) - if (0!=(writeableDgrams=TransmitDatagram())) again=1; + CmiLock(Cmi_comm_var_mutex); + if (writeableDgrams) { + CmiUnlock(Cmi_comm_var_mutex); + int temp; + if (0!=(temp=TransmitDatagram())) { + again=1; + CmiLock(Cmi_comm_var_mutex); + writeableDgrams = temp; + CmiUnlock(Cmi_comm_var_mutex); + } + } else CmiUnlock(Cmi_comm_var_mutex); } if (CmiStdoutNeedsService()) {CmiStdoutService();} if (!again) break; /* Nothing more to do */ diff --git a/src/arch/netlrts/machine-tcp.c b/src/arch/netlrts/machine-tcp.c index f343cdb187..12223137c9 100644 --- a/src/arch/netlrts/machine-tcp.c +++ b/src/arch/netlrts/machine-tcp.c @@ -82,7 +82,9 @@ static char sockWriteStates[1000] = {0}; #define CMK_PIPE_ADDREADWRITE(afd) \ CMK_PIPE_ADDREAD(afd); \ - if (nodes[i].send_queue_h) 
fds[(*nFds)-1].events |= POLLOUT; + CmiLock(nodes[i].send_queue_lock); \ + if (nodes[i].send_queue_h) fds[(*nFds)-1].events |= POLLOUT; \ + CmiUnlock(nodes[i].send_queue_lock); #undef CMK_PIPE_CHECKWRITE #define CMK_PIPE_CHECKWRITE(afd) \ @@ -108,8 +110,10 @@ static char sockWriteStates[1000] = {0}; for (i=0; i0) { int again=0; sleepTime=0; - CmiCheckSocks(); + CmiCheckSocks(); /* Actual recv and send happens in here */ if (ctrlskt_ready_read) {again=1;ctrl_getone();} if (dataskt_ready_read || dataskt_ready_write) {again=1;} if (CmiStdoutNeedsService()) {CmiStdoutService();} @@ -461,14 +466,17 @@ int TransmitDatagram(int pe) unsigned int seqno; node = nodes+pe; + CmiLock(node->send_queue_lock); dg = node->send_queue_h; if (dg) { - if (TransmitImplicitDgram(dg)) { - node->send_queue_h = dg->next; - if (node->send_queue_h == NULL) node->send_queue_t = NULL; + node->send_queue_h = dg->next; + if (node->send_queue_h == NULL) node->send_queue_t = NULL; + CmiUnlock(node->send_queue_lock); + if (TransmitImplicitDgram(dg)) { /*Actual transmission of the datagram happens here*/ DiscardImplicitDgram(dg); } } + else CmiUnlock(node->send_queue_lock); return 0; } @@ -478,19 +486,19 @@ void EnqueueOutgoingDgram int seqno, dst, src; ImplicitDgram dg; src = ogm->src; dst = ogm->dst; - seqno = node->send_next; - node->send_next = ((seqno+1)&DGRAM_SEQNO_MASK); MallocImplicitDgram(dg); dg->dest = node; dg->srcpe = src; dg->rank = rank; - dg->seqno = seqno; dg->broot = broot; dg->dataptr = ptr; dg->datalen = len; dg->ogm = ogm; ogm->refcount++; dg->next = 0; + seqno = node->send_next; + node->send_next = ((seqno+1)&DGRAM_SEQNO_MASK); + dg->seqno = seqno; if (node->send_queue_h == 0) { node->send_queue_h = dg; node->send_queue_t = dg; @@ -510,12 +518,14 @@ void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int b size = ogm->size - DGRAM_HEADER_SIZE; data = ogm->data + DGRAM_HEADER_SIZE; + CmiLock(node->send_queue_lock); while (size > Cmi_dgram_max_data) { 
EnqueueOutgoingDgram(ogm, data, Cmi_dgram_max_data, node, rank, broot); data += Cmi_dgram_max_data; size -= Cmi_dgram_max_data; } EnqueueOutgoingDgram(ogm, data, size, node, rank, broot); + CmiUnlock(node->send_queue_lock); } /*********************************************************************** diff --git a/src/arch/netlrts/machine.c b/src/arch/netlrts/machine.c index 87271b558a..ad7a80886b 100644 --- a/src/arch/netlrts/machine.c +++ b/src/arch/netlrts/machine.c @@ -786,6 +786,7 @@ void printLog(void) *****************************************************************************/ +static CmiNodeLock Cmi_comm_var_mutex; static CmiNodeLock Cmi_scanf_mutex; static double Cmi_clock; static double Cmi_check_delay = 3.0; @@ -1514,15 +1515,8 @@ int DeliverOutgoingMessage(OutgoingMsg ogm) rank = dst - node->nodestart; int acqLock = 0; if (node->nodestart != Cmi_nodestartGlobal) { -#if !CMK_SMP_NOT_RELAX_LOCK - LOCK_AND_SET(); -#endif DeliverViaNetwork(ogm, node, rank, DGRAM_ROOTPE_MASK, 0); - GarbageCollectMsg(ogm); -#if !CMK_SMP_NOT_RELAX_LOCK - UNLOCK_AND_UNSET(); -#endif - } + } #if CMK_MULTICORE network = 0; #endif @@ -1569,21 +1563,7 @@ CmiCommHandle LrtsSendFunc(int destNode, int pe, int size, char *data, int freem ogm=PrepareOutgoing(pe,size,'F',data); - int acqLock = 0; -#if CMK_SMP_NOT_RELAX_LOCK - LOCK_AND_SET(); -#endif - sendonnetwork = DeliverOutgoingMessage(ogm); - -#if CMK_SMP_NOT_RELAX_LOCK - UNLOCK_AND_UNSET(); -#endif - -//#if CMK_SMP -// if (sendonnetwork!=0) /* only call server when we send msg on network in SMP */ -// CommunicationServerNet(0, COMM_SERVER_FROM_WORKER); -//#endif MACHSTATE(1,"} LrtsSend"); return (CmiCommHandle)ogm; } @@ -1940,6 +1920,8 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID) #endif extract_args(*argv); log_init(); + Cmi_comm_var_mutex = CmiCreateLock(); + Cmi_freelist_mutex = CmiCreateLock(); Cmi_scanf_mutex = CmiCreateLock(); /* NOTE: can not acutally call timer before timerInit ! GZ */ -- 2.11.4.GIT