2 Unix SMB/CIFS implementation.
4 Samba internal messaging functions
6 Copyright (C) Andrew Tridgell 2004
8 This program is free software; you can redistribute it and/or modify
9 it under the terms of the GNU General Public License as published by
10 the Free Software Foundation; either version 3 of the License, or
11 (at your option) any later version.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 #include "lib/events/events.h"
24 #include "system/filesys.h"
25 #include "messaging/messaging.h"
26 #include "../lib/util/dlinklist.h"
27 #include "lib/socket/socket.h"
28 #include "librpc/gen_ndr/ndr_irpc.h"
29 #include "lib/messaging/irpc.h"
31 #include "../lib/util/unix_privs.h"
32 #include "librpc/rpc/dcerpc.h"
33 #include "../tdb/include/tdb.h"
34 #include "../lib/util/util_tdb.h"
35 #include "cluster/cluster.h"
37 /* change the message version with any incompatible changes in the protocol */
38 #define MESSAGING_VERSION 1
40 struct messaging_context
{
41 struct server_id server_id
;
42 struct socket_context
*sock
;
43 const char *base_path
;
45 struct dispatch_fn
**dispatch
;
47 struct idr_context
*dispatch_tree
;
48 struct messaging_rec
*pending
;
49 struct messaging_rec
*retry_queue
;
50 struct smb_iconv_convenience
*iconv_convenience
;
51 struct irpc_list
*irpc
;
52 struct idr_context
*idr
;
54 struct timeval start_time
;
55 struct tevent_timer
*retry_te
;
57 struct tevent_context
*ev
;
58 struct tevent_fd
*fde
;
62 /* we have a linked list of dispatch handlers for each msg_type that
63 this messaging server can deal with */
65 struct dispatch_fn
*next
, *prev
;
71 /* an individual message */
72 struct messaging_rec
{
73 struct messaging_rec
*next
, *prev
;
74 struct messaging_context
*msg
;
77 struct messaging_header
{
80 struct server_id from
;
90 static void irpc_handler(struct messaging_context
*, void *,
91 uint32_t, struct server_id
, DATA_BLOB
*);
95 A useful function for testing the message system.
97 static void ping_message(struct messaging_context
*msg
, void *private_data
,
98 uint32_t msg_type
, struct server_id src
, DATA_BLOB
*data
)
100 DEBUG(1,("INFO: Received PING message from server %u.%u [%.*s]\n",
101 (unsigned int)src
.node
, (unsigned int)src
.id
, (int)data
->length
,
102 data
->data
?(const char *)data
->data
:""));
103 messaging_send(msg
, src
, MSG_PONG
, data
);
107 return uptime of messaging server via irpc
109 static NTSTATUS
irpc_uptime(struct irpc_message
*msg
,
110 struct irpc_uptime
*r
)
112 struct messaging_context
*ctx
= talloc_get_type(msg
->private_data
, struct messaging_context
);
113 *r
->out
.start_time
= timeval_to_nttime(&ctx
->start_time
);
118 return the path to a messaging socket
120 static char *messaging_path(struct messaging_context
*msg
, struct server_id server_id
)
122 TALLOC_CTX
*tmp_ctx
= talloc_new(msg
);
123 const char *id
= cluster_id_string(tmp_ctx
, server_id
);
128 s
= talloc_asprintf(msg
, "%s/msg.%s", msg
->base_path
, id
);
129 talloc_steal(s
, tmp_ctx
);
134 dispatch a fully received message
136 note that this deliberately can match more than one message handler
137 per message. That allows a single messaging context to register
138 (for example) a debug handler for more than one piece of code
140 static void messaging_dispatch(struct messaging_context
*msg
, struct messaging_rec
*rec
)
142 struct dispatch_fn
*d
, *next
;
144 /* temporary IDs use an idtree, the rest use a array of pointers */
145 if (rec
->header
->msg_type
>= MSG_TMP_BASE
) {
146 d
= (struct dispatch_fn
*)idr_find(msg
->dispatch_tree
,
147 rec
->header
->msg_type
);
148 } else if (rec
->header
->msg_type
< msg
->num_types
) {
149 d
= msg
->dispatch
[rec
->header
->msg_type
];
154 for (; d
; d
= next
) {
157 data
.data
= rec
->packet
.data
+ sizeof(*rec
->header
);
158 data
.length
= rec
->header
->length
;
159 d
->fn(msg
, d
->private_data
, d
->msg_type
, rec
->header
->from
, &data
);
161 rec
->header
->length
= 0;
165 handler for messages that arrive from other nodes in the cluster
167 static void cluster_message_handler(struct messaging_context
*msg
, DATA_BLOB packet
)
169 struct messaging_rec
*rec
;
171 rec
= talloc(msg
, struct messaging_rec
);
173 smb_panic("Unable to allocate messaging_rec");
177 rec
->path
= msg
->path
;
178 rec
->header
= (struct messaging_header
*)packet
.data
;
179 rec
->packet
= packet
;
182 if (packet
.length
!= sizeof(*rec
->header
) + rec
->header
->length
) {
183 DEBUG(0,("messaging: bad message header size %d should be %d\n",
184 rec
->header
->length
, (int)(packet
.length
- sizeof(*rec
->header
))));
189 messaging_dispatch(msg
, rec
);
196 try to send the message
198 static NTSTATUS
try_send(struct messaging_rec
*rec
)
200 struct messaging_context
*msg
= rec
->msg
;
204 struct socket_address
*path
;
206 /* rec->path is the path of the *other* socket, where we want
208 path
= socket_address_from_strings(msg
, msg
->sock
->backend_name
,
211 return NT_STATUS_NO_MEMORY
;
214 /* we send with privileges so messages work from any context */
215 priv
= root_privileges();
216 status
= socket_sendto(msg
->sock
, &rec
->packet
, &nsent
, path
);
224 retry backed off messages
226 static void msg_retry_timer(struct tevent_context
*ev
, struct tevent_timer
*te
,
227 struct timeval t
, void *private_data
)
229 struct messaging_context
*msg
= talloc_get_type(private_data
,
230 struct messaging_context
);
231 msg
->retry_te
= NULL
;
233 /* put the messages back on the main queue */
234 while (msg
->retry_queue
) {
235 struct messaging_rec
*rec
= msg
->retry_queue
;
236 DLIST_REMOVE(msg
->retry_queue
, rec
);
237 DLIST_ADD_END(msg
->pending
, rec
, struct messaging_rec
*);
240 EVENT_FD_WRITEABLE(msg
->event
.fde
);
244 handle a socket write event
246 static void messaging_send_handler(struct messaging_context
*msg
)
248 while (msg
->pending
) {
249 struct messaging_rec
*rec
= msg
->pending
;
251 status
= try_send(rec
);
252 if (NT_STATUS_EQUAL(status
, STATUS_MORE_ENTRIES
)) {
254 if (rec
->retries
> 3) {
255 /* we're getting continuous write errors -
256 backoff this record */
257 DLIST_REMOVE(msg
->pending
, rec
);
258 DLIST_ADD_END(msg
->retry_queue
, rec
,
259 struct messaging_rec
*);
260 if (msg
->retry_te
== NULL
) {
262 event_add_timed(msg
->event
.ev
, msg
,
263 timeval_current_ofs(1, 0),
264 msg_retry_timer
, msg
);
270 if (!NT_STATUS_IS_OK(status
)) {
271 TALLOC_CTX
*tmp_ctx
= talloc_new(msg
);
272 DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n",
273 cluster_id_string(tmp_ctx
, rec
->header
->from
),
274 cluster_id_string(tmp_ctx
, rec
->header
->to
),
275 rec
->header
->msg_type
,
277 talloc_free(tmp_ctx
);
279 DLIST_REMOVE(msg
->pending
, rec
);
282 if (msg
->pending
== NULL
) {
283 EVENT_FD_NOT_WRITEABLE(msg
->event
.fde
);
288 handle a new incoming packet
290 static void messaging_recv_handler(struct messaging_context
*msg
)
292 struct messaging_rec
*rec
;
297 /* see how many bytes are in the next packet */
298 status
= socket_pending(msg
->sock
, &msize
);
299 if (!NT_STATUS_IS_OK(status
)) {
300 DEBUG(0,("socket_pending failed in messaging - %s\n",
305 packet
= data_blob_talloc(msg
, NULL
, msize
);
306 if (packet
.data
== NULL
) {
307 /* assume this is temporary and retry */
311 status
= socket_recv(msg
->sock
, packet
.data
, msize
, &msize
);
312 if (!NT_STATUS_IS_OK(status
)) {
313 data_blob_free(&packet
);
317 if (msize
< sizeof(*rec
->header
)) {
318 DEBUG(0,("messaging: bad message of size %d\n", (int)msize
));
319 data_blob_free(&packet
);
323 rec
= talloc(msg
, struct messaging_rec
);
325 smb_panic("Unable to allocate messaging_rec");
328 talloc_steal(rec
, packet
.data
);
330 rec
->path
= msg
->path
;
331 rec
->header
= (struct messaging_header
*)packet
.data
;
332 rec
->packet
= packet
;
335 if (msize
!= sizeof(*rec
->header
) + rec
->header
->length
) {
336 DEBUG(0,("messaging: bad message header size %d should be %d\n",
337 rec
->header
->length
, (int)(msize
- sizeof(*rec
->header
))));
342 messaging_dispatch(msg
, rec
);
348 handle a socket event
350 static void messaging_handler(struct tevent_context
*ev
, struct tevent_fd
*fde
,
351 uint16_t flags
, void *private_data
)
353 struct messaging_context
*msg
= talloc_get_type(private_data
,
354 struct messaging_context
);
355 if (flags
& EVENT_FD_WRITE
) {
356 messaging_send_handler(msg
);
358 if (flags
& EVENT_FD_READ
) {
359 messaging_recv_handler(msg
);
365 Register a dispatch function for a particular message type.
367 NTSTATUS
messaging_register(struct messaging_context
*msg
, void *private_data
,
368 uint32_t msg_type
, msg_callback_t fn
)
370 struct dispatch_fn
*d
;
372 /* possibly expand dispatch array */
373 if (msg_type
>= msg
->num_types
) {
374 struct dispatch_fn
**dp
;
376 dp
= talloc_realloc(msg
, msg
->dispatch
, struct dispatch_fn
*, msg_type
+1);
377 NT_STATUS_HAVE_NO_MEMORY(dp
);
379 for (i
=msg
->num_types
;i
<=msg_type
;i
++) {
380 msg
->dispatch
[i
] = NULL
;
382 msg
->num_types
= msg_type
+1;
385 d
= talloc_zero(msg
->dispatch
, struct dispatch_fn
);
386 NT_STATUS_HAVE_NO_MEMORY(d
);
387 d
->msg_type
= msg_type
;
388 d
->private_data
= private_data
;
391 DLIST_ADD(msg
->dispatch
[msg_type
], d
);
397 register a temporary message handler. The msg_type is allocated
400 NTSTATUS
messaging_register_tmp(struct messaging_context
*msg
, void *private_data
,
401 msg_callback_t fn
, uint32_t *msg_type
)
403 struct dispatch_fn
*d
;
406 d
= talloc_zero(msg
->dispatch
, struct dispatch_fn
);
407 NT_STATUS_HAVE_NO_MEMORY(d
);
408 d
->private_data
= private_data
;
411 id
= idr_get_new_above(msg
->dispatch_tree
, d
, MSG_TMP_BASE
, UINT16_MAX
);
414 return NT_STATUS_TOO_MANY_CONTEXT_IDS
;
417 d
->msg_type
= (uint32_t)id
;
418 (*msg_type
) = d
->msg_type
;
424 De-register the function for a particular message type.
426 void messaging_deregister(struct messaging_context
*msg
, uint32_t msg_type
, void *private_data
)
428 struct dispatch_fn
*d
, *next
;
430 if (msg_type
>= msg
->num_types
) {
431 d
= (struct dispatch_fn
*)idr_find(msg
->dispatch_tree
,
434 idr_remove(msg
->dispatch_tree
, msg_type
);
439 for (d
= msg
->dispatch
[msg_type
]; d
; d
= next
) {
441 if (d
->private_data
== private_data
) {
442 DLIST_REMOVE(msg
->dispatch
[msg_type
], d
);
449 Send a message to a particular server
451 NTSTATUS
messaging_send(struct messaging_context
*msg
, struct server_id server
,
452 uint32_t msg_type
, DATA_BLOB
*data
)
454 struct messaging_rec
*rec
;
456 size_t dlength
= data
?data
->length
:0;
458 rec
= talloc(msg
, struct messaging_rec
);
460 return NT_STATUS_NO_MEMORY
;
463 rec
->packet
= data_blob_talloc(rec
, NULL
, sizeof(*rec
->header
) + dlength
);
464 if (rec
->packet
.data
== NULL
) {
466 return NT_STATUS_NO_MEMORY
;
471 rec
->header
= (struct messaging_header
*)rec
->packet
.data
;
473 ZERO_STRUCTP(rec
->header
);
474 rec
->header
->version
= MESSAGING_VERSION
;
475 rec
->header
->msg_type
= msg_type
;
476 rec
->header
->from
= msg
->server_id
;
477 rec
->header
->to
= server
;
478 rec
->header
->length
= dlength
;
480 memcpy(rec
->packet
.data
+ sizeof(*rec
->header
),
481 data
->data
, dlength
);
484 if (!cluster_node_equal(&msg
->server_id
, &server
)) {
485 /* the destination is on another node - dispatch via
487 status
= cluster_message_send(server
, &rec
->packet
);
492 rec
->path
= messaging_path(msg
, server
);
493 talloc_steal(rec
, rec
->path
);
495 if (msg
->pending
!= NULL
) {
496 status
= STATUS_MORE_ENTRIES
;
498 status
= try_send(rec
);
501 if (NT_STATUS_EQUAL(status
, STATUS_MORE_ENTRIES
)) {
502 if (msg
->pending
== NULL
) {
503 EVENT_FD_WRITEABLE(msg
->event
.fde
);
505 DLIST_ADD_END(msg
->pending
, rec
, struct messaging_rec
*);
515 Send a message to a particular server, with the message containing a single pointer
517 NTSTATUS
messaging_send_ptr(struct messaging_context
*msg
, struct server_id server
,
518 uint32_t msg_type
, void *ptr
)
522 blob
.data
= (uint8_t *)&ptr
;
523 blob
.length
= sizeof(void *);
525 return messaging_send(msg
, server
, msg_type
, &blob
);
530 destroy the messaging context
532 static int messaging_destructor(struct messaging_context
*msg
)
535 while (msg
->names
&& msg
->names
[0]) {
536 irpc_remove_name(msg
, msg
->names
[0]);
542 create the listening socket and setup the dispatcher
544 struct messaging_context
*messaging_init(TALLOC_CTX
*mem_ctx
,
546 struct server_id server_id
,
547 struct smb_iconv_convenience
*iconv_convenience
,
548 struct tevent_context
*ev
)
550 struct messaging_context
*msg
;
552 struct socket_address
*path
;
558 msg
= talloc_zero(mem_ctx
, struct messaging_context
);
563 /* setup a handler for messages from other cluster nodes, if appropriate */
564 status
= cluster_message_init(msg
, server_id
, cluster_message_handler
);
565 if (!NT_STATUS_IS_OK(status
)) {
570 /* create the messaging directory if needed */
573 msg
->base_path
= talloc_reference(msg
, dir
);
574 msg
->path
= messaging_path(msg
, server_id
);
575 msg
->server_id
= server_id
;
576 msg
->iconv_convenience
= iconv_convenience
;
577 msg
->idr
= idr_init(msg
);
578 msg
->dispatch_tree
= idr_init(msg
);
579 msg
->start_time
= timeval_current();
581 status
= socket_create("unix", SOCKET_TYPE_DGRAM
, &msg
->sock
, 0);
582 if (!NT_STATUS_IS_OK(status
)) {
587 /* by stealing here we ensure that the socket is cleaned up (and even
589 talloc_steal(msg
, msg
->sock
);
591 path
= socket_address_from_strings(msg
, msg
->sock
->backend_name
,
598 status
= socket_listen(msg
->sock
, path
, 50, 0);
599 if (!NT_STATUS_IS_OK(status
)) {
600 DEBUG(0,("Unable to setup messaging listener for '%s':%s\n", msg
->path
, nt_errstr(status
)));
605 /* it needs to be non blocking for sends */
606 set_blocking(socket_get_fd(msg
->sock
), false);
609 msg
->event
.fde
= event_add_fd(ev
, msg
, socket_get_fd(msg
->sock
),
610 EVENT_FD_READ
, messaging_handler
, msg
);
611 tevent_fd_set_auto_close(msg
->event
.fde
);
613 talloc_set_destructor(msg
, messaging_destructor
);
615 messaging_register(msg
, NULL
, MSG_PING
, ping_message
);
616 messaging_register(msg
, NULL
, MSG_IRPC
, irpc_handler
);
617 IRPC_REGISTER(msg
, irpc
, IRPC_UPTIME
, irpc_uptime
, msg
);
623 A hack, for the short term until we get 'client only' messaging in place
625 struct messaging_context
*messaging_client_init(TALLOC_CTX
*mem_ctx
,
627 struct smb_iconv_convenience
*iconv_convenience
,
628 struct tevent_context
*ev
)
632 id
.id
= random() % 0x10000000;
633 return messaging_init(mem_ctx
, dir
, id
, iconv_convenience
, ev
);
636 a list of registered irpc server functions
639 struct irpc_list
*next
, *prev
;
641 const struct ndr_interface_table
*table
;
649 register an irpc server function
651 NTSTATUS
irpc_register(struct messaging_context
*msg_ctx
,
652 const struct ndr_interface_table
*table
,
653 int callnum
, irpc_function_t fn
, void *private_data
)
655 struct irpc_list
*irpc
;
657 /* override an existing handler, if any */
658 for (irpc
=msg_ctx
->irpc
; irpc
; irpc
=irpc
->next
) {
659 if (irpc
->table
== table
&& irpc
->callnum
== callnum
) {
664 irpc
= talloc(msg_ctx
, struct irpc_list
);
665 NT_STATUS_HAVE_NO_MEMORY(irpc
);
666 DLIST_ADD(msg_ctx
->irpc
, irpc
);
670 irpc
->callnum
= callnum
;
672 irpc
->private_data
= private_data
;
673 irpc
->uuid
= irpc
->table
->syntax_id
.uuid
;
680 handle an incoming irpc reply message
682 static void irpc_handler_reply(struct messaging_context
*msg_ctx
, struct irpc_message
*m
)
684 struct irpc_request
*irpc
;
685 enum ndr_err_code ndr_err
;
687 irpc
= (struct irpc_request
*)idr_find(msg_ctx
->idr
, m
->header
.callid
);
688 if (irpc
== NULL
) return;
690 /* parse the reply data */
691 ndr_err
= irpc
->table
->calls
[irpc
->callnum
].ndr_pull(m
->ndr
, NDR_OUT
, irpc
->r
);
692 if (NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) {
693 irpc
->status
= m
->header
.status
;
694 talloc_steal(irpc
->mem_ctx
, m
);
696 irpc
->status
= ndr_map_error2ntstatus(ndr_err
);
697 talloc_steal(irpc
, m
);
700 if (irpc
->async
.fn
) {
701 irpc
->async
.fn(irpc
);
708 NTSTATUS
irpc_send_reply(struct irpc_message
*m
, NTSTATUS status
)
710 struct ndr_push
*push
;
712 enum ndr_err_code ndr_err
;
714 m
->header
.status
= status
;
716 /* setup the reply */
717 push
= ndr_push_init_ctx(m
->ndr
, m
->msg_ctx
->iconv_convenience
);
719 status
= NT_STATUS_NO_MEMORY
;
723 m
->header
.flags
|= IRPC_FLAG_REPLY
;
725 /* construct the packet */
726 ndr_err
= ndr_push_irpc_header(push
, NDR_SCALARS
|NDR_BUFFERS
, &m
->header
);
727 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) {
728 status
= ndr_map_error2ntstatus(ndr_err
);
732 ndr_err
= m
->irpc
->table
->calls
[m
->irpc
->callnum
].ndr_push(push
, NDR_OUT
, m
->data
);
733 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) {
734 status
= ndr_map_error2ntstatus(ndr_err
);
738 /* send the reply message */
739 packet
= ndr_push_blob(push
);
740 status
= messaging_send(m
->msg_ctx
, m
->from
, MSG_IRPC
, &packet
);
741 if (!NT_STATUS_IS_OK(status
)) goto failed
;
749 handle an incoming irpc request message
751 static void irpc_handler_request(struct messaging_context
*msg_ctx
,
752 struct irpc_message
*m
)
756 enum ndr_err_code ndr_err
;
758 for (i
=msg_ctx
->irpc
; i
; i
=i
->next
) {
759 if (GUID_equal(&i
->uuid
, &m
->header
.uuid
) &&
760 i
->table
->syntax_id
.if_version
== m
->header
.if_version
&&
761 i
->callnum
== m
->header
.callnum
) {
767 /* no registered handler for this message */
772 /* allocate space for the structure */
773 r
= talloc_zero_size(m
->ndr
, i
->table
->calls
[m
->header
.callnum
].struct_size
);
774 if (r
== NULL
) goto failed
;
776 /* parse the request data */
777 ndr_err
= i
->table
->calls
[i
->callnum
].ndr_pull(m
->ndr
, NDR_IN
, r
);
778 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) goto failed
;
781 m
->private_data
= i
->private_data
;
782 m
->defer_reply
= false;
783 m
->msg_ctx
= msg_ctx
;
786 m
->ev
= msg_ctx
->event
.ev
;
788 m
->header
.status
= i
->fn(m
, r
);
790 if (m
->defer_reply
) {
791 /* the server function has asked to defer the reply to later */
792 talloc_steal(msg_ctx
, m
);
796 irpc_send_reply(m
, m
->header
.status
);
804 handle an incoming irpc message
806 static void irpc_handler(struct messaging_context
*msg_ctx
, void *private_data
,
807 uint32_t msg_type
, struct server_id src
, DATA_BLOB
*packet
)
809 struct irpc_message
*m
;
810 enum ndr_err_code ndr_err
;
812 m
= talloc(msg_ctx
, struct irpc_message
);
813 if (m
== NULL
) goto failed
;
817 m
->ndr
= ndr_pull_init_blob(packet
, m
, msg_ctx
->iconv_convenience
);
818 if (m
->ndr
== NULL
) goto failed
;
820 m
->ndr
->flags
|= LIBNDR_FLAG_REF_ALLOC
;
822 ndr_err
= ndr_pull_irpc_header(m
->ndr
, NDR_BUFFERS
|NDR_SCALARS
, &m
->header
);
823 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) goto failed
;
825 if (m
->header
.flags
& IRPC_FLAG_REPLY
) {
826 irpc_handler_reply(msg_ctx
, m
);
828 irpc_handler_request(msg_ctx
, m
);
838 destroy an irpc request
840 static int irpc_destructor(struct irpc_request
*irpc
)
842 if (irpc
->callid
!= -1) {
843 idr_remove(irpc
->msg_ctx
->idr
, irpc
->callid
);
847 if (irpc
->reject_free
) {
854 time out an irpc request
856 static void irpc_timeout(struct tevent_context
*ev
, struct tevent_timer
*te
,
857 struct timeval t
, void *private_data
)
859 struct irpc_request
*irpc
= talloc_get_type(private_data
, struct irpc_request
);
860 irpc
->status
= NT_STATUS_IO_TIMEOUT
;
862 if (irpc
->async
.fn
) {
863 irpc
->async
.fn(irpc
);
869 make an irpc call - async send
871 struct irpc_request
*irpc_call_send(struct messaging_context
*msg_ctx
,
872 struct server_id server_id
,
873 const struct ndr_interface_table
*table
,
874 int callnum
, void *r
, TALLOC_CTX
*ctx
)
876 struct irpc_header header
;
877 struct ndr_push
*ndr
;
880 struct irpc_request
*irpc
;
881 enum ndr_err_code ndr_err
;
883 irpc
= talloc(msg_ctx
, struct irpc_request
);
884 if (irpc
== NULL
) goto failed
;
886 irpc
->msg_ctx
= msg_ctx
;
888 irpc
->callnum
= callnum
;
889 irpc
->callid
= idr_get_new(msg_ctx
->idr
, irpc
, UINT16_MAX
);
890 if (irpc
->callid
== -1) goto failed
;
893 irpc
->async
.fn
= NULL
;
895 irpc
->reject_free
= false;
897 talloc_set_destructor(irpc
, irpc_destructor
);
899 /* setup the header */
900 header
.uuid
= table
->syntax_id
.uuid
;
902 header
.if_version
= table
->syntax_id
.if_version
;
903 header
.callid
= irpc
->callid
;
904 header
.callnum
= callnum
;
906 header
.status
= NT_STATUS_OK
;
908 /* construct the irpc packet */
909 ndr
= ndr_push_init_ctx(irpc
, msg_ctx
->iconv_convenience
);
910 if (ndr
== NULL
) goto failed
;
912 ndr_err
= ndr_push_irpc_header(ndr
, NDR_SCALARS
|NDR_BUFFERS
, &header
);
913 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) goto failed
;
915 ndr_err
= table
->calls
[callnum
].ndr_push(ndr
, NDR_IN
, r
);
916 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) goto failed
;
919 packet
= ndr_push_blob(ndr
);
920 status
= messaging_send(msg_ctx
, server_id
, MSG_IRPC
, &packet
);
921 if (!NT_STATUS_IS_OK(status
)) goto failed
;
923 event_add_timed(msg_ctx
->event
.ev
, irpc
,
924 timeval_current_ofs(IRPC_CALL_TIMEOUT
, 0),
936 wait for an irpc reply
938 NTSTATUS
irpc_call_recv(struct irpc_request
*irpc
)
942 NT_STATUS_HAVE_NO_MEMORY(irpc
);
944 irpc
->reject_free
= true;
946 while (!irpc
->done
) {
947 if (event_loop_once(irpc
->msg_ctx
->event
.ev
) != 0) {
948 return NT_STATUS_CONNECTION_DISCONNECTED
;
952 irpc
->reject_free
= false;
954 status
= irpc
->status
;
960 perform a synchronous irpc request
962 NTSTATUS
irpc_call(struct messaging_context
*msg_ctx
,
963 struct server_id server_id
,
964 const struct ndr_interface_table
*table
,
965 int callnum
, void *r
,
968 struct irpc_request
*irpc
= irpc_call_send(msg_ctx
, server_id
,
969 table
, callnum
, r
, mem_ctx
);
970 return irpc_call_recv(irpc
);
974 open the naming database
976 static struct tdb_wrap
*irpc_namedb_open(struct messaging_context
*msg_ctx
)
979 char *path
= talloc_asprintf(msg_ctx
, "%s/names.tdb", msg_ctx
->base_path
);
983 t
= tdb_wrap_open(msg_ctx
, path
, 0, 0, O_RDWR
|O_CREAT
, 0660);
990 add a string name that this irpc server can be called on
992 NTSTATUS
irpc_add_name(struct messaging_context
*msg_ctx
, const char *name
)
997 NTSTATUS status
= NT_STATUS_OK
;
999 t
= irpc_namedb_open(msg_ctx
);
1000 NT_STATUS_HAVE_NO_MEMORY(t
);
1002 if (tdb_lock_bystring(t
->tdb
, name
) != 0) {
1004 return NT_STATUS_LOCK_NOT_GRANTED
;
1006 rec
= tdb_fetch_bystring(t
->tdb
, name
);
1007 count
= rec
.dsize
/ sizeof(struct server_id
);
1008 rec
.dptr
= (unsigned char *)realloc_p(rec
.dptr
, struct server_id
, count
+1);
1009 rec
.dsize
+= sizeof(struct server_id
);
1010 if (rec
.dptr
== NULL
) {
1011 tdb_unlock_bystring(t
->tdb
, name
);
1013 return NT_STATUS_NO_MEMORY
;
1015 ((struct server_id
*)rec
.dptr
)[count
] = msg_ctx
->server_id
;
1016 if (tdb_store_bystring(t
->tdb
, name
, rec
, 0) != 0) {
1017 status
= NT_STATUS_INTERNAL_ERROR
;
1020 tdb_unlock_bystring(t
->tdb
, name
);
1023 msg_ctx
->names
= str_list_add(msg_ctx
->names
, name
);
1024 talloc_steal(msg_ctx
, msg_ctx
->names
);
1030 return a list of server ids for a server name
1032 struct server_id
*irpc_servers_byname(struct messaging_context
*msg_ctx
,
1033 TALLOC_CTX
*mem_ctx
,
1039 struct server_id
*ret
;
1041 t
= irpc_namedb_open(msg_ctx
);
1046 if (tdb_lock_bystring(t
->tdb
, name
) != 0) {
1050 rec
= tdb_fetch_bystring(t
->tdb
, name
);
1051 if (rec
.dptr
== NULL
) {
1052 tdb_unlock_bystring(t
->tdb
, name
);
1056 count
= rec
.dsize
/ sizeof(struct server_id
);
1057 ret
= talloc_array(mem_ctx
, struct server_id
, count
+1);
1059 tdb_unlock_bystring(t
->tdb
, name
);
1063 for (i
=0;i
<count
;i
++) {
1064 ret
[i
] = ((struct server_id
*)rec
.dptr
)[i
];
1066 ret
[i
] = cluster_id(0, 0);
1068 tdb_unlock_bystring(t
->tdb
, name
);
1075 remove a name from a messaging context
1077 void irpc_remove_name(struct messaging_context
*msg_ctx
, const char *name
)
1082 struct server_id
*ids
;
1084 str_list_remove(msg_ctx
->names
, name
);
1086 t
= irpc_namedb_open(msg_ctx
);
1091 if (tdb_lock_bystring(t
->tdb
, name
) != 0) {
1095 rec
= tdb_fetch_bystring(t
->tdb
, name
);
1096 if (rec
.dptr
== NULL
) {
1097 tdb_unlock_bystring(t
->tdb
, name
);
1101 count
= rec
.dsize
/ sizeof(struct server_id
);
1104 tdb_unlock_bystring(t
->tdb
, name
);
1108 ids
= (struct server_id
*)rec
.dptr
;
1109 for (i
=0;i
<count
;i
++) {
1110 if (cluster_id_equal(&ids
[i
], &msg_ctx
->server_id
)) {
1112 memmove(ids
+i
, ids
+i
+1,
1113 sizeof(struct server_id
) * (count
-(i
+1)));
1115 rec
.dsize
-= sizeof(struct server_id
);
1119 tdb_store_bystring(t
->tdb
, name
, rec
, 0);
1121 tdb_unlock_bystring(t
->tdb
, name
);
1125 struct server_id
messaging_get_server_id(struct messaging_context
*msg_ctx
)
1127 return msg_ctx
->server_id
;