From 07424787985341049294309c2e1f9e53ac8eee3a Mon Sep 17 00:00:00 2001 From: Matthew Dillon Date: Fri, 23 May 2008 06:55:11 +0000 Subject: [PATCH] Fix a pipelining performance issue due to the way reading from the socket was implemented. Instead of having the threads compete for read-access to the socket, create a single thread whos responsibility is to read traffic from the socket and route it to the appropriate transaction. This greatly improves parallel threading performance for cpdup's to remote hosts. --- bin/cpdup/cpdup.c | 6 +- bin/cpdup/hclink.c | 173 ++++++++++++++++++++++++++++++++++------------------- bin/cpdup/hclink.h | 5 +- 3 files changed, 118 insertions(+), 66 deletions(-) diff --git a/bin/cpdup/cpdup.c b/bin/cpdup/cpdup.c index eef6a6f6e6..64547be6af 100644 --- a/bin/cpdup/cpdup.c +++ b/bin/cpdup/cpdup.c @@ -45,7 +45,7 @@ * - Is able to do incremental mirroring/backups via hardlinks from * the 'previous' version (supplied with -H path). * - * $DragonFly: src/bin/cpdup/cpdup.c,v 1.27 2008/05/19 16:39:09 dillon Exp $ + * $DragonFly: src/bin/cpdup/cpdup.c,v 1.28 2008/05/23 06:55:11 dillon Exp $ */ /*- @@ -335,9 +335,9 @@ main(int ac, char **av) /* not reached */ } bzero(&info, sizeof(info)); +#if USE_PTHREADS info.r = 0; info.children = 0; -#if USE_PTHREADS pthread_cond_init(&info.cond, NULL); #endif if (dst) { @@ -404,7 +404,9 @@ main(int ac, char **av) static struct hlink * hltlookup(struct stat *stp) { +#if USE_PTHREADS struct timespec ts = { 0, 100000 }; +#endif struct hlink *hl; int n; diff --git a/bin/cpdup/hclink.c b/bin/cpdup/hclink.c index 82ac944aff..330b192025 100644 --- a/bin/cpdup/hclink.c +++ b/bin/cpdup/hclink.c @@ -3,14 +3,17 @@ * * This module implements a simple remote control protocol * - * $DragonFly: src/bin/cpdup/hclink.c,v 1.8 2008/04/14 05:40:51 dillon Exp $ + * $DragonFly: src/bin/cpdup/hclink.c,v 1.9 2008/05/23 06:55:11 dillon Exp $ */ #include "cpdup.h" #include "hclink.h" #include "hcproto.h" -static struct HCHead *hcc_read_command(hctransaction_t trans, int anypkt); +#if USE_PTHREADS +static void * hcc_reader_thread(void *arg); +#endif +static struct HCHead *hcc_read_command(struct HostConf *hc, hctransaction_t trans); static void hcc_start_reply(hctransaction_t trans, struct HCHead *rhead); int @@ -66,6 +69,9 @@ hcc_connect(struct HostConf *hc) hc->fdin = fdin[0]; close(fdout[0]); hc->fdout = fdout[1]; +#if USE_PTHREADS + pthread_create(&hc->reader_thread, NULL, hcc_reader_thread, hc); +#endif return(0); } } @@ -104,6 +110,9 @@ hcc_slave(int fdin, int fdout, struct HCDesc *descs, int count) hcslave.fdout = fdout; trans.hc = &hcslave; +#if USE_PTHREADS + pthread_mutex_unlock(&MasterMutex); +#endif /* * Process commands on fdin and write out results on fdout */ @@ -111,7 +120,7 @@ hcc_slave(int fdin, int fdout, struct HCDesc *descs, int count) /* * Get the command */ - head = hcc_read_command(&trans, 1); + head = hcc_read_command(trans.hc, &trans); if (head == NULL) break; @@ -153,91 +162,118 @@ hcc_slave(int fdin, int fdout, struct HCDesc *descs, int count) return(0); } +#if USE_PTHREADS +/* + * This thread collects responses from the link. It is run without + * the MasterMutex. + */ +static void * +hcc_reader_thread(void *arg) +{ + struct HostConf *hc = arg; + struct HCHead *rhead; + hctransaction_t scan; + int i; + + pthread_detach(pthread_self()); + while (hcc_read_command(hc, NULL) != NULL) + ; + hc->reader_thread = NULL; + + /* + * Clean up any threads stuck waiting for a reply. + */ + pthread_mutex_lock(&MasterMutex); + for (i = 0; i < HCTHASH_SIZE; ++i) { + for (scan = hc->hct_hash[i]; scan; scan = scan->next) { + if (scan->state == HCT_SENT) { + scan->state = HCT_REPLIED; + rhead = (void *)scan->rbuf; + rhead->error = ENOTCONN; + if (scan->waiting) + pthread_cond_signal(&scan->cond); + } + } + } + pthread_mutex_unlock(&MasterMutex); + return(NULL); +} + +#endif + /* * This reads a command from fdin, fixes up the byte ordering, and returns * a pointer to HCHead. * - * If id is -1 we do not match response id's + * The MasterMutex may or may not be held. When threaded this command + * is serialized by a reader thread. */ static struct HCHead * -hcc_read_command(hctransaction_t trans, int anypkt) +hcc_read_command(struct HostConf *hc, hctransaction_t trans) { - struct HostConf *hc = trans->hc; hctransaction_t fill; struct HCHead tmp; int aligned_bytes; int n; int r; -#if USE_PTHREADS - /* - * Shortcut if the reply has already been read in by another thread. - */ - if (trans->state == HCT_REPLIED) { - return((void *)trans->rbuf); + n = 0; + while (n < (int)sizeof(struct HCHead)) { + r = read(hc->fdin, (char *)&tmp + n, sizeof(struct HCHead) - n); + if (r <= 0) + goto fail; + n += r; } - pthread_mutex_unlock(&MasterMutex); - pthread_mutex_lock(&hc->read_mutex); -#endif - while (trans->state != HCT_REPLIED) { - n = 0; - while (n < (int)sizeof(struct HCHead)) { - r = read(hc->fdin, (char *)&tmp + n, sizeof(struct HCHead) - n); - if (r <= 0) - goto fail; - n += r; - } - assert(tmp.bytes >= (int)sizeof(tmp) && tmp.bytes < 65536); - assert(tmp.magic == HCMAGIC); + assert(tmp.bytes >= (int)sizeof(tmp) && tmp.bytes < 65536); + assert(tmp.magic == HCMAGIC); - if (anypkt == 0 && tmp.id != trans->id && trans->hc->version > 0) { + if (trans) { + fill = trans; + } else { #if USE_PTHREADS - for (fill = trans->hc->hct_hash[tmp.id & HCTHASH_MASK]; - fill; - fill = fill->next) - { - if (fill->state == HCT_SENT && fill->id == tmp.id) - break; - } - if (fill == NULL) + for (fill = hc->hct_hash[tmp.id & HCTHASH_MASK]; + fill; + fill = fill->next) + { + if (fill->state == HCT_SENT && fill->id == tmp.id) + break; + } + if (fill == NULL) #endif - { - fprintf(stderr, - "cpdup hlink protocol error with %s (%04x %04x)\n", - trans->hc->host, trans->id, tmp.id); - exit(1); - } - } else { - fill = trans; + { + fprintf(stderr, + "cpdup hlink protocol error with %s (%04x %04x)\n", + hc->host, trans->id, tmp.id); + exit(1); } + } - bcopy(&tmp, fill->rbuf, n); - aligned_bytes = HCC_ALIGN(tmp.bytes); + bcopy(&tmp, fill->rbuf, n); + aligned_bytes = HCC_ALIGN(tmp.bytes); - while (n < aligned_bytes) { - r = read(hc->fdin, fill->rbuf + n, aligned_bytes - n); - if (r <= 0) - goto fail; - n += r; - } + while (n < aligned_bytes) { + r = read(hc->fdin, fill->rbuf + n, aligned_bytes - n); + if (r <= 0) + goto fail; + n += r; + } #ifdef DEBUG - hcc_debug_dump(head); + hcc_debug_dump(head); #endif - fill->state = HCT_REPLIED; - } - trans->state = HCT_DONE; #if USE_PTHREADS - pthread_mutex_unlock(&hc->read_mutex); - pthread_mutex_lock(&MasterMutex); + n = pthread_mutex_lock(&MasterMutex); + assert(n == 0); #endif - return((void *)trans->rbuf); -fail: + fill->state = HCT_REPLIED; #if USE_PTHREADS - pthread_mutex_unlock(&hc->read_mutex); - pthread_mutex_lock(&MasterMutex); + if (fill->waiting) + pthread_cond_signal(&fill->cond); + pthread_mutex_unlock(&MasterMutex); #endif + return((void *)fill->rbuf); +fail: return(NULL); } @@ -263,6 +299,7 @@ hcc_get_trans(struct HostConf *hc) bzero(trans, sizeof(*trans)); trans->tid = tid; trans->id = i; + pthread_cond_init(&trans->cond, NULL); do { for (scan = hc->hct_hash[i]; scan; scan = scan->next) { if (scan->id == trans->id) { @@ -292,6 +329,7 @@ hcc_free_trans(struct HostConf *hc) trans = *transp; if (trans->tid == tid) { *transp = trans->next; + pthread_cond_destroy(&trans->cond); free(trans); break; } @@ -307,9 +345,8 @@ hcc_get_trans(struct HostConf *hc) return(&hc->trans); } -static void -hcc_free_trans(struct HostConf *hc) +hcc_free_trans(struct HostConf *hc __unused) { /* nop */ } @@ -391,7 +428,16 @@ hcc_finish_command(hctransaction_t trans) * whead is invalid when we call hcc_read_command() because * we may switch to another thread. */ - if ((rhead = hcc_read_command(trans, 0)) == NULL) { +#if USE_PTHREADS + while (trans->state != HCT_REPLIED && trans->hc->reader_thread) { + trans->waiting = 1; + pthread_cond_wait(&trans->cond, &MasterMutex); + } + rhead = (void *)trans->rbuf; +#else + rhead = hcc_read_command(trans->hc, trans); +#endif + if (trans->state != HCT_REPLIED || rhead->id != trans->id) { #ifdef __error *__error = EIO; #else @@ -402,6 +448,7 @@ hcc_finish_command(hctransaction_t trans) fprintf(stderr, "cpdup lost connection to %s\n", trans->hc->host); exit(1); } + trans->state = HCT_DONE; if (rhead->error) { #ifdef __error diff --git a/bin/cpdup/hclink.h b/bin/cpdup/hclink.h index afa27de0d2..d979bd7d24 100644 --- a/bin/cpdup/hclink.h +++ b/bin/cpdup/hclink.h @@ -1,7 +1,7 @@ /* * HCLINK.H * - * $DragonFly: src/bin/cpdup/hclink.h,v 1.5 2008/04/14 05:40:51 dillon Exp $ + * $DragonFly: src/bin/cpdup/hclink.h,v 1.6 2008/05/23 06:55:11 dillon Exp $ */ #ifndef _HCLINK_H_ @@ -24,6 +24,8 @@ typedef struct HCTransaction { enum { HCT_IDLE, HCT_SENT, HCT_REPLIED, HCT_DONE } state; #if USE_PTHREADS pthread_t tid; + pthread_cond_t cond; + int waiting; #endif char rbuf[65536]; /* input buffer */ char wbuf[65536]; /* output buffer */ @@ -45,6 +47,7 @@ struct HostConf { #if USE_PTHREADS pthread_mutex_t read_mutex; hctransaction_t hct_hash[HCTHASH_SIZE]; + pthread_t reader_thread; #else struct HCTransaction trans; #endif -- 2.11.4.GIT