2 Unix SMB/CIFS implementation.
4 Samba internal messaging functions
6 Copyright (C) Andrew Tridgell 2004
8 This program is free software; you can redistribute it and/or modify
9 it under the terms of the GNU General Public License as published by
10 the Free Software Foundation; either version 3 of the License, or
11 (at your option) any later version.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 #include "lib/events/events.h"
24 #include "system/filesys.h"
25 #include "messaging/messaging.h"
26 #include "../lib/util/dlinklist.h"
27 #include "lib/socket/socket.h"
28 #include "librpc/gen_ndr/ndr_irpc.h"
29 #include "lib/messaging/irpc.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "../lib/util/unix_privs.h"
32 #include "librpc/rpc/dcerpc.h"
33 #include "../lib/tdb_compat/tdb_compat.h"
34 #include "../lib/util/util_tdb.h"
35 #include "cluster/cluster.h"
36 #include "../lib/util/tevent_ntstatus.h"
37 #include "lib/param/param.h"
39 /* change the message version with any incompatible changes in the protocol */
40 #define IMESSAGING_VERSION 1
42 static struct tdb_wrap
*irpc_namedb_open(TALLOC_CTX
*mem_ctx
, const char *base_path
,
43 struct loadparm_context
*lp_ctx
);
49 struct imessaging_context
*msg_ctx
;
52 void (*handler
)(struct irpc_request
*irpc
, struct irpc_message
*m
);
57 struct imessaging_context
{
58 struct server_id server_id
;
59 struct socket_context
*sock
;
60 const char *base_path
;
62 struct dispatch_fn
**dispatch
;
64 struct idr_context
*dispatch_tree
;
65 struct imessaging_rec
*pending
;
66 struct imessaging_rec
*retry_queue
;
67 struct irpc_list
*irpc
;
68 struct idr_context
*idr
;
70 struct tdb_wrap
*names_db
;
71 struct timeval start_time
;
72 struct tevent_timer
*retry_te
;
74 struct tevent_fd
*fde
;
78 /* we have a linked list of dispatch handlers for each msg_type that
79 this messaging server can deal with */
81 struct dispatch_fn
*next
, *prev
;
87 /* an individual message */
88 struct imessaging_rec
{
89 struct imessaging_rec
*next
, *prev
;
90 struct imessaging_context
*msg
;
93 struct imessaging_header
{
96 struct server_id from
;
106 static void irpc_handler(struct imessaging_context
*, void *,
107 uint32_t, struct server_id
, DATA_BLOB
*);
111 A useful function for testing the message system.
113 static void ping_message(struct imessaging_context
*msg
, void *private_data
,
114 uint32_t msg_type
, struct server_id src
, DATA_BLOB
*data
)
116 char *task_id
= server_id_str(NULL
, &src
);
117 DEBUG(1,("INFO: Received PING message from server %s [%.*s]\n",
118 task_id
, (int)data
->length
,
119 data
->data
?(const char *)data
->data
:""));
120 talloc_free(task_id
);
121 imessaging_send(msg
, src
, MSG_PONG
, data
);
125 return uptime of messaging server via irpc
127 static NTSTATUS
irpc_uptime(struct irpc_message
*msg
,
128 struct irpc_uptime
*r
)
130 struct imessaging_context
*ctx
= talloc_get_type(msg
->private_data
, struct imessaging_context
);
131 *r
->out
.start_time
= timeval_to_nttime(&ctx
->start_time
);
136 return the path to a messaging socket
138 static char *imessaging_path(struct imessaging_context
*msg
, struct server_id server_id
)
140 TALLOC_CTX
*tmp_ctx
= talloc_new(msg
);
141 const char *id
= server_id_str(tmp_ctx
, &server_id
);
146 s
= talloc_asprintf(msg
, "%s/msg.%s", msg
->base_path
, id
);
147 talloc_steal(s
, tmp_ctx
);
152 dispatch a fully received message
154 note that this deliberately can match more than one message handler
155 per message. That allows a single messasging context to register
156 (for example) a debug handler for more than one piece of code
158 static void imessaging_dispatch(struct imessaging_context
*msg
, struct imessaging_rec
*rec
)
160 struct dispatch_fn
*d
, *next
;
162 /* temporary IDs use an idtree, the rest use a array of pointers */
163 if (rec
->header
->msg_type
>= MSG_TMP_BASE
) {
164 d
= (struct dispatch_fn
*)idr_find(msg
->dispatch_tree
,
165 rec
->header
->msg_type
);
166 } else if (rec
->header
->msg_type
< msg
->num_types
) {
167 d
= msg
->dispatch
[rec
->header
->msg_type
];
172 for (; d
; d
= next
) {
175 data
.data
= rec
->packet
.data
+ sizeof(*rec
->header
);
176 data
.length
= rec
->header
->length
;
177 d
->fn(msg
, d
->private_data
, d
->msg_type
, rec
->header
->from
, &data
);
179 rec
->header
->length
= 0;
183 handler for messages that arrive from other nodes in the cluster
185 static void cluster_message_handler(struct imessaging_context
*msg
, DATA_BLOB packet
)
187 struct imessaging_rec
*rec
;
189 rec
= talloc(msg
, struct imessaging_rec
);
191 smb_panic("Unable to allocate imessaging_rec");
195 rec
->path
= msg
->path
;
196 rec
->header
= (struct imessaging_header
*)packet
.data
;
197 rec
->packet
= packet
;
200 if (packet
.length
!= sizeof(*rec
->header
) + rec
->header
->length
) {
201 DEBUG(0,("messaging: bad message header size %d should be %d\n",
202 rec
->header
->length
, (int)(packet
.length
- sizeof(*rec
->header
))));
207 imessaging_dispatch(msg
, rec
);
214 try to send the message
216 static NTSTATUS
try_send(struct imessaging_rec
*rec
)
218 struct imessaging_context
*msg
= rec
->msg
;
222 struct socket_address
*path
;
224 /* rec->path is the path of the *other* socket, where we want
226 path
= socket_address_from_strings(msg
, msg
->sock
->backend_name
,
229 return NT_STATUS_NO_MEMORY
;
232 /* we send with privileges so messages work from any context */
233 priv
= root_privileges();
234 status
= socket_sendto(msg
->sock
, &rec
->packet
, &nsent
, path
);
242 retry backed off messages
244 static void msg_retry_timer(struct tevent_context
*ev
, struct tevent_timer
*te
,
245 struct timeval t
, void *private_data
)
247 struct imessaging_context
*msg
= talloc_get_type(private_data
,
248 struct imessaging_context
);
249 msg
->retry_te
= NULL
;
251 /* put the messages back on the main queue */
252 while (msg
->retry_queue
) {
253 struct imessaging_rec
*rec
= msg
->retry_queue
;
254 DLIST_REMOVE(msg
->retry_queue
, rec
);
255 DLIST_ADD_END(msg
->pending
, rec
, struct imessaging_rec
*);
258 TEVENT_FD_WRITEABLE(msg
->event
.fde
);
262 handle a socket write event
264 static void imessaging_send_handler(struct imessaging_context
*msg
, struct tevent_context
*ev
)
266 while (msg
->pending
) {
267 struct imessaging_rec
*rec
= msg
->pending
;
269 status
= try_send(rec
);
270 if (NT_STATUS_EQUAL(status
, STATUS_MORE_ENTRIES
)) {
272 if (rec
->retries
> 3) {
273 /* we're getting continuous write errors -
274 backoff this record */
275 DLIST_REMOVE(msg
->pending
, rec
);
276 DLIST_ADD_END(msg
->retry_queue
, rec
,
277 struct imessaging_rec
*);
278 if (msg
->retry_te
== NULL
) {
280 tevent_add_timer(ev
, msg
,
281 timeval_current_ofs(1, 0),
282 msg_retry_timer
, msg
);
288 if (!NT_STATUS_IS_OK(status
)) {
289 TALLOC_CTX
*tmp_ctx
= talloc_new(msg
);
290 DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n",
291 server_id_str(tmp_ctx
, &rec
->header
->from
),
292 server_id_str(tmp_ctx
, &rec
->header
->to
),
293 rec
->header
->msg_type
,
295 talloc_free(tmp_ctx
);
297 DLIST_REMOVE(msg
->pending
, rec
);
300 if (msg
->pending
== NULL
) {
301 TEVENT_FD_NOT_WRITEABLE(msg
->event
.fde
);
306 handle a new incoming packet
308 static void imessaging_recv_handler(struct imessaging_context
*msg
, struct tevent_context
*ev
)
310 struct imessaging_rec
*rec
;
315 /* see how many bytes are in the next packet */
316 status
= socket_pending(msg
->sock
, &msize
);
317 if (!NT_STATUS_IS_OK(status
)) {
318 DEBUG(0,("socket_pending failed in messaging - %s\n",
323 packet
= data_blob_talloc(msg
, NULL
, msize
);
324 if (packet
.data
== NULL
) {
325 /* assume this is temporary and retry */
329 status
= socket_recv(msg
->sock
, packet
.data
, msize
, &msize
);
330 if (!NT_STATUS_IS_OK(status
)) {
331 data_blob_free(&packet
);
335 if (msize
< sizeof(*rec
->header
)) {
336 DEBUG(0,("messaging: bad message of size %d\n", (int)msize
));
337 data_blob_free(&packet
);
341 rec
= talloc(msg
, struct imessaging_rec
);
343 smb_panic("Unable to allocate imessaging_rec");
346 talloc_steal(rec
, packet
.data
);
348 rec
->path
= msg
->path
;
349 rec
->header
= (struct imessaging_header
*)packet
.data
;
350 rec
->packet
= packet
;
353 if (msize
!= sizeof(*rec
->header
) + rec
->header
->length
) {
354 DEBUG(0,("messaging: bad message header size %d should be %d\n",
355 rec
->header
->length
, (int)(msize
- sizeof(*rec
->header
))));
360 imessaging_dispatch(msg
, rec
);
366 handle a socket event
368 static void imessaging_handler(struct tevent_context
*ev
, struct tevent_fd
*fde
,
369 uint16_t flags
, void *private_data
)
371 struct imessaging_context
*msg
= talloc_get_type(private_data
,
372 struct imessaging_context
);
373 if (flags
& TEVENT_FD_WRITE
) {
374 imessaging_send_handler(msg
, ev
);
376 if (flags
& TEVENT_FD_READ
) {
377 imessaging_recv_handler(msg
, ev
);
383 Register a dispatch function for a particular message type.
385 NTSTATUS
imessaging_register(struct imessaging_context
*msg
, void *private_data
,
386 uint32_t msg_type
, msg_callback_t fn
)
388 struct dispatch_fn
*d
;
390 /* possibly expand dispatch array */
391 if (msg_type
>= msg
->num_types
) {
392 struct dispatch_fn
**dp
;
394 dp
= talloc_realloc(msg
, msg
->dispatch
, struct dispatch_fn
*, msg_type
+1);
395 NT_STATUS_HAVE_NO_MEMORY(dp
);
397 for (i
=msg
->num_types
;i
<=msg_type
;i
++) {
398 msg
->dispatch
[i
] = NULL
;
400 msg
->num_types
= msg_type
+1;
403 d
= talloc_zero(msg
->dispatch
, struct dispatch_fn
);
404 NT_STATUS_HAVE_NO_MEMORY(d
);
405 d
->msg_type
= msg_type
;
406 d
->private_data
= private_data
;
409 DLIST_ADD(msg
->dispatch
[msg_type
], d
);
415 register a temporary message handler. The msg_type is allocated
418 NTSTATUS
imessaging_register_tmp(struct imessaging_context
*msg
, void *private_data
,
419 msg_callback_t fn
, uint32_t *msg_type
)
421 struct dispatch_fn
*d
;
424 d
= talloc_zero(msg
->dispatch
, struct dispatch_fn
);
425 NT_STATUS_HAVE_NO_MEMORY(d
);
426 d
->private_data
= private_data
;
429 id
= idr_get_new_above(msg
->dispatch_tree
, d
, MSG_TMP_BASE
, UINT16_MAX
);
432 return NT_STATUS_TOO_MANY_CONTEXT_IDS
;
435 d
->msg_type
= (uint32_t)id
;
436 (*msg_type
) = d
->msg_type
;
442 De-register the function for a particular message type.
444 void imessaging_deregister(struct imessaging_context
*msg
, uint32_t msg_type
, void *private_data
)
446 struct dispatch_fn
*d
, *next
;
448 if (msg_type
>= msg
->num_types
) {
449 d
= (struct dispatch_fn
*)idr_find(msg
->dispatch_tree
,
452 idr_remove(msg
->dispatch_tree
, msg_type
);
457 for (d
= msg
->dispatch
[msg_type
]; d
; d
= next
) {
459 if (d
->private_data
== private_data
) {
460 DLIST_REMOVE(msg
->dispatch
[msg_type
], d
);
467 Send a message to a particular server
469 NTSTATUS
imessaging_send(struct imessaging_context
*msg
, struct server_id server
,
470 uint32_t msg_type
, const DATA_BLOB
*data
)
472 struct imessaging_rec
*rec
;
474 size_t dlength
= data
?data
->length
:0;
476 rec
= talloc(msg
, struct imessaging_rec
);
478 return NT_STATUS_NO_MEMORY
;
481 rec
->packet
= data_blob_talloc(rec
, NULL
, sizeof(*rec
->header
) + dlength
);
482 if (rec
->packet
.data
== NULL
) {
484 return NT_STATUS_NO_MEMORY
;
489 rec
->header
= (struct imessaging_header
*)rec
->packet
.data
;
491 ZERO_STRUCTP(rec
->header
);
492 rec
->header
->version
= IMESSAGING_VERSION
;
493 rec
->header
->msg_type
= msg_type
;
494 rec
->header
->from
= msg
->server_id
;
495 rec
->header
->to
= server
;
496 rec
->header
->length
= dlength
;
498 memcpy(rec
->packet
.data
+ sizeof(*rec
->header
),
499 data
->data
, dlength
);
502 if (!cluster_node_equal(&msg
->server_id
, &server
)) {
503 /* the destination is on another node - dispatch via
505 status
= cluster_message_send(server
, &rec
->packet
);
510 rec
->path
= imessaging_path(msg
, server
);
511 talloc_steal(rec
, rec
->path
);
513 if (msg
->pending
!= NULL
) {
514 status
= STATUS_MORE_ENTRIES
;
516 status
= try_send(rec
);
519 if (NT_STATUS_EQUAL(status
, STATUS_MORE_ENTRIES
)) {
520 if (msg
->pending
== NULL
) {
521 TEVENT_FD_WRITEABLE(msg
->event
.fde
);
523 DLIST_ADD_END(msg
->pending
, rec
, struct imessaging_rec
*);
533 Send a message to a particular server, with the message containing a single pointer
535 NTSTATUS
imessaging_send_ptr(struct imessaging_context
*msg
, struct server_id server
,
536 uint32_t msg_type
, void *ptr
)
540 blob
.data
= (uint8_t *)&ptr
;
541 blob
.length
= sizeof(void *);
543 return imessaging_send(msg
, server
, msg_type
, &blob
);
548 remove our messaging socket and database entry
550 int imessaging_cleanup(struct imessaging_context
*msg
)
556 DEBUG(5,("imessaging: cleaning up %s\n", msg
->path
));
558 while (msg
->names
&& msg
->names
[0]) {
559 irpc_remove_name(msg
, msg
->names
[0]);
565 create the listening socket and setup the dispatcher
567 use auto_remove=true when you want a destructor to remove the
568 associated messaging socket and database entry on talloc free. Don't
569 use this in processes that may fork and a child may talloc free this
572 struct imessaging_context
*imessaging_init(TALLOC_CTX
*mem_ctx
,
573 struct loadparm_context
*lp_ctx
,
574 struct server_id server_id
,
575 struct tevent_context
*ev
,
578 struct imessaging_context
*msg
;
580 struct socket_address
*path
;
587 msg
= talloc_zero(mem_ctx
, struct imessaging_context
);
592 /* setup a handler for messages from other cluster nodes, if appropriate */
593 status
= cluster_message_init(msg
, server_id
, cluster_message_handler
);
594 if (!NT_STATUS_IS_OK(status
)) {
598 /* create the messaging directory if needed */
600 msg
->base_path
= lpcfg_imessaging_path(msg
, lp_ctx
);
601 if (msg
->base_path
== NULL
) {
605 ok
= directory_create_or_exist_strict(msg
->base_path
, geteuid(), 0700);
610 msg
->path
= imessaging_path(msg
, server_id
);
611 if (msg
->path
== NULL
) {
615 msg
->server_id
= server_id
;
616 msg
->idr
= idr_init(msg
);
617 if (msg
->idr
== NULL
) {
621 msg
->dispatch_tree
= idr_init(msg
);
622 if (msg
->dispatch_tree
== NULL
) {
626 msg
->start_time
= timeval_current();
628 msg
->names_db
= irpc_namedb_open(msg
, msg
->base_path
, lp_ctx
);
629 if (msg
->names_db
== NULL
) {
633 status
= socket_create("unix", SOCKET_TYPE_DGRAM
, &msg
->sock
, 0);
634 if (!NT_STATUS_IS_OK(status
)) {
638 /* by stealing here we ensure that the socket is cleaned up (and even
640 talloc_steal(msg
, msg
->sock
);
642 path
= socket_address_from_strings(msg
, msg
->sock
->backend_name
,
648 status
= socket_listen(msg
->sock
, path
, 50, 0);
649 if (!NT_STATUS_IS_OK(status
)) {
650 DEBUG(0,("Unable to setup messaging listener for '%s':%s\n", msg
->path
, nt_errstr(status
)));
654 /* it needs to be non blocking for sends */
655 set_blocking(socket_get_fd(msg
->sock
), false);
657 msg
->event
.fde
= tevent_add_fd(ev
, msg
, socket_get_fd(msg
->sock
),
658 TEVENT_FD_READ
, imessaging_handler
, msg
);
659 tevent_fd_set_auto_close(msg
->event
.fde
);
662 talloc_set_destructor(msg
, imessaging_cleanup
);
665 imessaging_register(msg
, NULL
, MSG_PING
, ping_message
);
666 imessaging_register(msg
, NULL
, MSG_IRPC
, irpc_handler
);
667 IRPC_REGISTER(msg
, irpc
, IRPC_UPTIME
, irpc_uptime
, msg
);
676 A hack, for the short term until we get 'client only' messaging in place
678 struct imessaging_context
*imessaging_client_init(TALLOC_CTX
*mem_ctx
,
679 struct loadparm_context
*lp_ctx
,
680 struct tevent_context
*ev
)
685 id
.task_id
= generate_random();
686 id
.vnn
= NONCLUSTER_VNN
;
688 /* This is because we are not in the s3 serverid database */
689 id
.unique_id
= SERVERID_UNIQUE_ID_NOT_TO_VERIFY
;
691 return imessaging_init(mem_ctx
, lp_ctx
, id
, ev
, true);
694 a list of registered irpc server functions
697 struct irpc_list
*next
, *prev
;
699 const struct ndr_interface_table
*table
;
707 register a irpc server function
709 NTSTATUS
irpc_register(struct imessaging_context
*msg_ctx
,
710 const struct ndr_interface_table
*table
,
711 int callnum
, irpc_function_t fn
, void *private_data
)
713 struct irpc_list
*irpc
;
715 /* override an existing handler, if any */
716 for (irpc
=msg_ctx
->irpc
; irpc
; irpc
=irpc
->next
) {
717 if (irpc
->table
== table
&& irpc
->callnum
== callnum
) {
722 irpc
= talloc(msg_ctx
, struct irpc_list
);
723 NT_STATUS_HAVE_NO_MEMORY(irpc
);
724 DLIST_ADD(msg_ctx
->irpc
, irpc
);
728 irpc
->callnum
= callnum
;
730 irpc
->private_data
= private_data
;
731 irpc
->uuid
= irpc
->table
->syntax_id
.uuid
;
738 handle an incoming irpc reply message
740 static void irpc_handler_reply(struct imessaging_context
*msg_ctx
, struct irpc_message
*m
)
742 struct irpc_request
*irpc
;
744 irpc
= (struct irpc_request
*)idr_find(msg_ctx
->idr
, m
->header
.callid
);
745 if (irpc
== NULL
) return;
747 irpc
->incoming
.handler(irpc
, m
);
753 NTSTATUS
irpc_send_reply(struct irpc_message
*m
, NTSTATUS status
)
755 struct ndr_push
*push
;
757 enum ndr_err_code ndr_err
;
759 m
->header
.status
= status
;
761 /* setup the reply */
762 push
= ndr_push_init_ctx(m
->ndr
);
764 status
= NT_STATUS_NO_MEMORY
;
768 m
->header
.flags
|= IRPC_FLAG_REPLY
;
769 m
->header
.creds
.token
= NULL
;
771 /* construct the packet */
772 ndr_err
= ndr_push_irpc_header(push
, NDR_SCALARS
|NDR_BUFFERS
, &m
->header
);
773 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) {
774 status
= ndr_map_error2ntstatus(ndr_err
);
778 ndr_err
= m
->irpc
->table
->calls
[m
->irpc
->callnum
].ndr_push(push
, NDR_OUT
, m
->data
);
779 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) {
780 status
= ndr_map_error2ntstatus(ndr_err
);
784 /* send the reply message */
785 packet
= ndr_push_blob(push
);
786 status
= imessaging_send(m
->msg_ctx
, m
->from
, MSG_IRPC
, &packet
);
787 if (!NT_STATUS_IS_OK(status
)) goto failed
;
795 handle an incoming irpc request message
797 static void irpc_handler_request(struct imessaging_context
*msg_ctx
,
798 struct irpc_message
*m
)
802 enum ndr_err_code ndr_err
;
804 for (i
=msg_ctx
->irpc
; i
; i
=i
->next
) {
805 if (GUID_equal(&i
->uuid
, &m
->header
.uuid
) &&
806 i
->table
->syntax_id
.if_version
== m
->header
.if_version
&&
807 i
->callnum
== m
->header
.callnum
) {
813 /* no registered handler for this message */
818 /* allocate space for the structure */
819 r
= talloc_zero_size(m
->ndr
, i
->table
->calls
[m
->header
.callnum
].struct_size
);
820 if (r
== NULL
) goto failed
;
822 m
->ndr
->flags
|= LIBNDR_FLAG_REF_ALLOC
;
824 /* parse the request data */
825 ndr_err
= i
->table
->calls
[i
->callnum
].ndr_pull(m
->ndr
, NDR_IN
, r
);
826 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) goto failed
;
829 m
->private_data
= i
->private_data
;
830 m
->defer_reply
= false;
832 m
->msg_ctx
= msg_ctx
;
836 m
->header
.status
= i
->fn(m
, r
);
839 /* the server function won't ever be replying to this request */
844 if (m
->defer_reply
) {
845 /* the server function has asked to defer the reply to later */
846 talloc_steal(msg_ctx
, m
);
850 irpc_send_reply(m
, m
->header
.status
);
858 handle an incoming irpc message
860 static void irpc_handler(struct imessaging_context
*msg_ctx
, void *private_data
,
861 uint32_t msg_type
, struct server_id src
, DATA_BLOB
*packet
)
863 struct irpc_message
*m
;
864 enum ndr_err_code ndr_err
;
866 m
= talloc(msg_ctx
, struct irpc_message
);
867 if (m
== NULL
) goto failed
;
871 m
->ndr
= ndr_pull_init_blob(packet
, m
);
872 if (m
->ndr
== NULL
) goto failed
;
874 m
->ndr
->flags
|= LIBNDR_FLAG_REF_ALLOC
;
876 ndr_err
= ndr_pull_irpc_header(m
->ndr
, NDR_BUFFERS
|NDR_SCALARS
, &m
->header
);
877 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) goto failed
;
879 if (m
->header
.flags
& IRPC_FLAG_REPLY
) {
880 irpc_handler_reply(msg_ctx
, m
);
882 irpc_handler_request(msg_ctx
, m
);
892 destroy a irpc request
894 static int irpc_destructor(struct irpc_request
*irpc
)
896 if (irpc
->callid
!= -1) {
897 idr_remove(irpc
->msg_ctx
->idr
, irpc
->callid
);
905 open the naming database
907 static struct tdb_wrap
*irpc_namedb_open(TALLOC_CTX
*mem_ctx
, const char *base_path
,
908 struct loadparm_context
*lp_ctx
)
911 char *path
= talloc_asprintf(mem_ctx
, "%s/names.tdb", base_path
);
915 t
= tdb_wrap_open(mem_ctx
, path
, lpcfg_tdb_hash_size(lp_ctx
, path
),
916 lpcfg_tdb_flags(lp_ctx
, 0), O_RDWR
|O_CREAT
, 0660);
923 add a string name that this irpc server can be called on
925 NTSTATUS
irpc_add_name(struct imessaging_context
*msg_ctx
, const char *name
)
927 struct tdb_context
*t
= msg_ctx
->names_db
->tdb
;
928 struct server_id pid
= msg_ctx
->server_id
;
931 NTSTATUS status
= NT_STATUS_OK
;
933 msg_ctx
->names
= str_list_add(msg_ctx
->names
, name
);
934 if (msg_ctx
->names
== NULL
) {
935 return NT_STATUS_NO_MEMORY
;
937 talloc_steal(msg_ctx
, msg_ctx
->names
);
939 key
= string_term_tdb_data(name
);
940 data
= (TDB_DATA
) { .dptr
= (uint8_t *)&pid
, .dsize
= sizeof(pid
) };
942 ret
= tdb_append(t
, key
, data
);
944 enum TDB_ERROR err
= tdb_error(t
);
945 str_list_remove(msg_ctx
->names
, name
);
946 return map_nt_error_from_tdb(err
);
953 return a list of server ids for a server name
955 struct server_id
*irpc_servers_byname(struct imessaging_context
*msg_ctx
,
959 struct tdb_wrap
*t
= msg_ctx
->names_db
;
962 struct server_id
*ret
;
964 if (tdb_lock_bystring(t
->tdb
, name
) != 0) {
967 rec
= tdb_fetch_bystring(t
->tdb
, name
);
968 if (rec
.dptr
== NULL
) {
969 tdb_unlock_bystring(t
->tdb
, name
);
972 count
= rec
.dsize
/ sizeof(struct server_id
);
973 ret
= talloc_array(mem_ctx
, struct server_id
, count
+1);
975 tdb_unlock_bystring(t
->tdb
, name
);
978 for (i
=0;i
<count
;i
++) {
979 ret
[i
] = ((struct server_id
*)rec
.dptr
)[i
];
981 server_id_set_disconnected(&ret
[i
]);
983 tdb_unlock_bystring(t
->tdb
, name
);
988 static int all_servers_func(struct tdb_context
*tdb
, TDB_DATA key
, TDB_DATA data
, void *state
)
990 struct irpc_name_records
*name_records
= talloc_get_type(state
, struct irpc_name_records
);
991 struct irpc_name_record
*name_record
;
995 = talloc_realloc(name_records
, name_records
->names
,
996 struct irpc_name_record
*, name_records
->num_records
+1);
997 if (!name_records
->names
) {
1001 name_records
->names
[name_records
->num_records
] = name_record
1002 = talloc(name_records
->names
,
1003 struct irpc_name_record
);
1008 name_records
->num_records
++;
1011 = talloc_strndup(name_record
,
1012 (const char *)key
.dptr
, key
.dsize
);
1013 if (!name_record
->name
) {
1017 name_record
->count
= data
.dsize
/ sizeof(struct server_id
);
1018 name_record
->ids
= talloc_array(name_record
,
1020 name_record
->count
);
1021 if (name_record
->ids
== NULL
) {
1024 for (i
=0;i
<name_record
->count
;i
++) {
1025 name_record
->ids
[i
] = ((struct server_id
*)data
.dptr
)[i
];
1031 return a list of server ids for a server name
1033 struct irpc_name_records
*irpc_all_servers(struct imessaging_context
*msg_ctx
,
1034 TALLOC_CTX
*mem_ctx
)
1036 struct tdb_wrap
*t
= msg_ctx
->names_db
;
1038 struct irpc_name_records
*name_records
= talloc_zero(mem_ctx
, struct irpc_name_records
);
1039 if (name_records
== NULL
) {
1043 ret
= tdb_traverse_read(t
->tdb
, all_servers_func
, name_records
);
1048 return name_records
;
1052 remove a name from a messaging context
1054 void irpc_remove_name(struct imessaging_context
*msg_ctx
, const char *name
)
1056 struct tdb_wrap
*t
= msg_ctx
->names_db
;
1059 struct server_id
*ids
;
1061 str_list_remove(msg_ctx
->names
, name
);
1063 if (tdb_lock_bystring(t
->tdb
, name
) != 0) {
1066 rec
= tdb_fetch_bystring(t
->tdb
, name
);
1067 if (rec
.dptr
== NULL
) {
1068 tdb_unlock_bystring(t
->tdb
, name
);
1071 count
= rec
.dsize
/ sizeof(struct server_id
);
1074 tdb_unlock_bystring(t
->tdb
, name
);
1077 ids
= (struct server_id
*)rec
.dptr
;
1078 for (i
=0;i
<count
;i
++) {
1079 if (cluster_id_equal(&ids
[i
], &msg_ctx
->server_id
)) {
1081 memmove(ids
+i
, ids
+i
+1,
1082 sizeof(struct server_id
) * (count
-(i
+1)));
1084 rec
.dsize
-= sizeof(struct server_id
);
1088 tdb_store_bystring(t
->tdb
, name
, rec
, 0);
1090 tdb_unlock_bystring(t
->tdb
, name
);
1093 struct server_id
imessaging_get_server_id(struct imessaging_context
*msg_ctx
)
1095 return msg_ctx
->server_id
;
1098 struct irpc_bh_state
{
1099 struct imessaging_context
*msg_ctx
;
1100 struct server_id server_id
;
1101 const struct ndr_interface_table
*table
;
1103 struct security_token
*token
;
1106 static bool irpc_bh_is_connected(struct dcerpc_binding_handle
*h
)
1108 struct irpc_bh_state
*hs
= dcerpc_binding_handle_data(h
,
1109 struct irpc_bh_state
);
1118 static uint32_t irpc_bh_set_timeout(struct dcerpc_binding_handle
*h
,
1121 struct irpc_bh_state
*hs
= dcerpc_binding_handle_data(h
,
1122 struct irpc_bh_state
);
1123 uint32_t old
= hs
->timeout
;
1125 hs
->timeout
= timeout
;
1130 struct irpc_bh_raw_call_state
{
1131 struct irpc_request
*irpc
;
1134 DATA_BLOB in_packet
;
1138 static void irpc_bh_raw_call_incoming_handler(struct irpc_request
*irpc
,
1139 struct irpc_message
*m
);
1141 static struct tevent_req
*irpc_bh_raw_call_send(TALLOC_CTX
*mem_ctx
,
1142 struct tevent_context
*ev
,
1143 struct dcerpc_binding_handle
*h
,
1144 const struct GUID
*object
,
1147 const uint8_t *in_data
,
1150 struct irpc_bh_state
*hs
=
1151 dcerpc_binding_handle_data(h
,
1152 struct irpc_bh_state
);
1153 struct tevent_req
*req
;
1154 struct irpc_bh_raw_call_state
*state
;
1156 struct irpc_header header
;
1157 struct ndr_push
*ndr
;
1159 enum ndr_err_code ndr_err
;
1161 req
= tevent_req_create(mem_ctx
, &state
,
1162 struct irpc_bh_raw_call_state
);
1166 state
->opnum
= opnum
;
1167 state
->in_data
.data
= discard_const_p(uint8_t, in_data
);
1168 state
->in_data
.length
= in_length
;
1170 ok
= irpc_bh_is_connected(h
);
1172 tevent_req_nterror(req
, NT_STATUS_CONNECTION_DISCONNECTED
);
1173 return tevent_req_post(req
, ev
);
1176 state
->irpc
= talloc_zero(state
, struct irpc_request
);
1177 if (tevent_req_nomem(state
->irpc
, req
)) {
1178 return tevent_req_post(req
, ev
);
1181 state
->irpc
->msg_ctx
= hs
->msg_ctx
;
1182 state
->irpc
->callid
= idr_get_new(hs
->msg_ctx
->idr
,
1183 state
->irpc
, UINT16_MAX
);
1184 if (state
->irpc
->callid
== -1) {
1185 tevent_req_nterror(req
, NT_STATUS_INSUFFICIENT_RESOURCES
);
1186 return tevent_req_post(req
, ev
);
1188 state
->irpc
->incoming
.handler
= irpc_bh_raw_call_incoming_handler
;
1189 state
->irpc
->incoming
.private_data
= req
;
1191 talloc_set_destructor(state
->irpc
, irpc_destructor
);
1193 /* setup the header */
1194 header
.uuid
= hs
->table
->syntax_id
.uuid
;
1196 header
.if_version
= hs
->table
->syntax_id
.if_version
;
1197 header
.callid
= state
->irpc
->callid
;
1198 header
.callnum
= state
->opnum
;
1200 header
.status
= NT_STATUS_OK
;
1201 header
.creds
.token
= hs
->token
;
1203 /* construct the irpc packet */
1204 ndr
= ndr_push_init_ctx(state
->irpc
);
1205 if (tevent_req_nomem(ndr
, req
)) {
1206 return tevent_req_post(req
, ev
);
1209 ndr_err
= ndr_push_irpc_header(ndr
, NDR_SCALARS
|NDR_BUFFERS
, &header
);
1210 status
= ndr_map_error2ntstatus(ndr_err
);
1211 if (!NT_STATUS_IS_OK(status
)) {
1212 tevent_req_nterror(req
, status
);
1213 return tevent_req_post(req
, ev
);
1216 ndr_err
= ndr_push_bytes(ndr
, in_data
, in_length
);
1217 status
= ndr_map_error2ntstatus(ndr_err
);
1218 if (!NT_STATUS_IS_OK(status
)) {
1219 tevent_req_nterror(req
, status
);
1220 return tevent_req_post(req
, ev
);
1224 state
->in_packet
= ndr_push_blob(ndr
);
1225 status
= imessaging_send(hs
->msg_ctx
, hs
->server_id
,
1226 MSG_IRPC
, &state
->in_packet
);
1227 if (!NT_STATUS_IS_OK(status
)) {
1228 tevent_req_nterror(req
, status
);
1229 return tevent_req_post(req
, ev
);
1232 if (hs
->timeout
!= IRPC_CALL_TIMEOUT_INF
) {
1233 /* set timeout-callback in case caller wants that */
1234 ok
= tevent_req_set_endtime(req
, ev
, timeval_current_ofs(hs
->timeout
, 0));
1236 return tevent_req_post(req
, ev
);
1243 static void irpc_bh_raw_call_incoming_handler(struct irpc_request
*irpc
,
1244 struct irpc_message
*m
)
1246 struct tevent_req
*req
=
1247 talloc_get_type_abort(irpc
->incoming
.private_data
,
1249 struct irpc_bh_raw_call_state
*state
=
1250 tevent_req_data(req
,
1251 struct irpc_bh_raw_call_state
);
1253 talloc_steal(state
, m
);
1255 if (!NT_STATUS_IS_OK(m
->header
.status
)) {
1256 tevent_req_nterror(req
, m
->header
.status
);
1260 state
->out_data
= data_blob_talloc(state
,
1261 m
->ndr
->data
+ m
->ndr
->offset
,
1262 m
->ndr
->data_size
- m
->ndr
->offset
);
1263 if ((m
->ndr
->data_size
- m
->ndr
->offset
) > 0 && !state
->out_data
.data
) {
1264 tevent_req_oom(req
);
1268 tevent_req_done(req
);
1271 static NTSTATUS
irpc_bh_raw_call_recv(struct tevent_req
*req
,
1272 TALLOC_CTX
*mem_ctx
,
1275 uint32_t *out_flags
)
1277 struct irpc_bh_raw_call_state
*state
=
1278 tevent_req_data(req
,
1279 struct irpc_bh_raw_call_state
);
1282 if (tevent_req_is_nterror(req
, &status
)) {
1283 tevent_req_received(req
);
1287 *out_data
= talloc_move(mem_ctx
, &state
->out_data
.data
);
1288 *out_length
= state
->out_data
.length
;
1290 tevent_req_received(req
);
1291 return NT_STATUS_OK
;
1294 struct irpc_bh_disconnect_state
{
1298 static struct tevent_req
*irpc_bh_disconnect_send(TALLOC_CTX
*mem_ctx
,
1299 struct tevent_context
*ev
,
1300 struct dcerpc_binding_handle
*h
)
1302 struct irpc_bh_state
*hs
= dcerpc_binding_handle_data(h
,
1303 struct irpc_bh_state
);
1304 struct tevent_req
*req
;
1305 struct irpc_bh_disconnect_state
*state
;
1308 req
= tevent_req_create(mem_ctx
, &state
,
1309 struct irpc_bh_disconnect_state
);
1314 ok
= irpc_bh_is_connected(h
);
1316 tevent_req_nterror(req
, NT_STATUS_CONNECTION_DISCONNECTED
);
1317 return tevent_req_post(req
, ev
);
1322 tevent_req_done(req
);
1323 return tevent_req_post(req
, ev
);
1326 static NTSTATUS
irpc_bh_disconnect_recv(struct tevent_req
*req
)
1330 if (tevent_req_is_nterror(req
, &status
)) {
1331 tevent_req_received(req
);
1335 tevent_req_received(req
);
1336 return NT_STATUS_OK
;
1339 static bool irpc_bh_ref_alloc(struct dcerpc_binding_handle
*h
)
1344 static const struct dcerpc_binding_handle_ops irpc_bh_ops
= {
1346 .is_connected
= irpc_bh_is_connected
,
1347 .set_timeout
= irpc_bh_set_timeout
,
1348 .raw_call_send
= irpc_bh_raw_call_send
,
1349 .raw_call_recv
= irpc_bh_raw_call_recv
,
1350 .disconnect_send
= irpc_bh_disconnect_send
,
1351 .disconnect_recv
= irpc_bh_disconnect_recv
,
1353 .ref_alloc
= irpc_bh_ref_alloc
,
1356 /* initialise a irpc binding handle */
1357 struct dcerpc_binding_handle
*irpc_binding_handle(TALLOC_CTX
*mem_ctx
,
1358 struct imessaging_context
*msg_ctx
,
1359 struct server_id server_id
,
1360 const struct ndr_interface_table
*table
)
1362 struct dcerpc_binding_handle
*h
;
1363 struct irpc_bh_state
*hs
;
1365 h
= dcerpc_binding_handle_create(mem_ctx
,
1370 struct irpc_bh_state
,
1375 hs
->msg_ctx
= msg_ctx
;
1376 hs
->server_id
= server_id
;
1378 hs
->timeout
= IRPC_CALL_TIMEOUT
;
1383 struct dcerpc_binding_handle
*irpc_binding_handle_by_name(TALLOC_CTX
*mem_ctx
,
1384 struct imessaging_context
*msg_ctx
,
1385 const char *dest_task
,
1386 const struct ndr_interface_table
*table
)
1388 struct dcerpc_binding_handle
*h
;
1389 struct server_id
*sids
;
1390 struct server_id sid
;
1392 /* find the server task */
1393 sids
= irpc_servers_byname(msg_ctx
, mem_ctx
, dest_task
);
1395 errno
= EADDRNOTAVAIL
;
1398 if (server_id_is_disconnected(&sids
[0])) {
1400 errno
= EADDRNOTAVAIL
;
1406 h
= irpc_binding_handle(mem_ctx
, msg_ctx
,
1415 void irpc_binding_handle_add_security_token(struct dcerpc_binding_handle
*h
,
1416 struct security_token
*token
)
1418 struct irpc_bh_state
*hs
=
1419 dcerpc_binding_handle_data(h
,
1420 struct irpc_bh_state
);