s3: tests: Add new test_stream_dir_rename.sh test.
[Samba.git] / ctdb / common / tmon.c
blob04bad1f3bf48f07e6bb85787ecd5cd950d9989e0
1 /*
2 Trivial FD monitoring
4 Copyright (C) Martin Schwenke & Amitay Isaacs, DataDirect Networks 2022
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
22 #include <ctype.h>
24 #include "lib/util/blocking.h"
25 #include "lib/util/sys_rw.h"
26 #include "lib/util/tevent_unix.h"
27 #include "lib/util/util.h"
28 #include "lib/util/smb_strtox.h"
30 #include "lib/async_req/async_sock.h"
32 #include "common/tmon.h"
35 enum tmon_message_type {
36 TMON_MSG_EXIT = 1,
37 TMON_MSG_ERRNO,
38 TMON_MSG_PING,
39 TMON_MSG_ASCII,
40 TMON_MSG_CUSTOM,
43 struct tmon_pkt {
44 enum tmon_message_type type;
45 uint16_t val;
48 struct tmon_buf {
49 uint8_t data[4];
52 static void tmon_packet_push(struct tmon_pkt *pkt,
53 struct tmon_buf *buf)
55 uint16_t type_n, val_n;
57 type_n = htons(pkt->type);
58 val_n = htons(pkt->val);
59 memcpy(&buf->data[0], &type_n, 2);
60 memcpy(&buf->data[2], &val_n, 2);
63 static void tmon_packet_pull(struct tmon_buf *buf,
64 struct tmon_pkt *pkt)
66 uint16_t type_n, val_n;
68 memcpy(&type_n, &buf->data[0], 2);
69 memcpy(&val_n, &buf->data[2], 2);
71 pkt->type = ntohs(type_n);
72 pkt->val = ntohs(val_n);
75 static int tmon_packet_write(int fd, struct tmon_pkt *pkt)
77 struct tmon_buf buf;
78 ssize_t n;
80 tmon_packet_push(pkt, &buf);
82 n = sys_write(fd, &buf.data[0], sizeof(buf.data));
83 if (n == -1) {
84 return errno;
86 return 0;
89 bool tmon_set_exit(struct tmon_pkt *pkt)
91 *pkt = (struct tmon_pkt) {
92 .type = TMON_MSG_EXIT,
95 return true;
98 bool tmon_set_errno(struct tmon_pkt *pkt, int err)
100 if (err <= 0 || err > UINT16_MAX) {
101 return false;
104 *pkt = (struct tmon_pkt) {
105 .type = TMON_MSG_ERRNO,
106 .val = (uint16_t)err,
109 return true;
112 bool tmon_set_ping(struct tmon_pkt *pkt)
114 *pkt = (struct tmon_pkt) {
115 .type = TMON_MSG_PING,
118 return true;
121 bool tmon_set_ascii(struct tmon_pkt *pkt, char c)
123 if (!isascii(c)) {
124 return false;
127 *pkt = (struct tmon_pkt) {
128 .type = TMON_MSG_ASCII,
129 .val = (uint16_t)c,
132 return true;
135 bool tmon_set_custom(struct tmon_pkt *pkt, uint16_t val)
137 *pkt = (struct tmon_pkt) {
138 .type = TMON_MSG_CUSTOM,
139 .val = val,
142 return true;
145 static bool tmon_parse_exit(struct tmon_pkt *pkt)
147 if (pkt->type != TMON_MSG_EXIT) {
148 return false;
150 if (pkt->val != 0) {
151 return false;
154 return true;
157 static bool tmon_parse_errno(struct tmon_pkt *pkt, int *err)
159 if (pkt->type != TMON_MSG_ERRNO) {
160 return false;
162 *err= (int)pkt->val;
164 return true;
167 bool tmon_parse_ping(struct tmon_pkt *pkt)
169 if (pkt->type != TMON_MSG_PING) {
170 return false;
172 if (pkt->val != 0) {
173 return false;
176 return true;
179 bool tmon_parse_ascii(struct tmon_pkt *pkt, char *c)
181 if (pkt->type != TMON_MSG_ASCII) {
182 return false;
184 if (!isascii((int)pkt->val)) {
185 return false;
187 *c = (char)pkt->val;
189 return true;
192 bool tmon_parse_custom(struct tmon_pkt *pkt, uint16_t *val)
194 if (pkt->type != TMON_MSG_CUSTOM) {
195 return false;
197 *val = pkt->val;
199 return true;
202 struct tmon_state {
203 int fd;
204 int direction;
205 struct tevent_context *ev;
206 bool monitor_close;
207 unsigned long write_interval;
208 unsigned long read_timeout;
209 struct tmon_actions actions;
210 struct tevent_timer *timer;
211 void *private_data;
214 static void tmon_readable(struct tevent_req *subreq);
215 static bool tmon_set_timeout(struct tevent_req *req,
216 struct tevent_context *ev);
217 static void tmon_timedout(struct tevent_context *ev,
218 struct tevent_timer *te,
219 struct timeval now,
220 void *private_data);
221 static void tmon_write_loop(struct tevent_req *subreq);
223 struct tevent_req *tmon_send(TALLOC_CTX *mem_ctx,
224 struct tevent_context *ev,
225 int fd,
226 int direction,
227 unsigned long read_timeout,
228 unsigned long write_interval,
229 struct tmon_actions *actions,
230 void *private_data)
232 struct tevent_req *req, *subreq;
233 struct tmon_state *state;
234 bool status;
236 req = tevent_req_create(mem_ctx, &state, struct tmon_state);
237 if (req == NULL) {
238 return NULL;
241 if (actions != NULL) {
242 /* If FD isn't readable then read actions are invalid */
243 if (!(direction & TMON_FD_READ) &&
244 (actions->timeout_callback != NULL ||
245 actions->read_callback != NULL ||
246 read_timeout != 0)) {
247 tevent_req_error(req, EINVAL);
248 return tevent_req_post(req, ev);
250 /* If FD isn't writeable then write actions are invalid */
251 if (!(direction & TMON_FD_WRITE) &&
252 (actions->write_callback != NULL ||
253 write_interval != 0)) {
254 tevent_req_error(req, EINVAL);
255 return tevent_req_post(req, ev);
257 /* Can't specify write interval without a callback */
258 if (state->write_interval != 0 &&
259 state->actions.write_callback == NULL) {
260 tevent_req_error(req, EINVAL);
261 return tevent_req_post(req, ev);
265 state->fd = fd;
266 state->direction = direction;
267 state->ev = ev;
268 state->write_interval = write_interval;
269 state->read_timeout = read_timeout;
270 state->private_data = private_data;
272 if (actions != NULL) {
273 state->actions = *actions;
276 status = set_close_on_exec(fd);
277 if (!status) {
278 tevent_req_error(req, errno);
279 return tevent_req_post(req, ev);
282 if (direction & TMON_FD_READ) {
283 subreq = wait_for_read_send(state, ev, fd, true);
284 if (tevent_req_nomem(subreq, req)) {
285 return tevent_req_post(req, ev);
287 tevent_req_set_callback(subreq, tmon_readable, req);
290 if (state->read_timeout != 0) {
291 status = tmon_set_timeout(req, state->ev);
292 if (!status) {
293 tevent_req_error(req, ENOMEM);
294 return tevent_req_post(req, ev);
298 if (state->write_interval != 0) {
299 subreq = tevent_wakeup_send(
300 state,
301 state->ev,
302 tevent_timeval_current_ofs(state->write_interval, 0));
303 if (tevent_req_nomem(subreq, req)) {
304 return tevent_req_post(req, state->ev);
306 tevent_req_set_callback(subreq, tmon_write_loop, req);
309 return req;
312 static void tmon_readable(struct tevent_req *subreq)
314 struct tevent_req *req = tevent_req_callback_data(
315 subreq, struct tevent_req);
316 struct tmon_state *state = tevent_req_data( req, struct tmon_state);
317 struct tmon_buf buf;
318 struct tmon_pkt pkt;
319 ssize_t nread;
320 bool status;
321 int err;
322 int ret;
324 status = wait_for_read_recv(subreq, &ret);
325 TALLOC_FREE(subreq);
326 if (!status) {
327 if (ret == EPIPE && state->actions.close_callback != NULL) {
328 ret = state->actions.close_callback(state->private_data);
329 if (ret == TMON_STATUS_EXIT) {
330 ret = 0;
333 if (ret == 0) {
334 tevent_req_done(req);
335 } else {
336 tevent_req_error(req, ret);
338 return;
341 nread = sys_read(state->fd, buf.data, sizeof(buf.data));
342 if (nread == -1) {
343 tevent_req_error(req, errno);
344 return;
346 if (nread == 0) {
347 /* Can't happen, treat like EPIPE, above */
348 tevent_req_error(req, EPIPE);
349 return;
351 if (nread != sizeof(buf.data)) {
352 tevent_req_error(req, EPROTO);
353 return;
356 tmon_packet_pull(&buf, &pkt);
358 switch (pkt.type) {
359 case TMON_MSG_EXIT:
360 status = tmon_parse_exit(&pkt);
361 if (!status) {
362 tevent_req_error(req, EPROTO);
363 return;
365 tevent_req_done(req);
366 return;
367 case TMON_MSG_ERRNO:
368 status = tmon_parse_errno(&pkt, &err);
369 if (!status) {
370 err = EPROTO;
372 tevent_req_error(req, err);
373 return;
374 default:
375 break;
378 if (state->actions.read_callback == NULL) {
379 /* Shouldn't happen, other end should not write */
380 tevent_req_error(req, EIO);
381 return;
383 ret = state->actions.read_callback(state->private_data, &pkt);
384 if (ret == TMON_STATUS_EXIT) {
385 tevent_req_done(req);
386 return;
388 if (ret != 0) {
389 tevent_req_error(req, ret);
390 return;
393 subreq = wait_for_read_send(state, state->ev, state->fd, true);
394 if (tevent_req_nomem(subreq, req)) {
395 return;
397 tevent_req_set_callback(subreq, tmon_readable, req);
399 /* Reset read timeout */
400 if (state->read_timeout != 0) {
401 status = tmon_set_timeout(req, state->ev);
402 if (!status) {
403 tevent_req_error(req, ENOMEM);
404 return;
409 static bool tmon_set_timeout(struct tevent_req *req,
410 struct tevent_context *ev)
412 struct tmon_state *state = tevent_req_data(
413 req, struct tmon_state);
414 struct timeval endtime =
415 tevent_timeval_current_ofs(state->read_timeout, 0);
417 TALLOC_FREE(state->timer);
419 state->timer = tevent_add_timer(ev, req, endtime, tmon_timedout, req);
420 if (tevent_req_nomem(state->timer, req)) {
421 return false;
424 return true;
427 static void tmon_timedout(struct tevent_context *ev,
428 struct tevent_timer *te,
429 struct timeval now,
430 void *private_data)
432 struct tevent_req *req = talloc_get_type_abort(
433 private_data, struct tevent_req);
434 struct tmon_state *state = tevent_req_data(req, struct tmon_state);
435 int ret;
437 TALLOC_FREE(state->timer);
439 if (state->actions.timeout_callback != NULL) {
440 ret = state->actions.timeout_callback(state->private_data);
441 if (ret == TMON_STATUS_EXIT) {
442 ret = 0;
444 } else {
445 ret = ETIMEDOUT;
448 if (ret == 0) {
449 tevent_req_done(req);
450 } else {
451 tevent_req_error(req, ret);
455 static void tmon_write_loop(struct tevent_req *subreq)
457 struct tevent_req *req = tevent_req_callback_data(
458 subreq, struct tevent_req);
459 struct tmon_state *state = tevent_req_data(
460 req, struct tmon_state);
461 struct tmon_pkt pkt;
462 int ret;
463 bool status;
465 status = tevent_wakeup_recv(subreq);
466 TALLOC_FREE(subreq);
467 if (!status) {
468 /* Ignore error */
471 ret = state->actions.write_callback(state->private_data, &pkt);
472 if (ret == TMON_STATUS_EXIT) {
473 tevent_req_done(req);
474 return;
476 if (ret == TMON_STATUS_SKIP) {
477 goto done;
479 if (ret != 0) {
480 tevent_req_error(req, ret);
481 return;
484 status = tmon_write(req, &pkt);
485 if (!status) {
486 return;
489 done:
490 subreq = tevent_wakeup_send(
491 state,
492 state->ev,
493 tevent_timeval_current_ofs(state->write_interval, 0));
494 if (tevent_req_nomem(subreq, req)) {
495 return;
497 tevent_req_set_callback(subreq, tmon_write_loop, req);
500 bool tmon_write(struct tevent_req *req, struct tmon_pkt *pkt)
502 struct tmon_state *state = tevent_req_data(
503 req, struct tmon_state);
504 int ret;
506 if (state->fd == -1) {
507 return false;
510 if (!(state->direction & TMON_FD_WRITE)) {
511 tevent_req_error(req, EINVAL);
512 return false;
515 ret = tmon_packet_write(state->fd, pkt);
516 if (ret != 0) {
517 if (ret == EPIPE && state->actions.close_callback != NULL) {
518 ret = state->actions.close_callback(state->private_data);
519 if (ret == TMON_STATUS_EXIT) {
520 ret = 0;
524 if (ret == 0) {
525 tevent_req_done(req);
526 } else {
527 tevent_req_error(req, ret);
529 state->fd = -1;
530 return false;
533 return true;
536 bool tmon_recv(struct tevent_req *req, int *perr)
538 if (tevent_req_is_unix_error(req, perr)) {
539 return false;
542 return true;
545 static int ping_writer(void *private_data, struct tmon_pkt *pkt)
547 tmon_set_ping(pkt);
549 return 0;
552 static int ping_reader(void *private_data, struct tmon_pkt *pkt)
554 bool status;
556 /* Only expect pings */
557 status = tmon_parse_ping(pkt);
558 if (!status) {
559 return EPROTO;
562 return 0;
565 struct tevent_req *tmon_ping_send(TALLOC_CTX *mem_ctx,
566 struct tevent_context *ev,
567 int fd,
568 int direction,
569 unsigned long timeout,
570 unsigned long interval)
572 struct tevent_req *req;
573 struct tmon_actions actions = {
574 .write_callback = NULL,
577 if ((direction & TMON_FD_WRITE) && interval != 0) {
578 actions.write_callback = ping_writer;
580 if ((direction & TMON_FD_READ) && timeout != 0) {
581 actions.read_callback = ping_reader;
584 req = tmon_send(mem_ctx,
587 direction,
588 timeout,
589 interval,
590 &actions,
591 NULL);
592 return req;
595 bool tmon_ping_recv(struct tevent_req *req, int *perr)
597 bool status;
599 status = tmon_recv(req, perr);
601 return status;