4 * This module implements a simple remote control protocol
6 * $DragonFly: src/bin/cpdup/hclink.c,v 1.10 2008/05/24 17:21:36 dillon Exp $
14 static void * hcc_reader_thread(void *arg
);
16 static struct HCHead
*hcc_read_command(struct HostConf
*hc
, hctransaction_t trans
);
17 static void hcc_start_reply(hctransaction_t trans
, struct HCHead
*rhead
);
20 hcc_connect(struct HostConf
*hc
)
26 if (hc
== NULL
|| hc
->host
== NULL
)
31 if (pipe(fdout
) < 0) {
36 if ((hc
->pid
= fork()) == 0) {
59 execv("/usr/bin/ssh", (void *)av
);
61 } else if (hc
->pid
< 0) {
65 * Parent process. Do the initial handshake to make sure we are
66 * actually talking to a cpdup slave.
73 pthread_create(&hc
->reader_thread
, NULL
, hcc_reader_thread
, hc
);
80 rc_badop(hctransaction_t trans __unused
, struct HCHead
*head
)
82 head
->error
= EOPNOTSUPP
;
87 hcc_slave(int fdin
, int fdout
, struct HCDesc
*descs
, int count
)
89 struct HostConf hcslave
;
92 struct HCTransaction trans
;
93 int (*dispatch
[256])(hctransaction_t
, struct HCHead
*);
98 bzero(&hcslave
, sizeof(hcslave
));
99 bzero(&trans
, sizeof(trans
));
100 for (i
= 0; i
< count
; ++i
) {
101 struct HCDesc
*desc
= &descs
[i
];
102 assert(desc
->cmd
>= 0 && desc
->cmd
< 256);
103 dispatch
[desc
->cmd
] = desc
->func
;
105 for (i
= 0; i
< 256; ++i
) {
106 if (dispatch
[i
] == NULL
)
107 dispatch
[i
] = rc_badop
;
110 hcslave
.fdout
= fdout
;
114 pthread_mutex_unlock(&MasterMutex
);
117 * Process commands on fdin and write out results on fdout
123 head
= hcc_read_command(trans
.hc
, &trans
);
128 * Start the reply and dispatch, then process the return code.
131 hcc_start_reply(&trans
, head
);
133 r
= dispatch
[head
->cmd
& 255](&trans
, head
);
137 head
->error
= EINVAL
;
150 * Write out the reply
152 whead
= (void *)trans
.wbuf
;
153 whead
->bytes
= trans
.windex
;
154 whead
->error
= head
->error
;
155 aligned_bytes
= HCC_ALIGN(trans
.windex
);
157 hcc_debug_dump(whead
);
159 if (write(hcslave
.fdout
, whead
, aligned_bytes
) != aligned_bytes
)
167 * This thread collects responses from the link. It is run without
171 hcc_reader_thread(void *arg
)
173 struct HostConf
*hc
= arg
;
174 struct HCHead
*rhead
;
175 hctransaction_t scan
;
178 pthread_detach(pthread_self());
179 while (hcc_read_command(hc
, NULL
) != NULL
)
181 hc
->reader_thread
= NULL
;
184 * Clean up any threads stuck waiting for a reply.
186 pthread_mutex_lock(&MasterMutex
);
187 for (i
= 0; i
< HCTHASH_SIZE
; ++i
) {
188 pthread_mutex_lock(&hc
->hct_mutex
[i
]);
189 for (scan
= hc
->hct_hash
[i
]; scan
; scan
= scan
->next
) {
190 if (scan
->state
== HCT_SENT
) {
191 scan
->state
= HCT_REPLIED
;
192 rhead
= (void *)scan
->rbuf
;
193 rhead
->error
= ENOTCONN
;
195 pthread_cond_signal(&scan
->cond
);
198 pthread_mutex_unlock(&hc
->hct_mutex
[i
]);
200 pthread_mutex_unlock(&MasterMutex
);
207 * This reads a command from fdin, fixes up the byte ordering, and returns
208 * a pointer to HCHead.
210 * The MasterMutex may or may not be held. When threaded this command
211 * is serialized by a reader thread.
215 hcc_read_command(struct HostConf
*hc
, hctransaction_t trans
)
217 hctransaction_t fill
;
224 while (n
< (int)sizeof(struct HCHead
)) {
225 r
= read(hc
->fdin
, (char *)&tmp
+ n
, sizeof(struct HCHead
) - n
);
231 assert(tmp
.bytes
>= (int)sizeof(tmp
) && tmp
.bytes
< 65536);
232 assert(tmp
.magic
== HCMAGIC
);
238 pthread_mutex_lock(&hc
->hct_mutex
[tmp
.id
& HCTHASH_MASK
]);
239 for (fill
= hc
->hct_hash
[tmp
.id
& HCTHASH_MASK
];
243 if (fill
->state
== HCT_SENT
&& fill
->id
== tmp
.id
)
246 pthread_mutex_unlock(&hc
->hct_mutex
[tmp
.id
& HCTHASH_MASK
]);
251 "cpdup hlink protocol error with %s (%04x %04x)\n",
252 hc
->host
, trans
->id
, tmp
.id
);
257 bcopy(&tmp
, fill
->rbuf
, n
);
258 aligned_bytes
= HCC_ALIGN(tmp
.bytes
);
260 while (n
< aligned_bytes
) {
261 r
= read(hc
->fdin
, fill
->rbuf
+ n
, aligned_bytes
- n
);
267 hcc_debug_dump(head
);
270 pthread_mutex_lock(&hc
->hct_mutex
[fill
->id
& HCTHASH_MASK
]);
272 fill
->state
= HCT_REPLIED
;
275 pthread_cond_signal(&fill
->cond
);
276 pthread_mutex_unlock(&hc
->hct_mutex
[fill
->id
& HCTHASH_MASK
]);
278 return((void *)fill
->rbuf
);
287 hcc_get_trans(struct HostConf
*hc
)
289 hctransaction_t trans
;
290 hctransaction_t scan
;
291 pthread_t tid
= pthread_self();
294 i
= ((intptr_t)tid
>> 7) & HCTHASH_MASK
;
296 pthread_mutex_lock(&hc
->hct_mutex
[i
]);
297 for (trans
= hc
->hct_hash
[i
]; trans
; trans
= trans
->next
) {
298 if (trans
->tid
== tid
)
302 trans
= malloc(sizeof(*trans
));
303 bzero(trans
, sizeof(*trans
));
306 pthread_cond_init(&trans
->cond
, NULL
);
308 for (scan
= hc
->hct_hash
[i
]; scan
; scan
= scan
->next
) {
309 if (scan
->id
== trans
->id
) {
310 trans
->id
+= HCTHASH_SIZE
;
314 } while (scan
!= NULL
);
316 trans
->next
= hc
->hct_hash
[i
];
317 hc
->hct_hash
[i
] = trans
;
319 pthread_mutex_unlock(&hc
->hct_mutex
[i
]);
324 hcc_free_trans(struct HostConf
*hc
)
326 hctransaction_t trans
;
327 hctransaction_t
*transp
;
328 pthread_t tid
= pthread_self();
331 i
= ((intptr_t)tid
>> 7) & HCTHASH_MASK
;
333 pthread_mutex_lock(&hc
->hct_mutex
[i
]);
334 for (transp
= &hc
->hct_hash
[i
]; *transp
; transp
= &trans
->next
) {
336 if (trans
->tid
== tid
) {
337 *transp
= trans
->next
;
338 pthread_cond_destroy(&trans
->cond
);
343 pthread_mutex_unlock(&hc
->hct_mutex
[i
]);
350 hcc_get_trans(struct HostConf
*hc
)
356 hcc_free_trans(struct HostConf
*hc __unused
)
364 * Initialize for a new command
367 hcc_start_command(struct HostConf
*hc
, int16_t cmd
)
369 struct HCHead
*whead
;
370 hctransaction_t trans
;
372 trans
= hcc_get_trans(hc
);
374 whead
= (void *)trans
->wbuf
;
375 whead
->magic
= HCMAGIC
;
378 whead
->id
= trans
->id
;
381 trans
->windex
= sizeof(*whead
);
383 trans
->state
= HCT_IDLE
;
389 hcc_start_reply(hctransaction_t trans
, struct HCHead
*rhead
)
391 struct HCHead
*whead
= (void *)trans
->wbuf
;
393 whead
->magic
= HCMAGIC
;
395 whead
->cmd
= rhead
->cmd
| HCF_REPLY
;
396 whead
->id
= rhead
->id
;
399 trans
->windex
= sizeof(*whead
);
403 * Finish constructing a command, transmit it, and await the reply.
404 * Return the HCHead of the reply.
407 hcc_finish_command(hctransaction_t trans
)
410 struct HCHead
*whead
;
411 struct HCHead
*rhead
;
416 whead
= (void *)trans
->wbuf
;
417 whead
->bytes
= trans
->windex
;
418 aligned_bytes
= HCC_ALIGN(trans
->windex
);
420 trans
->state
= HCT_SENT
;
422 if (write(hc
->fdout
, whead
, aligned_bytes
) != aligned_bytes
) {
428 if (whead
->cmd
< 0x0010)
430 fprintf(stderr
, "cpdup lost connection to %s\n", hc
->host
);
437 * whead is invalid when we call hcc_read_command() because
438 * we may switch to another thread.
441 pthread_mutex_unlock(&MasterMutex
);
442 while (trans
->state
!= HCT_REPLIED
&& hc
->reader_thread
) {
443 pthread_mutex_t
*mtxp
= &hc
->hct_mutex
[trans
->id
& HCTHASH_MASK
];
444 pthread_mutex_lock(mtxp
);
446 if (trans
->state
!= HCT_REPLIED
&& hc
->reader_thread
)
447 pthread_cond_wait(&trans
->cond
, mtxp
);
449 pthread_mutex_unlock(mtxp
);
451 pthread_mutex_lock(&MasterMutex
);
452 rhead
= (void *)trans
->rbuf
;
454 rhead
= hcc_read_command(hc
, trans
);
456 if (trans
->state
!= HCT_REPLIED
|| rhead
->id
!= trans
->id
) {
464 fprintf(stderr
, "cpdup lost connection to %s\n", hc
->host
);
467 trans
->state
= HCT_DONE
;
471 *__error
= rhead
->error
;
473 errno
= rhead
->error
;
480 hcc_leaf_string(hctransaction_t trans
, int16_t leafid
, const char *str
)
483 int bytes
= strlen(str
) + 1;
485 item
= (void *)(trans
->wbuf
+ trans
->windex
);
486 assert(trans
->windex
+ sizeof(*item
) + bytes
< 65536);
487 item
->leafid
= leafid
;
489 item
->bytes
= sizeof(*item
) + bytes
;
490 bcopy(str
, item
+ 1, bytes
);
491 trans
->windex
= HCC_ALIGN(trans
->windex
+ item
->bytes
);
495 hcc_leaf_data(hctransaction_t trans
, int16_t leafid
, const void *ptr
, int bytes
)
499 item
= (void *)(trans
->wbuf
+ trans
->windex
);
500 assert(trans
->windex
+ sizeof(*item
) + bytes
< 65536);
501 item
->leafid
= leafid
;
503 item
->bytes
= sizeof(*item
) + bytes
;
504 bcopy(ptr
, item
+ 1, bytes
);
505 trans
->windex
= HCC_ALIGN(trans
->windex
+ item
->bytes
);
509 hcc_leaf_int32(hctransaction_t trans
, int16_t leafid
, int32_t value
)
513 item
= (void *)(trans
->wbuf
+ trans
->windex
);
514 assert(trans
->windex
+ sizeof(*item
) + sizeof(value
) < 65536);
515 item
->leafid
= leafid
;
517 item
->bytes
= sizeof(*item
) + sizeof(value
);
518 *(int32_t *)(item
+ 1) = value
;
519 trans
->windex
= HCC_ALIGN(trans
->windex
+ item
->bytes
);
523 hcc_leaf_int64(hctransaction_t trans
, int16_t leafid
, int64_t value
)
527 item
= (void *)(trans
->wbuf
+ trans
->windex
);
528 assert(trans
->windex
+ sizeof(*item
) + sizeof(value
) < 65536);
529 item
->leafid
= leafid
;
531 item
->bytes
= sizeof(*item
) + sizeof(value
);
532 *(int64_t *)(item
+ 1) = value
;
533 trans
->windex
= HCC_ALIGN(trans
->windex
+ item
->bytes
);
537 hcc_alloc_descriptor(struct HostConf
*hc
, void *ptr
, int type
)
539 struct HCHostDesc
*hd
;
540 struct HCHostDesc
*hnew
;
542 hnew
= malloc(sizeof(struct HCHostDesc
));
546 if ((hd
= hc
->hostdescs
) != NULL
) {
547 hnew
->desc
= hd
->desc
+ 1;
552 hc
->hostdescs
= hnew
;
557 hcc_get_descriptor(struct HostConf
*hc
, int desc
, int type
)
559 struct HCHostDesc
*hd
;
561 for (hd
= hc
->hostdescs
; hd
; hd
= hd
->next
) {
562 if (hd
->desc
== desc
&& hd
->type
== type
)
569 hcc_set_descriptor(struct HostConf
*hc
, int desc
, void *ptr
, int type
)
571 struct HCHostDesc
*hd
;
572 struct HCHostDesc
**hdp
;
574 for (hdp
= &hc
->hostdescs
; (hd
= *hdp
) != NULL
; hdp
= &hd
->next
) {
575 if (hd
->desc
== desc
) {
587 hd
= malloc(sizeof(*hd
));
591 hd
->next
= hc
->hostdescs
;
597 hcc_firstitem(struct HCHead
*head
)
602 offset
= sizeof(*head
);
603 if (offset
== head
->bytes
)
605 assert(head
->bytes
>= offset
+ (int)sizeof(*item
));
606 item
= (void *)(head
+ 1);
607 assert(head
->bytes
>= offset
+ item
->bytes
);
608 assert(item
->bytes
>= (int)sizeof(*item
) && item
->bytes
< 65536 - offset
);
613 hcc_nextitem(struct HCHead
*head
, struct HCLeaf
*item
)
617 item
= (void *)((char *)item
+ HCC_ALIGN(item
->bytes
));
618 offset
= (char *)item
- (char *)head
;
619 if (offset
== head
->bytes
)
621 assert(head
->bytes
>= offset
+ (int)sizeof(*item
));
622 assert(head
->bytes
>= offset
+ item
->bytes
);
623 assert(item
->bytes
>= (int)sizeof(*item
) && item
->bytes
< 65536 - offset
);
630 hcc_debug_dump(struct HCHead
*head
)
633 int aligned_bytes
= HCC_ALIGN(head
->bytes
);
635 fprintf(stderr
, "DUMP %04x (%d)", (u_int16_t
)head
->cmd
, aligned_bytes
);
636 if (head
->cmd
& HCF_REPLY
)
637 fprintf(stderr
, " error %d", head
->error
);
638 fprintf(stderr
, "\n");
639 for (item
= hcc_firstitem(head
); item
; item
= hcc_nextitem(head
, item
)) {
640 fprintf(stderr
, " ITEM %04x DATA ", item
->leafid
);
641 switch(item
->leafid
& LCF_TYPEMASK
) {
643 fprintf(stderr
, "int32 %d\n", *(int32_t *)(item
+ 1));
646 fprintf(stderr
, "int64 %lld\n", *(int64_t *)(item
+ 1));
649 fprintf(stderr
, "\"%s\"\n", (char *)(item
+ 1));
652 fprintf(stderr
, "(binary)\n");