messaging3: I don't see 2 versions running concurrently...
[Samba.git] / source3 / lib / messages_dgm.c
blobd626b74715c25e1503baf569ab81abe290f05bcd
1 /*
2 * Unix SMB/CIFS implementation.
3 * Samba internal messaging functions
4 * Copyright (C) 2013 by Volker Lendecke
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #include "lib/util/data_blob.h"
22 #include "lib/util/debug.h"
23 #include "lib/unix_msg/unix_msg.h"
24 #include "system/filesys.h"
25 #include "messages.h"
26 #include "lib/param/param.h"
27 #include "poll_funcs/poll_funcs_tevent.h"
28 #include "unix_msg/unix_msg.h"
30 struct messaging_dgm_context {
31 struct server_id pid;
32 struct poll_funcs *msg_callbacks;
33 void *tevent_handle;
34 struct unix_msg_ctx *dgm_ctx;
35 char *cache_dir;
36 int lockfile_fd;
38 void (*recv_cb)(int msg_type,
39 struct server_id src, struct server_id dst,
40 const uint8_t *msg, size_t msg_len,
41 void *private_data);
42 void *recv_cb_private_data;
45 struct messaging_dgm_hdr {
46 int msg_type;
47 struct server_id dst;
48 struct server_id src;
/* Receive callback handed to unix_msg_init(); defined further down. */
static void messaging_dgm_recv(struct unix_msg_ctx *ctx,
			       uint8_t *msg,
			       size_t msg_len,
			       void *private_data);

