2 Unix SMB/CIFS implementation.
4 Samba internal messaging functions
6 Copyright (C) Andrew Tridgell 2004
8 This program is free software; you can redistribute it and/or modify
9 it under the terms of the GNU General Public License as published by
10 the Free Software Foundation; either version 3 of the License, or
11 (at your option) any later version.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 #include "lib/events/events.h"
24 #include "system/filesys.h"
25 #include "messaging/messaging.h"
26 #include "../lib/util/dlinklist.h"
27 #include "lib/socket/socket.h"
28 #include "librpc/gen_ndr/ndr_irpc.h"
29 #include "lib/messaging/irpc.h"
31 #include "../lib/util/unix_privs.h"
32 #include "librpc/rpc/dcerpc.h"
34 #include "../lib/util/util_tdb.h"
35 #include "cluster/cluster.h"
36 #include "../lib/util/tevent_ntstatus.h"
38 /* change the message version with any incompatible changes in the protocol */
39 #define MESSAGING_VERSION 1
45 struct messaging_context
*msg_ctx
;
48 void (*handler
)(struct irpc_request
*irpc
, struct irpc_message
*m
);
53 struct messaging_context
{
54 struct server_id server_id
;
55 struct socket_context
*sock
;
56 const char *base_path
;
58 struct dispatch_fn
**dispatch
;
60 struct idr_context
*dispatch_tree
;
61 struct messaging_rec
*pending
;
62 struct messaging_rec
*retry_queue
;
63 struct irpc_list
*irpc
;
64 struct idr_context
*idr
;
66 struct timeval start_time
;
67 struct tevent_timer
*retry_te
;
69 struct tevent_context
*ev
;
70 struct tevent_fd
*fde
;
74 /* we have a linked list of dispatch handlers for each msg_type that
75 this messaging server can deal with */
77 struct dispatch_fn
*next
, *prev
;
83 /* an individual message */
84 struct messaging_rec
{
85 struct messaging_rec
*next
, *prev
;
86 struct messaging_context
*msg
;
89 struct messaging_header
{
92 struct server_id from
;
102 static void irpc_handler(struct messaging_context
*, void *,
103 uint32_t, struct server_id
, DATA_BLOB
*);
107 A useful function for testing the message system.
109 static void ping_message(struct messaging_context
*msg
, void *private_data
,
110 uint32_t msg_type
, struct server_id src
, DATA_BLOB
*data
)
112 DEBUG(1,("INFO: Received PING message from server %u.%u [%.*s]\n",
113 (unsigned int)src
.node
, (unsigned int)src
.id
, (int)data
->length
,
114 data
->data
?(const char *)data
->data
:""));
115 messaging_send(msg
, src
, MSG_PONG
, data
);
119 return uptime of messaging server via irpc
121 static NTSTATUS
irpc_uptime(struct irpc_message
*msg
,
122 struct irpc_uptime
*r
)
124 struct messaging_context
*ctx
= talloc_get_type(msg
->private_data
, struct messaging_context
);
125 *r
->out
.start_time
= timeval_to_nttime(&ctx
->start_time
);
130 return the path to a messaging socket
132 static char *messaging_path(struct messaging_context
*msg
, struct server_id server_id
)
134 TALLOC_CTX
*tmp_ctx
= talloc_new(msg
);
135 const char *id
= cluster_id_string(tmp_ctx
, server_id
);
140 s
= talloc_asprintf(msg
, "%s/msg.%s", msg
->base_path
, id
);
141 talloc_steal(s
, tmp_ctx
);
146 dispatch a fully received message
148 note that this deliberately can match more than one message handler
149 per message. That allows a single messaging context to register
150 (for example) a debug handler for more than one piece of code
152 static void messaging_dispatch(struct messaging_context
*msg
, struct messaging_rec
*rec
)
154 struct dispatch_fn
*d
, *next
;
156 /* temporary IDs use an idtree, the rest use an array of pointers */
157 if (rec
->header
->msg_type
>= MSG_TMP_BASE
) {
158 d
= (struct dispatch_fn
*)idr_find(msg
->dispatch_tree
,
159 rec
->header
->msg_type
);
160 } else if (rec
->header
->msg_type
< msg
->num_types
) {
161 d
= msg
->dispatch
[rec
->header
->msg_type
];
166 for (; d
; d
= next
) {
169 data
.data
= rec
->packet
.data
+ sizeof(*rec
->header
);
170 data
.length
= rec
->header
->length
;
171 d
->fn(msg
, d
->private_data
, d
->msg_type
, rec
->header
->from
, &data
);
173 rec
->header
->length
= 0;
177 handler for messages that arrive from other nodes in the cluster
179 static void cluster_message_handler(struct messaging_context
*msg
, DATA_BLOB packet
)
181 struct messaging_rec
*rec
;
183 rec
= talloc(msg
, struct messaging_rec
);
185 smb_panic("Unable to allocate messaging_rec");
189 rec
->path
= msg
->path
;
190 rec
->header
= (struct messaging_header
*)packet
.data
;
191 rec
->packet
= packet
;
194 if (packet
.length
!= sizeof(*rec
->header
) + rec
->header
->length
) {
195 DEBUG(0,("messaging: bad message header size %d should be %d\n",
196 rec
->header
->length
, (int)(packet
.length
- sizeof(*rec
->header
))));
201 messaging_dispatch(msg
, rec
);
208 try to send the message
210 static NTSTATUS
try_send(struct messaging_rec
*rec
)
212 struct messaging_context
*msg
= rec
->msg
;
216 struct socket_address
*path
;
218 /* rec->path is the path of the *other* socket, where we want
220 path
= socket_address_from_strings(msg
, msg
->sock
->backend_name
,
223 return NT_STATUS_NO_MEMORY
;
226 /* we send with privileges so messages work from any context */
227 priv
= root_privileges();
228 status
= socket_sendto(msg
->sock
, &rec
->packet
, &nsent
, path
);
236 retry backed off messages
238 static void msg_retry_timer(struct tevent_context
*ev
, struct tevent_timer
*te
,
239 struct timeval t
, void *private_data
)
241 struct messaging_context
*msg
= talloc_get_type(private_data
,
242 struct messaging_context
);
243 msg
->retry_te
= NULL
;
245 /* put the messages back on the main queue */
246 while (msg
->retry_queue
) {
247 struct messaging_rec
*rec
= msg
->retry_queue
;
248 DLIST_REMOVE(msg
->retry_queue
, rec
);
249 DLIST_ADD_END(msg
->pending
, rec
, struct messaging_rec
*);
252 EVENT_FD_WRITEABLE(msg
->event
.fde
);
256 handle a socket write event
258 static void messaging_send_handler(struct messaging_context
*msg
)
260 while (msg
->pending
) {
261 struct messaging_rec
*rec
= msg
->pending
;
263 status
= try_send(rec
);
264 if (NT_STATUS_EQUAL(status
, STATUS_MORE_ENTRIES
)) {
266 if (rec
->retries
> 3) {
267 /* we're getting continuous write errors -
268 backoff this record */
269 DLIST_REMOVE(msg
->pending
, rec
);
270 DLIST_ADD_END(msg
->retry_queue
, rec
,
271 struct messaging_rec
*);
272 if (msg
->retry_te
== NULL
) {
274 event_add_timed(msg
->event
.ev
, msg
,
275 timeval_current_ofs(1, 0),
276 msg_retry_timer
, msg
);
282 if (!NT_STATUS_IS_OK(status
)) {
283 TALLOC_CTX
*tmp_ctx
= talloc_new(msg
);
284 DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n",
285 cluster_id_string(tmp_ctx
, rec
->header
->from
),
286 cluster_id_string(tmp_ctx
, rec
->header
->to
),
287 rec
->header
->msg_type
,
289 talloc_free(tmp_ctx
);
291 DLIST_REMOVE(msg
->pending
, rec
);
294 if (msg
->pending
== NULL
) {
295 EVENT_FD_NOT_WRITEABLE(msg
->event
.fde
);
300 handle a new incoming packet
302 static void messaging_recv_handler(struct messaging_context
*msg
)
304 struct messaging_rec
*rec
;
309 /* see how many bytes are in the next packet */
310 status
= socket_pending(msg
->sock
, &msize
);
311 if (!NT_STATUS_IS_OK(status
)) {
312 DEBUG(0,("socket_pending failed in messaging - %s\n",
317 packet
= data_blob_talloc(msg
, NULL
, msize
);
318 if (packet
.data
== NULL
) {
319 /* assume this is temporary and retry */
323 status
= socket_recv(msg
->sock
, packet
.data
, msize
, &msize
);
324 if (!NT_STATUS_IS_OK(status
)) {
325 data_blob_free(&packet
);
329 if (msize
< sizeof(*rec
->header
)) {
330 DEBUG(0,("messaging: bad message of size %d\n", (int)msize
));
331 data_blob_free(&packet
);
335 rec
= talloc(msg
, struct messaging_rec
);
337 smb_panic("Unable to allocate messaging_rec");
340 talloc_steal(rec
, packet
.data
);
342 rec
->path
= msg
->path
;
343 rec
->header
= (struct messaging_header
*)packet
.data
;
344 rec
->packet
= packet
;
347 if (msize
!= sizeof(*rec
->header
) + rec
->header
->length
) {
348 DEBUG(0,("messaging: bad message header size %d should be %d\n",
349 rec
->header
->length
, (int)(msize
- sizeof(*rec
->header
))));
354 messaging_dispatch(msg
, rec
);
360 handle a socket event
362 static void messaging_handler(struct tevent_context
*ev
, struct tevent_fd
*fde
,
363 uint16_t flags
, void *private_data
)
365 struct messaging_context
*msg
= talloc_get_type(private_data
,
366 struct messaging_context
);
367 if (flags
& EVENT_FD_WRITE
) {
368 messaging_send_handler(msg
);
370 if (flags
& EVENT_FD_READ
) {
371 messaging_recv_handler(msg
);
377 Register a dispatch function for a particular message type.
379 NTSTATUS
messaging_register(struct messaging_context
*msg
, void *private_data
,
380 uint32_t msg_type
, msg_callback_t fn
)
382 struct dispatch_fn
*d
;
384 /* possibly expand dispatch array */
385 if (msg_type
>= msg
->num_types
) {
386 struct dispatch_fn
**dp
;
388 dp
= talloc_realloc(msg
, msg
->dispatch
, struct dispatch_fn
*, msg_type
+1);
389 NT_STATUS_HAVE_NO_MEMORY(dp
);
391 for (i
=msg
->num_types
;i
<=msg_type
;i
++) {
392 msg
->dispatch
[i
] = NULL
;
394 msg
->num_types
= msg_type
+1;
397 d
= talloc_zero(msg
->dispatch
, struct dispatch_fn
);
398 NT_STATUS_HAVE_NO_MEMORY(d
);
399 d
->msg_type
= msg_type
;
400 d
->private_data
= private_data
;
403 DLIST_ADD(msg
->dispatch
[msg_type
], d
);
409 register a temporary message handler. The msg_type is allocated
412 NTSTATUS
messaging_register_tmp(struct messaging_context
*msg
, void *private_data
,
413 msg_callback_t fn
, uint32_t *msg_type
)
415 struct dispatch_fn
*d
;
418 d
= talloc_zero(msg
->dispatch
, struct dispatch_fn
);
419 NT_STATUS_HAVE_NO_MEMORY(d
);
420 d
->private_data
= private_data
;
423 id
= idr_get_new_above(msg
->dispatch_tree
, d
, MSG_TMP_BASE
, UINT16_MAX
);
426 return NT_STATUS_TOO_MANY_CONTEXT_IDS
;
429 d
->msg_type
= (uint32_t)id
;
430 (*msg_type
) = d
->msg_type
;
436 De-register the function for a particular message type.
438 void messaging_deregister(struct messaging_context
*msg
, uint32_t msg_type
, void *private_data
)
440 struct dispatch_fn
*d
, *next
;
442 if (msg_type
>= msg
->num_types
) {
443 d
= (struct dispatch_fn
*)idr_find(msg
->dispatch_tree
,
446 idr_remove(msg
->dispatch_tree
, msg_type
);
451 for (d
= msg
->dispatch
[msg_type
]; d
; d
= next
) {
453 if (d
->private_data
== private_data
) {
454 DLIST_REMOVE(msg
->dispatch
[msg_type
], d
);
461 Send a message to a particular server
463 NTSTATUS
messaging_send(struct messaging_context
*msg
, struct server_id server
,
464 uint32_t msg_type
, const DATA_BLOB
*data
)
466 struct messaging_rec
*rec
;
468 size_t dlength
= data
?data
->length
:0;
470 rec
= talloc(msg
, struct messaging_rec
);
472 return NT_STATUS_NO_MEMORY
;
475 rec
->packet
= data_blob_talloc(rec
, NULL
, sizeof(*rec
->header
) + dlength
);
476 if (rec
->packet
.data
== NULL
) {
478 return NT_STATUS_NO_MEMORY
;
483 rec
->header
= (struct messaging_header
*)rec
->packet
.data
;
485 ZERO_STRUCTP(rec
->header
);
486 rec
->header
->version
= MESSAGING_VERSION
;
487 rec
->header
->msg_type
= msg_type
;
488 rec
->header
->from
= msg
->server_id
;
489 rec
->header
->to
= server
;
490 rec
->header
->length
= dlength
;
492 memcpy(rec
->packet
.data
+ sizeof(*rec
->header
),
493 data
->data
, dlength
);
496 if (!cluster_node_equal(&msg
->server_id
, &server
)) {
497 /* the destination is on another node - dispatch via
499 status
= cluster_message_send(server
, &rec
->packet
);
504 rec
->path
= messaging_path(msg
, server
);
505 talloc_steal(rec
, rec
->path
);
507 if (msg
->pending
!= NULL
) {
508 status
= STATUS_MORE_ENTRIES
;
510 status
= try_send(rec
);
513 if (NT_STATUS_EQUAL(status
, STATUS_MORE_ENTRIES
)) {
514 if (msg
->pending
== NULL
) {
515 EVENT_FD_WRITEABLE(msg
->event
.fde
);
517 DLIST_ADD_END(msg
->pending
, rec
, struct messaging_rec
*);
527 Send a message to a particular server, with the message containing a single pointer
529 NTSTATUS
messaging_send_ptr(struct messaging_context
*msg
, struct server_id server
,
530 uint32_t msg_type
, void *ptr
)
534 blob
.data
= (uint8_t *)&ptr
;
535 blob
.length
= sizeof(void *);
537 return messaging_send(msg
, server
, msg_type
, &blob
);
542 destroy the messaging context
544 static int messaging_destructor(struct messaging_context
*msg
)
547 while (msg
->names
&& msg
->names
[0]) {
548 irpc_remove_name(msg
, msg
->names
[0]);
554 create the listening socket and setup the dispatcher
556 struct messaging_context
*messaging_init(TALLOC_CTX
*mem_ctx
,
558 struct server_id server_id
,
559 struct tevent_context
*ev
)
561 struct messaging_context
*msg
;
563 struct socket_address
*path
;
569 msg
= talloc_zero(mem_ctx
, struct messaging_context
);
574 /* setup a handler for messages from other cluster nodes, if appropriate */
575 status
= cluster_message_init(msg
, server_id
, cluster_message_handler
);
576 if (!NT_STATUS_IS_OK(status
)) {
581 /* create the messaging directory if needed */
584 msg
->base_path
= talloc_reference(msg
, dir
);
585 msg
->path
= messaging_path(msg
, server_id
);
586 msg
->server_id
= server_id
;
587 msg
->idr
= idr_init(msg
);
588 msg
->dispatch_tree
= idr_init(msg
);
589 msg
->start_time
= timeval_current();
591 status
= socket_create("unix", SOCKET_TYPE_DGRAM
, &msg
->sock
, 0);
592 if (!NT_STATUS_IS_OK(status
)) {
597 /* by stealing here we ensure that the socket is cleaned up (and even
599 talloc_steal(msg
, msg
->sock
);
601 path
= socket_address_from_strings(msg
, msg
->sock
->backend_name
,
608 status
= socket_listen(msg
->sock
, path
, 50, 0);
609 if (!NT_STATUS_IS_OK(status
)) {
610 DEBUG(0,("Unable to setup messaging listener for '%s':%s\n", msg
->path
, nt_errstr(status
)));
615 /* it needs to be non blocking for sends */
616 set_blocking(socket_get_fd(msg
->sock
), false);
619 msg
->event
.fde
= event_add_fd(ev
, msg
, socket_get_fd(msg
->sock
),
620 EVENT_FD_READ
, messaging_handler
, msg
);
621 tevent_fd_set_auto_close(msg
->event
.fde
);
623 talloc_set_destructor(msg
, messaging_destructor
);
625 messaging_register(msg
, NULL
, MSG_PING
, ping_message
);
626 messaging_register(msg
, NULL
, MSG_IRPC
, irpc_handler
);
627 IRPC_REGISTER(msg
, irpc
, IRPC_UPTIME
, irpc_uptime
, msg
);
633 A hack, for the short term until we get 'client only' messaging in place
635 struct messaging_context
*messaging_client_init(TALLOC_CTX
*mem_ctx
,
637 struct tevent_context
*ev
)
641 id
.id
= random() % 0x10000000;
642 return messaging_init(mem_ctx
, dir
, id
, ev
);
645 a list of registered irpc server functions
648 struct irpc_list
*next
, *prev
;
650 const struct ndr_interface_table
*table
;
658 register an irpc server function
660 NTSTATUS
irpc_register(struct messaging_context
*msg_ctx
,
661 const struct ndr_interface_table
*table
,
662 int callnum
, irpc_function_t fn
, void *private_data
)
664 struct irpc_list
*irpc
;
666 /* override an existing handler, if any */
667 for (irpc
=msg_ctx
->irpc
; irpc
; irpc
=irpc
->next
) {
668 if (irpc
->table
== table
&& irpc
->callnum
== callnum
) {
673 irpc
= talloc(msg_ctx
, struct irpc_list
);
674 NT_STATUS_HAVE_NO_MEMORY(irpc
);
675 DLIST_ADD(msg_ctx
->irpc
, irpc
);
679 irpc
->callnum
= callnum
;
681 irpc
->private_data
= private_data
;
682 irpc
->uuid
= irpc
->table
->syntax_id
.uuid
;
689 handle an incoming irpc reply message
691 static void irpc_handler_reply(struct messaging_context
*msg_ctx
, struct irpc_message
*m
)
693 struct irpc_request
*irpc
;
695 irpc
= (struct irpc_request
*)idr_find(msg_ctx
->idr
, m
->header
.callid
);
696 if (irpc
== NULL
) return;
698 irpc
->incoming
.handler(irpc
, m
);
704 NTSTATUS
irpc_send_reply(struct irpc_message
*m
, NTSTATUS status
)
706 struct ndr_push
*push
;
708 enum ndr_err_code ndr_err
;
710 m
->header
.status
= status
;
712 /* setup the reply */
713 push
= ndr_push_init_ctx(m
->ndr
);
715 status
= NT_STATUS_NO_MEMORY
;
719 m
->header
.flags
|= IRPC_FLAG_REPLY
;
720 m
->header
.creds
.token
= NULL
;
722 /* construct the packet */
723 ndr_err
= ndr_push_irpc_header(push
, NDR_SCALARS
|NDR_BUFFERS
, &m
->header
);
724 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) {
725 status
= ndr_map_error2ntstatus(ndr_err
);
729 ndr_err
= m
->irpc
->table
->calls
[m
->irpc
->callnum
].ndr_push(push
, NDR_OUT
, m
->data
);
730 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) {
731 status
= ndr_map_error2ntstatus(ndr_err
);
735 /* send the reply message */
736 packet
= ndr_push_blob(push
);
737 status
= messaging_send(m
->msg_ctx
, m
->from
, MSG_IRPC
, &packet
);
738 if (!NT_STATUS_IS_OK(status
)) goto failed
;
746 handle an incoming irpc request message
748 static void irpc_handler_request(struct messaging_context
*msg_ctx
,
749 struct irpc_message
*m
)
753 enum ndr_err_code ndr_err
;
755 for (i
=msg_ctx
->irpc
; i
; i
=i
->next
) {
756 if (GUID_equal(&i
->uuid
, &m
->header
.uuid
) &&
757 i
->table
->syntax_id
.if_version
== m
->header
.if_version
&&
758 i
->callnum
== m
->header
.callnum
) {
764 /* no registered handler for this message */
769 /* allocate space for the structure */
770 r
= talloc_zero_size(m
->ndr
, i
->table
->calls
[m
->header
.callnum
].struct_size
);
771 if (r
== NULL
) goto failed
;
773 m
->ndr
->flags
|= LIBNDR_FLAG_REF_ALLOC
;
775 /* parse the request data */
776 ndr_err
= i
->table
->calls
[i
->callnum
].ndr_pull(m
->ndr
, NDR_IN
, r
);
777 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) goto failed
;
780 m
->private_data
= i
->private_data
;
781 m
->defer_reply
= false;
783 m
->msg_ctx
= msg_ctx
;
786 m
->ev
= msg_ctx
->event
.ev
;
788 m
->header
.status
= i
->fn(m
, r
);
791 /* the server function won't ever be replying to this request */
796 if (m
->defer_reply
) {
797 /* the server function has asked to defer the reply to later */
798 talloc_steal(msg_ctx
, m
);
802 irpc_send_reply(m
, m
->header
.status
);
810 handle an incoming irpc message
812 static void irpc_handler(struct messaging_context
*msg_ctx
, void *private_data
,
813 uint32_t msg_type
, struct server_id src
, DATA_BLOB
*packet
)
815 struct irpc_message
*m
;
816 enum ndr_err_code ndr_err
;
818 m
= talloc(msg_ctx
, struct irpc_message
);
819 if (m
== NULL
) goto failed
;
823 m
->ndr
= ndr_pull_init_blob(packet
, m
);
824 if (m
->ndr
== NULL
) goto failed
;
826 m
->ndr
->flags
|= LIBNDR_FLAG_REF_ALLOC
;
828 ndr_err
= ndr_pull_irpc_header(m
->ndr
, NDR_BUFFERS
|NDR_SCALARS
, &m
->header
);
829 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) goto failed
;
831 if (m
->header
.flags
& IRPC_FLAG_REPLY
) {
832 irpc_handler_reply(msg_ctx
, m
);
834 irpc_handler_request(msg_ctx
, m
);
844 destroy an irpc request
846 static int irpc_destructor(struct irpc_request
*irpc
)
848 if (irpc
->callid
!= -1) {
849 idr_remove(irpc
->msg_ctx
->idr
, irpc
->callid
);
857 open the naming database
859 static struct tdb_wrap
*irpc_namedb_open(struct messaging_context
*msg_ctx
)
862 char *path
= talloc_asprintf(msg_ctx
, "%s/names.tdb", msg_ctx
->base_path
);
866 t
= tdb_wrap_open(msg_ctx
, path
, 0, 0, O_RDWR
|O_CREAT
, 0660);
873 add a string name that this irpc server can be called on
875 NTSTATUS
irpc_add_name(struct messaging_context
*msg_ctx
, const char *name
)
880 NTSTATUS status
= NT_STATUS_OK
;
882 t
= irpc_namedb_open(msg_ctx
);
883 NT_STATUS_HAVE_NO_MEMORY(t
);
885 if (tdb_lock_bystring(t
->tdb
, name
) != 0) {
887 return NT_STATUS_LOCK_NOT_GRANTED
;
889 rec
= tdb_fetch_bystring(t
->tdb
, name
);
890 count
= rec
.dsize
/ sizeof(struct server_id
);
891 rec
.dptr
= (unsigned char *)realloc_p(rec
.dptr
, struct server_id
, count
+1);
892 rec
.dsize
+= sizeof(struct server_id
);
893 if (rec
.dptr
== NULL
) {
894 tdb_unlock_bystring(t
->tdb
, name
);
896 return NT_STATUS_NO_MEMORY
;
898 ((struct server_id
*)rec
.dptr
)[count
] = msg_ctx
->server_id
;
899 if (tdb_store_bystring(t
->tdb
, name
, rec
, 0) != 0) {
900 status
= NT_STATUS_INTERNAL_ERROR
;
903 tdb_unlock_bystring(t
->tdb
, name
);
906 msg_ctx
->names
= str_list_add(msg_ctx
->names
, name
);
907 talloc_steal(msg_ctx
, msg_ctx
->names
);
913 return a list of server ids for a server name
915 struct server_id
*irpc_servers_byname(struct messaging_context
*msg_ctx
,
922 struct server_id
*ret
;
924 t
= irpc_namedb_open(msg_ctx
);
929 if (tdb_lock_bystring(t
->tdb
, name
) != 0) {
933 rec
= tdb_fetch_bystring(t
->tdb
, name
);
934 if (rec
.dptr
== NULL
) {
935 tdb_unlock_bystring(t
->tdb
, name
);
939 count
= rec
.dsize
/ sizeof(struct server_id
);
940 ret
= talloc_array(mem_ctx
, struct server_id
, count
+1);
942 tdb_unlock_bystring(t
->tdb
, name
);
946 for (i
=0;i
<count
;i
++) {
947 ret
[i
] = ((struct server_id
*)rec
.dptr
)[i
];
949 ret
[i
] = cluster_id(0, 0);
951 tdb_unlock_bystring(t
->tdb
, name
);
958 remove a name from a messaging context
960 void irpc_remove_name(struct messaging_context
*msg_ctx
, const char *name
)
965 struct server_id
*ids
;
967 str_list_remove(msg_ctx
->names
, name
);
969 t
= irpc_namedb_open(msg_ctx
);
974 if (tdb_lock_bystring(t
->tdb
, name
) != 0) {
978 rec
= tdb_fetch_bystring(t
->tdb
, name
);
979 if (rec
.dptr
== NULL
) {
980 tdb_unlock_bystring(t
->tdb
, name
);
984 count
= rec
.dsize
/ sizeof(struct server_id
);
987 tdb_unlock_bystring(t
->tdb
, name
);
991 ids
= (struct server_id
*)rec
.dptr
;
992 for (i
=0;i
<count
;i
++) {
993 if (cluster_id_equal(&ids
[i
], &msg_ctx
->server_id
)) {
995 memmove(ids
+i
, ids
+i
+1,
996 sizeof(struct server_id
) * (count
-(i
+1)));
998 rec
.dsize
-= sizeof(struct server_id
);
1002 tdb_store_bystring(t
->tdb
, name
, rec
, 0);
1004 tdb_unlock_bystring(t
->tdb
, name
);
1008 struct server_id
messaging_get_server_id(struct messaging_context
*msg_ctx
)
1010 return msg_ctx
->server_id
;
1013 struct irpc_bh_state
{
1014 struct messaging_context
*msg_ctx
;
1015 struct server_id server_id
;
1016 const struct ndr_interface_table
*table
;
1018 struct security_token
*token
;
1021 static bool irpc_bh_is_connected(struct dcerpc_binding_handle
*h
)
1023 struct irpc_bh_state
*hs
= dcerpc_binding_handle_data(h
,
1024 struct irpc_bh_state
);
1033 static uint32_t irpc_bh_set_timeout(struct dcerpc_binding_handle
*h
,
1036 struct irpc_bh_state
*hs
= dcerpc_binding_handle_data(h
,
1037 struct irpc_bh_state
);
1038 uint32_t old
= hs
->timeout
;
1040 hs
->timeout
= timeout
;
1045 struct irpc_bh_raw_call_state
{
1046 struct irpc_request
*irpc
;
1049 DATA_BLOB in_packet
;
1053 static void irpc_bh_raw_call_incoming_handler(struct irpc_request
*irpc
,
1054 struct irpc_message
*m
);
1056 static struct tevent_req
*irpc_bh_raw_call_send(TALLOC_CTX
*mem_ctx
,
1057 struct tevent_context
*ev
,
1058 struct dcerpc_binding_handle
*h
,
1059 const struct GUID
*object
,
1062 const uint8_t *in_data
,
1065 struct irpc_bh_state
*hs
=
1066 dcerpc_binding_handle_data(h
,
1067 struct irpc_bh_state
);
1068 struct tevent_req
*req
;
1069 struct irpc_bh_raw_call_state
*state
;
1071 struct irpc_header header
;
1072 struct ndr_push
*ndr
;
1074 enum ndr_err_code ndr_err
;
1076 req
= tevent_req_create(mem_ctx
, &state
,
1077 struct irpc_bh_raw_call_state
);
1081 state
->opnum
= opnum
;
1082 state
->in_data
.data
= discard_const_p(uint8_t, in_data
);
1083 state
->in_data
.length
= in_length
;
1085 ok
= irpc_bh_is_connected(h
);
1087 tevent_req_nterror(req
, NT_STATUS_INVALID_CONNECTION
);
1088 return tevent_req_post(req
, ev
);
1091 state
->irpc
= talloc_zero(state
, struct irpc_request
);
1092 if (tevent_req_nomem(state
->irpc
, req
)) {
1093 return tevent_req_post(req
, ev
);
1096 state
->irpc
->msg_ctx
= hs
->msg_ctx
;
1097 state
->irpc
->callid
= idr_get_new(hs
->msg_ctx
->idr
,
1098 state
->irpc
, UINT16_MAX
);
1099 if (state
->irpc
->callid
== -1) {
1100 tevent_req_nterror(req
, NT_STATUS_INSUFFICIENT_RESOURCES
);
1101 return tevent_req_post(req
, ev
);
1103 state
->irpc
->incoming
.handler
= irpc_bh_raw_call_incoming_handler
;
1104 state
->irpc
->incoming
.private_data
= req
;
1106 talloc_set_destructor(state
->irpc
, irpc_destructor
);
1108 /* setup the header */
1109 header
.uuid
= hs
->table
->syntax_id
.uuid
;
1111 header
.if_version
= hs
->table
->syntax_id
.if_version
;
1112 header
.callid
= state
->irpc
->callid
;
1113 header
.callnum
= state
->opnum
;
1115 header
.status
= NT_STATUS_OK
;
1116 header
.creds
.token
= hs
->token
;
1118 /* construct the irpc packet */
1119 ndr
= ndr_push_init_ctx(state
->irpc
);
1120 if (tevent_req_nomem(ndr
, req
)) {
1121 return tevent_req_post(req
, ev
);
1124 ndr_err
= ndr_push_irpc_header(ndr
, NDR_SCALARS
|NDR_BUFFERS
, &header
);
1125 status
= ndr_map_error2ntstatus(ndr_err
);
1126 if (!NT_STATUS_IS_OK(status
)) {
1127 tevent_req_nterror(req
, status
);
1128 return tevent_req_post(req
, ev
);
1131 ndr_err
= ndr_push_bytes(ndr
, in_data
, in_length
);
1132 status
= ndr_map_error2ntstatus(ndr_err
);
1133 if (!NT_STATUS_IS_OK(status
)) {
1134 tevent_req_nterror(req
, status
);
1135 return tevent_req_post(req
, ev
);
1139 state
->in_packet
= ndr_push_blob(ndr
);
1140 status
= messaging_send(hs
->msg_ctx
, hs
->server_id
,
1141 MSG_IRPC
, &state
->in_packet
);
1142 if (!NT_STATUS_IS_OK(status
)) {
1143 tevent_req_nterror(req
, status
);
1144 return tevent_req_post(req
, ev
);
1147 if (hs
->timeout
!= IRPC_CALL_TIMEOUT_INF
) {
1148 /* set timeout-callback in case caller wants that */
1149 ok
= tevent_req_set_endtime(req
, ev
, timeval_current_ofs(hs
->timeout
, 0));
1151 return tevent_req_post(req
, ev
);
1158 static void irpc_bh_raw_call_incoming_handler(struct irpc_request
*irpc
,
1159 struct irpc_message
*m
)
1161 struct tevent_req
*req
=
1162 talloc_get_type_abort(irpc
->incoming
.private_data
,
1164 struct irpc_bh_raw_call_state
*state
=
1165 tevent_req_data(req
,
1166 struct irpc_bh_raw_call_state
);
1168 talloc_steal(state
, m
);
1170 if (!NT_STATUS_IS_OK(m
->header
.status
)) {
1171 tevent_req_nterror(req
, m
->header
.status
);
1175 state
->out_data
= data_blob_talloc(state
,
1176 m
->ndr
->data
+ m
->ndr
->offset
,
1177 m
->ndr
->data_size
- m
->ndr
->offset
);
1178 if ((m
->ndr
->data_size
- m
->ndr
->offset
) > 0 && !state
->out_data
.data
) {
1179 tevent_req_nomem(NULL
, req
);
1183 tevent_req_done(req
);
1186 static NTSTATUS
irpc_bh_raw_call_recv(struct tevent_req
*req
,
1187 TALLOC_CTX
*mem_ctx
,
1190 uint32_t *out_flags
)
1192 struct irpc_bh_raw_call_state
*state
=
1193 tevent_req_data(req
,
1194 struct irpc_bh_raw_call_state
);
1197 if (tevent_req_is_nterror(req
, &status
)) {
1198 tevent_req_received(req
);
1202 *out_data
= talloc_move(mem_ctx
, &state
->out_data
.data
);
1203 *out_length
= state
->out_data
.length
;
1205 tevent_req_received(req
);
1206 return NT_STATUS_OK
;
1209 struct irpc_bh_disconnect_state
{
1213 static struct tevent_req
*irpc_bh_disconnect_send(TALLOC_CTX
*mem_ctx
,
1214 struct tevent_context
*ev
,
1215 struct dcerpc_binding_handle
*h
)
1217 struct irpc_bh_state
*hs
= dcerpc_binding_handle_data(h
,
1218 struct irpc_bh_state
);
1219 struct tevent_req
*req
;
1220 struct irpc_bh_disconnect_state
*state
;
1223 req
= tevent_req_create(mem_ctx
, &state
,
1224 struct irpc_bh_disconnect_state
);
1229 ok
= irpc_bh_is_connected(h
);
1231 tevent_req_nterror(req
, NT_STATUS_INVALID_CONNECTION
);
1232 return tevent_req_post(req
, ev
);
1237 tevent_req_done(req
);
1238 return tevent_req_post(req
, ev
);
1241 static NTSTATUS
irpc_bh_disconnect_recv(struct tevent_req
*req
)
1245 if (tevent_req_is_nterror(req
, &status
)) {
1246 tevent_req_received(req
);
1250 tevent_req_received(req
);
1251 return NT_STATUS_OK
;
1254 static bool irpc_bh_ref_alloc(struct dcerpc_binding_handle
*h
)
1259 static const struct dcerpc_binding_handle_ops irpc_bh_ops
= {
1261 .is_connected
= irpc_bh_is_connected
,
1262 .set_timeout
= irpc_bh_set_timeout
,
1263 .raw_call_send
= irpc_bh_raw_call_send
,
1264 .raw_call_recv
= irpc_bh_raw_call_recv
,
1265 .disconnect_send
= irpc_bh_disconnect_send
,
1266 .disconnect_recv
= irpc_bh_disconnect_recv
,
1268 .ref_alloc
= irpc_bh_ref_alloc
,
1271 /* initialise an irpc binding handle */
1272 struct dcerpc_binding_handle
*irpc_binding_handle(TALLOC_CTX
*mem_ctx
,
1273 struct messaging_context
*msg_ctx
,
1274 struct server_id server_id
,
1275 const struct ndr_interface_table
*table
)
1277 struct dcerpc_binding_handle
*h
;
1278 struct irpc_bh_state
*hs
;
1280 h
= dcerpc_binding_handle_create(mem_ctx
,
1285 struct irpc_bh_state
,
1290 hs
->msg_ctx
= msg_ctx
;
1291 hs
->server_id
= server_id
;
1293 hs
->timeout
= IRPC_CALL_TIMEOUT
;
1295 dcerpc_binding_handle_set_sync_ev(h
, msg_ctx
->event
.ev
);
1300 struct dcerpc_binding_handle
*irpc_binding_handle_by_name(TALLOC_CTX
*mem_ctx
,
1301 struct messaging_context
*msg_ctx
,
1302 const char *dest_task
,
1303 const struct ndr_interface_table
*table
)
1305 struct dcerpc_binding_handle
*h
;
1306 struct server_id
*sids
;
1307 struct server_id sid
;
1309 /* find the server task */
1310 sids
= irpc_servers_byname(msg_ctx
, mem_ctx
, dest_task
);
1312 errno
= EADDRNOTAVAIL
;
1315 if (sids
[0].id
== 0) {
1317 errno
= EADDRNOTAVAIL
;
1323 h
= irpc_binding_handle(mem_ctx
, msg_ctx
,
1332 void irpc_binding_handle_add_security_token(struct dcerpc_binding_handle
*h
,
1333 struct security_token
*token
)
1335 struct irpc_bh_state
*hs
=
1336 dcerpc_binding_handle_data(h
,
1337 struct irpc_bh_state
);