2 Unix SMB/CIFS implementation.
4 Samba internal messaging functions
6 Copyright (C) Andrew Tridgell 2004
8 This program is free software; you can redistribute it and/or modify
9 it under the terms of the GNU General Public License as published by
10 the Free Software Foundation; either version 3 of the License, or
11 (at your option) any later version.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 #include "lib/events/events.h"
24 #include "system/filesys.h"
25 #include "messaging/messaging.h"
26 #include "../lib/util/dlinklist.h"
27 #include "lib/socket/socket.h"
28 #include "librpc/gen_ndr/ndr_irpc.h"
29 #include "lib/messaging/irpc.h"
31 #include "../lib/util/unix_privs.h"
32 #include "librpc/rpc/dcerpc.h"
33 #include "../tdb/include/tdb.h"
34 #include "../lib/util/util_tdb.h"
35 #include "cluster/cluster.h"
37 /* change the message version with any incompatible changes in the protocol */
38 #define MESSAGING_VERSION 1
40 struct messaging_context
{
41 struct server_id server_id
;
42 struct socket_context
*sock
;
43 const char *base_path
;
45 struct dispatch_fn
**dispatch
;
47 struct idr_context
*dispatch_tree
;
48 struct messaging_rec
*pending
;
49 struct messaging_rec
*retry_queue
;
50 struct smb_iconv_convenience
*iconv_convenience
;
51 struct irpc_list
*irpc
;
52 struct idr_context
*idr
;
54 struct timeval start_time
;
55 struct tevent_timer
*retry_te
;
57 struct tevent_context
*ev
;
58 struct tevent_fd
*fde
;
62 /* we have a linked list of dispatch handlers for each msg_type that
63 this messaging server can deal with */
65 struct dispatch_fn
*next
, *prev
;
71 /* an individual message */
72 struct messaging_rec
{
73 struct messaging_rec
*next
, *prev
;
74 struct messaging_context
*msg
;
77 struct messaging_header
{
80 struct server_id from
;
90 static void irpc_handler(struct messaging_context
*, void *,
91 uint32_t, struct server_id
, DATA_BLOB
*);
95 A useful function for testing the message system.
97 static void ping_message(struct messaging_context
*msg
, void *private_data
,
98 uint32_t msg_type
, struct server_id src
, DATA_BLOB
*data
)
100 DEBUG(1,("INFO: Received PING message from server %u.%u [%.*s]\n",
101 (uint_t
)src
.node
, (uint_t
)src
.id
, (int)data
->length
,
102 data
->data
?(const char *)data
->data
:""));
103 messaging_send(msg
, src
, MSG_PONG
, data
);
107 return uptime of messaging server via irpc
109 static NTSTATUS
irpc_uptime(struct irpc_message
*msg
,
110 struct irpc_uptime
*r
)
112 struct messaging_context
*ctx
= talloc_get_type(msg
->private_data
, struct messaging_context
);
113 *r
->out
.start_time
= timeval_to_nttime(&ctx
->start_time
);
118 return the path to a messaging socket
120 static char *messaging_path(struct messaging_context
*msg
, struct server_id server_id
)
122 return talloc_asprintf(msg
, "%s/msg.%s", msg
->base_path
,
123 cluster_id_string(msg
, server_id
));
127 dispatch a fully received message
129 note that this deliberately can match more than one message handler
130 per message. That allows a single messaging context to register
131 (for example) a debug handler for more than one piece of code
133 static void messaging_dispatch(struct messaging_context
*msg
, struct messaging_rec
*rec
)
135 struct dispatch_fn
*d
, *next
;
137 /* temporary IDs use an idtree, the rest use an array of pointers */
138 if (rec
->header
->msg_type
>= MSG_TMP_BASE
) {
139 d
= (struct dispatch_fn
*)idr_find(msg
->dispatch_tree
,
140 rec
->header
->msg_type
);
141 } else if (rec
->header
->msg_type
< msg
->num_types
) {
142 d
= msg
->dispatch
[rec
->header
->msg_type
];
147 for (; d
; d
= next
) {
150 data
.data
= rec
->packet
.data
+ sizeof(*rec
->header
);
151 data
.length
= rec
->header
->length
;
152 d
->fn(msg
, d
->private_data
, d
->msg_type
, rec
->header
->from
, &data
);
154 rec
->header
->length
= 0;
158 handler for messages that arrive from other nodes in the cluster
160 static void cluster_message_handler(struct messaging_context
*msg
, DATA_BLOB packet
)
162 struct messaging_rec
*rec
;
164 rec
= talloc(msg
, struct messaging_rec
);
166 smb_panic("Unable to allocate messaging_rec");
170 rec
->path
= msg
->path
;
171 rec
->header
= (struct messaging_header
*)packet
.data
;
172 rec
->packet
= packet
;
175 if (packet
.length
!= sizeof(*rec
->header
) + rec
->header
->length
) {
176 DEBUG(0,("messaging: bad message header size %d should be %d\n",
177 rec
->header
->length
, (int)(packet
.length
- sizeof(*rec
->header
))));
182 messaging_dispatch(msg
, rec
);
189 try to send the message
191 static NTSTATUS
try_send(struct messaging_rec
*rec
)
193 struct messaging_context
*msg
= rec
->msg
;
197 struct socket_address
*path
;
199 /* rec->path is the path of the *other* socket, where we want
201 path
= socket_address_from_strings(msg
, msg
->sock
->backend_name
,
204 return NT_STATUS_NO_MEMORY
;
207 /* we send with privileges so messages work from any context */
208 priv
= root_privileges();
209 status
= socket_sendto(msg
->sock
, &rec
->packet
, &nsent
, path
);
217 retry backed off messages
219 static void msg_retry_timer(struct tevent_context
*ev
, struct tevent_timer
*te
,
220 struct timeval t
, void *private_data
)
222 struct messaging_context
*msg
= talloc_get_type(private_data
,
223 struct messaging_context
);
224 msg
->retry_te
= NULL
;
226 /* put the messages back on the main queue */
227 while (msg
->retry_queue
) {
228 struct messaging_rec
*rec
= msg
->retry_queue
;
229 DLIST_REMOVE(msg
->retry_queue
, rec
);
230 DLIST_ADD_END(msg
->pending
, rec
, struct messaging_rec
*);
233 EVENT_FD_WRITEABLE(msg
->event
.fde
);
237 handle a socket write event
239 static void messaging_send_handler(struct messaging_context
*msg
)
241 while (msg
->pending
) {
242 struct messaging_rec
*rec
= msg
->pending
;
244 status
= try_send(rec
);
245 if (NT_STATUS_EQUAL(status
, STATUS_MORE_ENTRIES
)) {
247 if (rec
->retries
> 3) {
248 /* we're getting continuous write errors -
249 backoff this record */
250 DLIST_REMOVE(msg
->pending
, rec
);
251 DLIST_ADD_END(msg
->retry_queue
, rec
,
252 struct messaging_rec
*);
253 if (msg
->retry_te
== NULL
) {
255 event_add_timed(msg
->event
.ev
, msg
,
256 timeval_current_ofs(1, 0),
257 msg_retry_timer
, msg
);
263 if (!NT_STATUS_IS_OK(status
)) {
264 DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n",
265 cluster_id_string(debug_ctx(), rec
->header
->from
),
266 cluster_id_string(debug_ctx(), rec
->header
->to
),
267 rec
->header
->msg_type
,
270 DLIST_REMOVE(msg
->pending
, rec
);
273 if (msg
->pending
== NULL
) {
274 EVENT_FD_NOT_WRITEABLE(msg
->event
.fde
);
279 handle a new incoming packet
281 static void messaging_recv_handler(struct messaging_context
*msg
)
283 struct messaging_rec
*rec
;
288 /* see how many bytes are in the next packet */
289 status
= socket_pending(msg
->sock
, &msize
);
290 if (!NT_STATUS_IS_OK(status
)) {
291 DEBUG(0,("socket_pending failed in messaging - %s\n",
296 packet
= data_blob_talloc(msg
, NULL
, msize
);
297 if (packet
.data
== NULL
) {
298 /* assume this is temporary and retry */
302 status
= socket_recv(msg
->sock
, packet
.data
, msize
, &msize
);
303 if (!NT_STATUS_IS_OK(status
)) {
304 data_blob_free(&packet
);
308 if (msize
< sizeof(*rec
->header
)) {
309 DEBUG(0,("messaging: bad message of size %d\n", (int)msize
));
310 data_blob_free(&packet
);
314 rec
= talloc(msg
, struct messaging_rec
);
316 smb_panic("Unable to allocate messaging_rec");
319 talloc_steal(rec
, packet
.data
);
321 rec
->path
= msg
->path
;
322 rec
->header
= (struct messaging_header
*)packet
.data
;
323 rec
->packet
= packet
;
326 if (msize
!= sizeof(*rec
->header
) + rec
->header
->length
) {
327 DEBUG(0,("messaging: bad message header size %d should be %d\n",
328 rec
->header
->length
, (int)(msize
- sizeof(*rec
->header
))));
333 messaging_dispatch(msg
, rec
);
339 handle a socket event
341 static void messaging_handler(struct tevent_context
*ev
, struct tevent_fd
*fde
,
342 uint16_t flags
, void *private_data
)
344 struct messaging_context
*msg
= talloc_get_type(private_data
,
345 struct messaging_context
);
346 if (flags
& EVENT_FD_WRITE
) {
347 messaging_send_handler(msg
);
349 if (flags
& EVENT_FD_READ
) {
350 messaging_recv_handler(msg
);
356 Register a dispatch function for a particular message type.
358 NTSTATUS
messaging_register(struct messaging_context
*msg
, void *private_data
,
359 uint32_t msg_type
, msg_callback_t fn
)
361 struct dispatch_fn
*d
;
363 /* possibly expand dispatch array */
364 if (msg_type
>= msg
->num_types
) {
365 struct dispatch_fn
**dp
;
367 dp
= talloc_realloc(msg
, msg
->dispatch
, struct dispatch_fn
*, msg_type
+1);
368 NT_STATUS_HAVE_NO_MEMORY(dp
);
370 for (i
=msg
->num_types
;i
<=msg_type
;i
++) {
371 msg
->dispatch
[i
] = NULL
;
373 msg
->num_types
= msg_type
+1;
376 d
= talloc_zero(msg
->dispatch
, struct dispatch_fn
);
377 NT_STATUS_HAVE_NO_MEMORY(d
);
378 d
->msg_type
= msg_type
;
379 d
->private_data
= private_data
;
382 DLIST_ADD(msg
->dispatch
[msg_type
], d
);
388 register a temporary message handler. The msg_type is allocated
391 NTSTATUS
messaging_register_tmp(struct messaging_context
*msg
, void *private_data
,
392 msg_callback_t fn
, uint32_t *msg_type
)
394 struct dispatch_fn
*d
;
397 d
= talloc_zero(msg
->dispatch
, struct dispatch_fn
);
398 NT_STATUS_HAVE_NO_MEMORY(d
);
399 d
->private_data
= private_data
;
402 id
= idr_get_new_above(msg
->dispatch_tree
, d
, MSG_TMP_BASE
, UINT16_MAX
);
405 return NT_STATUS_TOO_MANY_CONTEXT_IDS
;
408 d
->msg_type
= (uint32_t)id
;
409 (*msg_type
) = d
->msg_type
;
415 De-register the function for a particular message type.
417 void messaging_deregister(struct messaging_context
*msg
, uint32_t msg_type
, void *private_data
)
419 struct dispatch_fn
*d
, *next
;
421 if (msg_type
>= msg
->num_types
) {
422 d
= (struct dispatch_fn
*)idr_find(msg
->dispatch_tree
,
425 idr_remove(msg
->dispatch_tree
, msg_type
);
430 for (d
= msg
->dispatch
[msg_type
]; d
; d
= next
) {
432 if (d
->private_data
== private_data
) {
433 DLIST_REMOVE(msg
->dispatch
[msg_type
], d
);
440 Send a message to a particular server
442 NTSTATUS
messaging_send(struct messaging_context
*msg
, struct server_id server
,
443 uint32_t msg_type
, DATA_BLOB
*data
)
445 struct messaging_rec
*rec
;
447 size_t dlength
= data
?data
->length
:0;
449 rec
= talloc(msg
, struct messaging_rec
);
451 return NT_STATUS_NO_MEMORY
;
454 rec
->packet
= data_blob_talloc(rec
, NULL
, sizeof(*rec
->header
) + dlength
);
455 if (rec
->packet
.data
== NULL
) {
457 return NT_STATUS_NO_MEMORY
;
462 rec
->header
= (struct messaging_header
*)rec
->packet
.data
;
464 ZERO_STRUCTP(rec
->header
);
465 rec
->header
->version
= MESSAGING_VERSION
;
466 rec
->header
->msg_type
= msg_type
;
467 rec
->header
->from
= msg
->server_id
;
468 rec
->header
->to
= server
;
469 rec
->header
->length
= dlength
;
471 memcpy(rec
->packet
.data
+ sizeof(*rec
->header
),
472 data
->data
, dlength
);
475 if (!cluster_node_equal(&msg
->server_id
, &server
)) {
476 /* the destination is on another node - dispatch via
478 status
= cluster_message_send(server
, &rec
->packet
);
483 rec
->path
= messaging_path(msg
, server
);
484 talloc_steal(rec
, rec
->path
);
486 if (msg
->pending
!= NULL
) {
487 status
= STATUS_MORE_ENTRIES
;
489 status
= try_send(rec
);
492 if (NT_STATUS_EQUAL(status
, STATUS_MORE_ENTRIES
)) {
493 if (msg
->pending
== NULL
) {
494 EVENT_FD_WRITEABLE(msg
->event
.fde
);
496 DLIST_ADD_END(msg
->pending
, rec
, struct messaging_rec
*);
506 Send a message to a particular server, with the message containing a single pointer
508 NTSTATUS
messaging_send_ptr(struct messaging_context
*msg
, struct server_id server
,
509 uint32_t msg_type
, void *ptr
)
513 blob
.data
= (uint8_t *)&ptr
;
514 blob
.length
= sizeof(void *);
516 return messaging_send(msg
, server
, msg_type
, &blob
);
521 destroy the messaging context
523 static int messaging_destructor(struct messaging_context
*msg
)
526 while (msg
->names
&& msg
->names
[0]) {
527 irpc_remove_name(msg
, msg
->names
[0]);
533 create the listening socket and setup the dispatcher
535 struct messaging_context
*messaging_init(TALLOC_CTX
*mem_ctx
,
537 struct server_id server_id
,
538 struct smb_iconv_convenience
*iconv_convenience
,
539 struct tevent_context
*ev
)
541 struct messaging_context
*msg
;
543 struct socket_address
*path
;
549 msg
= talloc_zero(mem_ctx
, struct messaging_context
);
554 /* setup a handler for messages from other cluster nodes, if appropriate */
555 status
= cluster_message_init(msg
, server_id
, cluster_message_handler
);
556 if (!NT_STATUS_IS_OK(status
)) {
561 /* create the messaging directory if needed */
564 msg
->base_path
= talloc_reference(msg
, dir
);
565 msg
->path
= messaging_path(msg
, server_id
);
566 msg
->server_id
= server_id
;
567 msg
->iconv_convenience
= iconv_convenience
;
568 msg
->idr
= idr_init(msg
);
569 msg
->dispatch_tree
= idr_init(msg
);
570 msg
->start_time
= timeval_current();
572 status
= socket_create("unix", SOCKET_TYPE_DGRAM
, &msg
->sock
, 0);
573 if (!NT_STATUS_IS_OK(status
)) {
578 /* by stealing here we ensure that the socket is cleaned up (and even
580 talloc_steal(msg
, msg
->sock
);
582 path
= socket_address_from_strings(msg
, msg
->sock
->backend_name
,
589 status
= socket_listen(msg
->sock
, path
, 50, 0);
590 if (!NT_STATUS_IS_OK(status
)) {
591 DEBUG(0,("Unable to setup messaging listener for '%s':%s\n", msg
->path
, nt_errstr(status
)));
596 /* it needs to be non blocking for sends */
597 set_blocking(socket_get_fd(msg
->sock
), false);
600 msg
->event
.fde
= event_add_fd(ev
, msg
, socket_get_fd(msg
->sock
),
601 EVENT_FD_READ
, messaging_handler
, msg
);
603 talloc_set_destructor(msg
, messaging_destructor
);
605 messaging_register(msg
, NULL
, MSG_PING
, ping_message
);
606 messaging_register(msg
, NULL
, MSG_IRPC
, irpc_handler
);
607 IRPC_REGISTER(msg
, irpc
, IRPC_UPTIME
, irpc_uptime
, msg
);
613 A hack, for the short term until we get 'client only' messaging in place
615 struct messaging_context
*messaging_client_init(TALLOC_CTX
*mem_ctx
,
617 struct smb_iconv_convenience
*iconv_convenience
,
618 struct tevent_context
*ev
)
622 id
.id
= random() % 0x10000000;
623 return messaging_init(mem_ctx
, dir
, id
, iconv_convenience
, ev
);
626 a list of registered irpc server functions
629 struct irpc_list
*next
, *prev
;
631 const struct ndr_interface_table
*table
;
639 register an irpc server function
641 NTSTATUS
irpc_register(struct messaging_context
*msg_ctx
,
642 const struct ndr_interface_table
*table
,
643 int callnum
, irpc_function_t fn
, void *private_data
)
645 struct irpc_list
*irpc
;
647 /* override an existing handler, if any */
648 for (irpc
=msg_ctx
->irpc
; irpc
; irpc
=irpc
->next
) {
649 if (irpc
->table
== table
&& irpc
->callnum
== callnum
) {
654 irpc
= talloc(msg_ctx
, struct irpc_list
);
655 NT_STATUS_HAVE_NO_MEMORY(irpc
);
656 DLIST_ADD(msg_ctx
->irpc
, irpc
);
660 irpc
->callnum
= callnum
;
662 irpc
->private_data
= private_data
;
663 irpc
->uuid
= irpc
->table
->syntax_id
.uuid
;
670 handle an incoming irpc reply message
672 static void irpc_handler_reply(struct messaging_context
*msg_ctx
, struct irpc_message
*m
)
674 struct irpc_request
*irpc
;
675 enum ndr_err_code ndr_err
;
677 irpc
= (struct irpc_request
*)idr_find(msg_ctx
->idr
, m
->header
.callid
);
678 if (irpc
== NULL
) return;
680 /* parse the reply data */
681 ndr_err
= irpc
->table
->calls
[irpc
->callnum
].ndr_pull(m
->ndr
, NDR_OUT
, irpc
->r
);
682 if (NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) {
683 irpc
->status
= m
->header
.status
;
684 talloc_steal(irpc
->mem_ctx
, m
);
686 irpc
->status
= ndr_map_error2ntstatus(ndr_err
);
687 talloc_steal(irpc
, m
);
690 if (irpc
->async
.fn
) {
691 irpc
->async
.fn(irpc
);
698 NTSTATUS
irpc_send_reply(struct irpc_message
*m
, NTSTATUS status
)
700 struct ndr_push
*push
;
702 enum ndr_err_code ndr_err
;
704 m
->header
.status
= status
;
706 /* setup the reply */
707 push
= ndr_push_init_ctx(m
->ndr
, m
->msg_ctx
->iconv_convenience
);
709 status
= NT_STATUS_NO_MEMORY
;
713 m
->header
.flags
|= IRPC_FLAG_REPLY
;
715 /* construct the packet */
716 ndr_err
= ndr_push_irpc_header(push
, NDR_SCALARS
|NDR_BUFFERS
, &m
->header
);
717 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) {
718 status
= ndr_map_error2ntstatus(ndr_err
);
722 ndr_err
= m
->irpc
->table
->calls
[m
->irpc
->callnum
].ndr_push(push
, NDR_OUT
, m
->data
);
723 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) {
724 status
= ndr_map_error2ntstatus(ndr_err
);
728 /* send the reply message */
729 packet
= ndr_push_blob(push
);
730 status
= messaging_send(m
->msg_ctx
, m
->from
, MSG_IRPC
, &packet
);
731 if (!NT_STATUS_IS_OK(status
)) goto failed
;
739 handle an incoming irpc request message
741 static void irpc_handler_request(struct messaging_context
*msg_ctx
,
742 struct irpc_message
*m
)
746 enum ndr_err_code ndr_err
;
748 for (i
=msg_ctx
->irpc
; i
; i
=i
->next
) {
749 if (GUID_equal(&i
->uuid
, &m
->header
.uuid
) &&
750 i
->table
->syntax_id
.if_version
== m
->header
.if_version
&&
751 i
->callnum
== m
->header
.callnum
) {
757 /* no registered handler for this message */
762 /* allocate space for the structure */
763 r
= talloc_zero_size(m
->ndr
, i
->table
->calls
[m
->header
.callnum
].struct_size
);
764 if (r
== NULL
) goto failed
;
766 /* parse the request data */
767 ndr_err
= i
->table
->calls
[i
->callnum
].ndr_pull(m
->ndr
, NDR_IN
, r
);
768 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) goto failed
;
771 m
->private_data
= i
->private_data
;
772 m
->defer_reply
= false;
773 m
->msg_ctx
= msg_ctx
;
776 m
->ev
= msg_ctx
->event
.ev
;
778 m
->header
.status
= i
->fn(m
, r
);
780 if (m
->defer_reply
) {
781 /* the server function has asked to defer the reply to later */
782 talloc_steal(msg_ctx
, m
);
786 irpc_send_reply(m
, m
->header
.status
);
794 handle an incoming irpc message
796 static void irpc_handler(struct messaging_context
*msg_ctx
, void *private_data
,
797 uint32_t msg_type
, struct server_id src
, DATA_BLOB
*packet
)
799 struct irpc_message
*m
;
800 enum ndr_err_code ndr_err
;
802 m
= talloc(msg_ctx
, struct irpc_message
);
803 if (m
== NULL
) goto failed
;
807 m
->ndr
= ndr_pull_init_blob(packet
, m
, msg_ctx
->iconv_convenience
);
808 if (m
->ndr
== NULL
) goto failed
;
810 m
->ndr
->flags
|= LIBNDR_FLAG_REF_ALLOC
;
812 ndr_err
= ndr_pull_irpc_header(m
->ndr
, NDR_BUFFERS
|NDR_SCALARS
, &m
->header
);
813 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) goto failed
;
815 if (m
->header
.flags
& IRPC_FLAG_REPLY
) {
816 irpc_handler_reply(msg_ctx
, m
);
818 irpc_handler_request(msg_ctx
, m
);
828 destroy an irpc request
830 static int irpc_destructor(struct irpc_request
*irpc
)
832 if (irpc
->callid
!= -1) {
833 idr_remove(irpc
->msg_ctx
->idr
, irpc
->callid
);
837 if (irpc
->reject_free
) {
844 time out an irpc request
846 static void irpc_timeout(struct tevent_context
*ev
, struct tevent_timer
*te
,
847 struct timeval t
, void *private_data
)
849 struct irpc_request
*irpc
= talloc_get_type(private_data
, struct irpc_request
);
850 irpc
->status
= NT_STATUS_IO_TIMEOUT
;
852 if (irpc
->async
.fn
) {
853 irpc
->async
.fn(irpc
);
859 make an irpc call - async send
861 struct irpc_request
*irpc_call_send(struct messaging_context
*msg_ctx
,
862 struct server_id server_id
,
863 const struct ndr_interface_table
*table
,
864 int callnum
, void *r
, TALLOC_CTX
*ctx
)
866 struct irpc_header header
;
867 struct ndr_push
*ndr
;
870 struct irpc_request
*irpc
;
871 enum ndr_err_code ndr_err
;
873 irpc
= talloc(msg_ctx
, struct irpc_request
);
874 if (irpc
== NULL
) goto failed
;
876 irpc
->msg_ctx
= msg_ctx
;
878 irpc
->callnum
= callnum
;
879 irpc
->callid
= idr_get_new(msg_ctx
->idr
, irpc
, UINT16_MAX
);
880 if (irpc
->callid
== -1) goto failed
;
883 irpc
->async
.fn
= NULL
;
885 irpc
->reject_free
= false;
887 talloc_set_destructor(irpc
, irpc_destructor
);
889 /* setup the header */
890 header
.uuid
= table
->syntax_id
.uuid
;
892 header
.if_version
= table
->syntax_id
.if_version
;
893 header
.callid
= irpc
->callid
;
894 header
.callnum
= callnum
;
896 header
.status
= NT_STATUS_OK
;
898 /* construct the irpc packet */
899 ndr
= ndr_push_init_ctx(irpc
, msg_ctx
->iconv_convenience
);
900 if (ndr
== NULL
) goto failed
;
902 ndr_err
= ndr_push_irpc_header(ndr
, NDR_SCALARS
|NDR_BUFFERS
, &header
);
903 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) goto failed
;
905 ndr_err
= table
->calls
[callnum
].ndr_push(ndr
, NDR_IN
, r
);
906 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) goto failed
;
909 packet
= ndr_push_blob(ndr
);
910 status
= messaging_send(msg_ctx
, server_id
, MSG_IRPC
, &packet
);
911 if (!NT_STATUS_IS_OK(status
)) goto failed
;
913 event_add_timed(msg_ctx
->event
.ev
, irpc
,
914 timeval_current_ofs(IRPC_CALL_TIMEOUT
, 0),
926 wait for an irpc reply
928 NTSTATUS
irpc_call_recv(struct irpc_request
*irpc
)
932 NT_STATUS_HAVE_NO_MEMORY(irpc
);
934 irpc
->reject_free
= true;
936 while (!irpc
->done
) {
937 if (event_loop_once(irpc
->msg_ctx
->event
.ev
) != 0) {
938 return NT_STATUS_CONNECTION_DISCONNECTED
;
942 irpc
->reject_free
= false;
944 status
= irpc
->status
;
950 perform a synchronous irpc request
952 NTSTATUS
irpc_call(struct messaging_context
*msg_ctx
,
953 struct server_id server_id
,
954 const struct ndr_interface_table
*table
,
955 int callnum
, void *r
,
958 struct irpc_request
*irpc
= irpc_call_send(msg_ctx
, server_id
,
959 table
, callnum
, r
, mem_ctx
);
960 return irpc_call_recv(irpc
);
964 open the naming database
966 static struct tdb_wrap
*irpc_namedb_open(struct messaging_context
*msg_ctx
)
969 char *path
= talloc_asprintf(msg_ctx
, "%s/names.tdb", msg_ctx
->base_path
);
973 t
= tdb_wrap_open(msg_ctx
, path
, 0, 0, O_RDWR
|O_CREAT
, 0660);
980 add a string name that this irpc server can be called on
982 NTSTATUS
irpc_add_name(struct messaging_context
*msg_ctx
, const char *name
)
987 NTSTATUS status
= NT_STATUS_OK
;
989 t
= irpc_namedb_open(msg_ctx
);
990 NT_STATUS_HAVE_NO_MEMORY(t
);
992 if (tdb_lock_bystring(t
->tdb
, name
) != 0) {
994 return NT_STATUS_LOCK_NOT_GRANTED
;
996 rec
= tdb_fetch_bystring(t
->tdb
, name
);
997 count
= rec
.dsize
/ sizeof(struct server_id
);
998 rec
.dptr
= (unsigned char *)realloc_p(rec
.dptr
, struct server_id
, count
+1);
999 rec
.dsize
+= sizeof(struct server_id
);
1000 if (rec
.dptr
== NULL
) {
1001 tdb_unlock_bystring(t
->tdb
, name
);
1003 return NT_STATUS_NO_MEMORY
;
1005 ((struct server_id
*)rec
.dptr
)[count
] = msg_ctx
->server_id
;
1006 if (tdb_store_bystring(t
->tdb
, name
, rec
, 0) != 0) {
1007 status
= NT_STATUS_INTERNAL_ERROR
;
1010 tdb_unlock_bystring(t
->tdb
, name
);
1013 msg_ctx
->names
= str_list_add(msg_ctx
->names
, name
);
1014 talloc_steal(msg_ctx
, msg_ctx
->names
);
1020 return a list of server ids for a server name
1022 struct server_id
*irpc_servers_byname(struct messaging_context
*msg_ctx
,
1023 TALLOC_CTX
*mem_ctx
,
1029 struct server_id
*ret
;
1031 t
= irpc_namedb_open(msg_ctx
);
1036 if (tdb_lock_bystring(t
->tdb
, name
) != 0) {
1040 rec
= tdb_fetch_bystring(t
->tdb
, name
);
1041 if (rec
.dptr
== NULL
) {
1042 tdb_unlock_bystring(t
->tdb
, name
);
1046 count
= rec
.dsize
/ sizeof(struct server_id
);
1047 ret
= talloc_array(mem_ctx
, struct server_id
, count
+1);
1049 tdb_unlock_bystring(t
->tdb
, name
);
1053 for (i
=0;i
<count
;i
++) {
1054 ret
[i
] = ((struct server_id
*)rec
.dptr
)[i
];
1056 ret
[i
] = cluster_id(0, 0);
1058 tdb_unlock_bystring(t
->tdb
, name
);
1065 remove a name from a messaging context
1067 void irpc_remove_name(struct messaging_context
*msg_ctx
, const char *name
)
1072 struct server_id
*ids
;
1074 str_list_remove(msg_ctx
->names
, name
);
1076 t
= irpc_namedb_open(msg_ctx
);
1081 if (tdb_lock_bystring(t
->tdb
, name
) != 0) {
1085 rec
= tdb_fetch_bystring(t
->tdb
, name
);
1086 if (rec
.dptr
== NULL
) {
1087 tdb_unlock_bystring(t
->tdb
, name
);
1091 count
= rec
.dsize
/ sizeof(struct server_id
);
1094 tdb_unlock_bystring(t
->tdb
, name
);
1098 ids
= (struct server_id
*)rec
.dptr
;
1099 for (i
=0;i
<count
;i
++) {
1100 if (cluster_id_equal(&ids
[i
], &msg_ctx
->server_id
)) {
1102 memmove(ids
+i
, ids
+i
+1,
1103 sizeof(struct server_id
) * (count
-(i
+1)));
1105 rec
.dsize
-= sizeof(struct server_id
);
1109 tdb_store_bystring(t
->tdb
, name
, rec
, 0);
1111 tdb_unlock_bystring(t
->tdb
, name
);
1115 struct server_id
messaging_get_server_id(struct messaging_context
*msg_ctx
)
1117 return msg_ctx
->server_id
;