/* talloc destructor installed on the context by messaging_dgm_init(). */
static int messaging_dgm_context_destructor(struct messaging_dgm_context *c);
57 static int messaging_dgm_lockfile_create(TALLOC_CTX *tmp_ctx,
58 const char *cache_dir,
59 uid_t dir_owner, pid_t pid,
60 int *plockfile_fd, uint64_t unique)
62 fstring buf;
63 char *dir;
64 char *lockfile_name;
65 int lockfile_fd;
66 struct flock lck = {};
67 int unique_len, ret;
68 ssize_t written;
69 bool ok;
71 dir = talloc_asprintf(tmp_ctx, "%s/lck", cache_dir);
72 if (dir == NULL) {
73 return ENOMEM;
76 ok = directory_create_or_exist_strict(dir, dir_owner, 0755);
77 if (!ok) {
78 ret = errno;
79 DEBUG(1, ("%s: Could not create lock directory: %s\n",
80 __func__, strerror(ret)));
81 TALLOC_FREE(dir);
82 return ret;
85 lockfile_name = talloc_asprintf(tmp_ctx, "%s/%u", dir,
86 (unsigned)pid);
87 TALLOC_FREE(dir);
88 if (lockfile_name == NULL) {
89 DEBUG(1, ("%s: talloc_asprintf failed\n", __func__));
90 return ENOMEM;
93 /* no O_EXCL, existence check is via the fcntl lock */
95 lockfile_fd = open(lockfile_name, O_NONBLOCK|O_CREAT|O_WRONLY, 0644);
96 if (lockfile_fd == -1) {
97 ret = errno;
98 DEBUG(1, ("%s: open failed: %s\n", __func__, strerror(errno)));
99 goto fail_free;
102 lck.l_type = F_WRLCK;
103 lck.l_whence = SEEK_SET;
104 lck.l_start = 0;
105 lck.l_len = 0;
107 ret = fcntl(lockfile_fd, F_SETLK, &lck);
108 if (ret == -1) {
109 ret = errno;
110 DEBUG(1, ("%s: fcntl failed: %s\n", __func__, strerror(ret)));
111 goto fail_close;
114 unique_len = snprintf(buf, sizeof(buf), "%"PRIu64, unique);
116 /* shorten a potentially preexisting file */
118 ret = ftruncate(lockfile_fd, unique_len);
119 if (ret == -1) {
120 ret = errno;
121 DEBUG(1, ("%s: ftruncate failed: %s\n", __func__,
122 strerror(ret)));
123 goto fail_unlink;
126 written = write(lockfile_fd, buf, unique_len);
127 if (written != unique_len) {
128 ret = errno;
129 DEBUG(1, ("%s: write failed: %s\n", __func__, strerror(ret)));
130 goto fail_unlink;
133 TALLOC_FREE(lockfile_name);
134 *plockfile_fd = lockfile_fd;
135 return 0;
137 fail_unlink:
138 unlink(lockfile_name);
139 fail_close:
140 close(lockfile_fd);
141 fail_free:
142 TALLOC_FREE(lockfile_name);
143 return ret;
146 static int messaging_dgm_lockfile_remove(TALLOC_CTX *tmp_ctx,
147 const char *cache_dir, pid_t pid)
149 char *lockfile_name;
150 int ret;
152 lockfile_name = talloc_asprintf(
153 tmp_ctx, "%s/lck/%u", cache_dir, (unsigned)pid);
154 if (lockfile_name == NULL) {
155 return ENOMEM;
158 ret = unlink(lockfile_name);
159 if (ret == -1) {
160 ret = errno;
161 DEBUG(10, ("%s: unlink(%s) failed: %s\n", __func__,
162 lockfile_name, strerror(ret)));
165 TALLOC_FREE(lockfile_name);
166 return ret;
169 int messaging_dgm_init(TALLOC_CTX *mem_ctx,
170 struct tevent_context *ev,
171 struct server_id pid,
172 const char *cache_dir,
173 uid_t dir_owner,
174 void (*recv_cb)(int msg_type,
175 struct server_id src,
176 struct server_id dst,
177 const uint8_t *msg,
178 size_t msg_len,
179 void *private_data),
180 void *recv_cb_private_data,
181 struct messaging_dgm_context **pctx)
183 struct messaging_dgm_context *ctx;
184 int ret;
185 bool ok;
186 char *socket_dir;
187 struct sockaddr_un socket_address;
188 size_t sockname_len;
189 uint64_t cookie;
191 ctx = talloc_zero(mem_ctx, struct messaging_dgm_context);
192 if (ctx == NULL) {
193 goto fail_nomem;
195 ctx->pid = pid;
196 ctx->recv_cb = recv_cb;
197 ctx->recv_cb_private_data = recv_cb_private_data;
199 ctx->cache_dir = talloc_strdup(ctx, cache_dir);
200 if (ctx->cache_dir == NULL) {
201 goto fail_nomem;
203 socket_dir = talloc_asprintf(ctx, "%s/msg", cache_dir);
204 if (socket_dir == NULL) {
205 goto fail_nomem;
208 socket_address = (struct sockaddr_un) { .sun_family = AF_UNIX };
209 sockname_len = snprintf(socket_address.sun_path,
210 sizeof(socket_address.sun_path),
211 "%s/%u", socket_dir, (unsigned)pid.pid);
212 if (sockname_len >= sizeof(socket_address.sun_path)) {
213 TALLOC_FREE(ctx);
214 return ENAMETOOLONG;
217 ret = messaging_dgm_lockfile_create(ctx, cache_dir, dir_owner, pid.pid,
218 &ctx->lockfile_fd, pid.unique_id);
219 if (ret != 0) {
220 DEBUG(1, ("%s: messaging_dgm_create_lockfile failed: %s\n",
221 __func__, strerror(ret)));
222 TALLOC_FREE(ctx);
223 return ret;
226 ctx->msg_callbacks = poll_funcs_init_tevent(ctx);
227 if (ctx->msg_callbacks == NULL) {
228 goto fail_nomem;
231 ctx->tevent_handle = poll_funcs_tevent_register(
232 ctx, ctx->msg_callbacks, ev);
233 if (ctx->tevent_handle == NULL) {
234 goto fail_nomem;
237 ok = directory_create_or_exist_strict(socket_dir, dir_owner, 0700);
238 if (!ok) {
239 DEBUG(1, ("Could not create socket directory\n"));
240 TALLOC_FREE(ctx);
241 return EACCES;
243 TALLOC_FREE(socket_dir);
245 unlink(socket_address.sun_path);
247 generate_random_buffer((uint8_t *)&cookie, sizeof(cookie));
249 ret = unix_msg_init(&socket_address, ctx->msg_callbacks, 1024, cookie,
250 messaging_dgm_recv, ctx, &ctx->dgm_ctx);
251 if (ret != 0) {
252 DEBUG(1, ("unix_msg_init failed: %s\n", strerror(ret)));
253 TALLOC_FREE(ctx);
254 return ret;
256 talloc_set_destructor(ctx, messaging_dgm_context_destructor);
258 *pctx = ctx;
259 return 0;
261 fail_nomem:
262 TALLOC_FREE(ctx);
263 return ENOMEM;
266 static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
268 struct server_id pid = c->pid;
271 * First delete the socket to avoid races. The lockfile is the
272 * indicator that we're still around.
274 unix_msg_free(c->dgm_ctx);
276 if (getpid() == pid.pid) {
277 (void)messaging_dgm_lockfile_remove(c, c->cache_dir, pid.pid);
279 close(c->lockfile_fd);
280 return 0;
283 int messaging_dgm_send(struct messaging_dgm_context *ctx,
284 struct server_id src, struct server_id pid,
285 int msg_type, const struct iovec *iov, int iovlen)
287 struct messaging_dgm_hdr hdr;
288 struct iovec iov2[iovlen + 1];
289 struct server_id_buf idbuf;
290 struct sockaddr_un dst;
291 ssize_t dst_pathlen;
292 int ret;
294 dst = (struct sockaddr_un) { .sun_family = AF_UNIX };
296 dst_pathlen = snprintf(dst.sun_path, sizeof(dst.sun_path),
297 "%s/msg/%u", ctx->cache_dir, (unsigned)pid.pid);
298 if (dst_pathlen >= sizeof(dst.sun_path)) {
299 return ENAMETOOLONG;
302 hdr.msg_type = msg_type & MSG_TYPE_MASK;
303 hdr.dst = pid;
304 hdr.src = src;
306 DEBUG(10, ("%s: Sending message 0x%x to %s\n", __func__,
307 (unsigned)hdr.msg_type,
308 server_id_str_buf(pid, &idbuf)));
310 iov2[0].iov_base = &hdr;
311 iov2[0].iov_len = sizeof(hdr);
312 memcpy(iov2+1, iov, iovlen*sizeof(struct iovec));
314 ret = unix_msg_send(ctx->dgm_ctx, &dst, iov2, iovlen + 1);
316 return ret;
319 static void messaging_dgm_recv(struct unix_msg_ctx *ctx,
320 uint8_t *msg, size_t msg_len,
321 void *private_data)
323 struct messaging_dgm_context *dgm_ctx = talloc_get_type_abort(
324 private_data, struct messaging_dgm_context);
325 struct messaging_dgm_hdr *hdr;
326 struct server_id_buf idbuf;
328 if (msg_len < sizeof(*hdr)) {
329 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
330 return;
334 * unix_msg guarantees alignment, so we can cast here
336 hdr = (struct messaging_dgm_hdr *)msg;
338 DEBUG(10, ("%s: Received message 0x%x len %u from %s\n", __func__,
339 (unsigned)hdr->msg_type, (unsigned)(msg_len - sizeof(*hdr)),
340 server_id_str_buf(hdr->src, &idbuf)));
342 dgm_ctx->recv_cb(hdr->msg_type, hdr->src, hdr->dst,
343 msg + sizeof(*hdr), msg_len - sizeof(*hdr),
344 dgm_ctx->recv_cb_private_data);
347 int messaging_dgm_cleanup(struct messaging_dgm_context *ctx, pid_t pid)
349 char *lockfile_name, *socket_name;
350 int fd, ret;
351 struct flock lck = {};
353 lockfile_name = talloc_asprintf(talloc_tos(), "%s/lck/%u",
354 ctx->cache_dir, (unsigned)pid);
355 if (lockfile_name == NULL) {
356 return ENOMEM;
358 socket_name = talloc_asprintf(lockfile_name, "%s/msg/%u",
359 ctx->cache_dir, (unsigned)pid);
360 if (socket_name == NULL) {
361 TALLOC_FREE(lockfile_name);
362 return ENOMEM;
365 fd = open(lockfile_name, O_NONBLOCK|O_WRONLY, 0);
366 if (fd == -1) {
367 ret = errno;
368 DEBUG(10, ("%s: open(%s) failed: %s\n", __func__,
369 lockfile_name, strerror(ret)));
370 TALLOC_FREE(lockfile_name);
371 return ret;
374 lck.l_type = F_WRLCK;
375 lck.l_whence = SEEK_SET;
376 lck.l_start = 0;
377 lck.l_len = 0;
379 ret = fcntl(fd, F_SETLK, &lck);
380 if (ret != 0) {
381 ret = errno;
382 DEBUG(10, ("%s: Could not get lock: %s\n", __func__,
383 strerror(ret)));
384 TALLOC_FREE(lockfile_name);
385 close(fd);
386 return ret;
389 (void)unlink(socket_name);
390 (void)unlink(lockfile_name);
391 (void)close(fd);
393 TALLOC_FREE(lockfile_name);
394 return 0;
397 int messaging_dgm_wipe(struct messaging_dgm_context *ctx)
399 char *msgdir_name;
400 DIR *msgdir;
401 struct dirent *dp;
402 pid_t our_pid = getpid();
403 int ret;
406 * We scan the socket directory and not the lock directory. Otherwise
407 * we would race against messaging_dgm_lockfile_create's open(O_CREAT)
408 * and fcntl(SETLK).
411 msgdir_name = talloc_asprintf(talloc_tos(), "%s/msg", ctx->cache_dir);
412 if (msgdir_name == NULL) {
413 return ENOMEM;
416 msgdir = opendir(msgdir_name);
417 if (msgdir == NULL) {
418 ret = errno;
419 TALLOC_FREE(msgdir_name);
420 return ret;
422 TALLOC_FREE(msgdir_name);
424 while ((dp = readdir(msgdir)) != NULL) {
425 unsigned long pid;
427 pid = strtoul(dp->d_name, NULL, 10);
428 if (pid == 0) {
430 * . and .. and other malformed entries
432 continue;
434 if (pid == our_pid) {
436 * fcntl(F_GETLK) will succeed for ourselves, we hold
437 * that lock ourselves.
439 continue;
442 ret = messaging_dgm_cleanup(ctx, pid);
443 DEBUG(10, ("messaging_dgm_cleanup(%lu) returned %s\n",
444 pid, ret ? strerror(ret) : "ok"));
446 closedir(msgdir);
448 return 0;
451 void *messaging_dgm_register_tevent_context(TALLOC_CTX *mem_ctx,
452 struct messaging_dgm_context *ctx,
453 struct tevent_context *ev)
455 return poll_funcs_tevent_register(mem_ctx, ctx->msg_callbacks, ev);