s3-prefork: provide means to expand the pool size
[Samba/gebeck_regimport.git] / source3 / lib / server_prefork.c
blob000539da0f1600a91fa46653446782b2880c30db
1 /*
2 Unix SMB/CIFS implementation.
3 Common server globals
5 Copyright (C) Simo Sorce <idra@samba.org> 2011
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "includes.h"
22 #include "system/time.h"
23 #include "system/shmem.h"
24 #include "system/filesys.h"
25 #include "server_prefork.h"
26 #include "../lib/util/util.h"
27 #include "../lib/util/tevent_unix.h"
/* Parent-side bookkeeping for a pool of preforked worker children. */
struct prefork_pool {

	/* socket the children accept() on */
	int listen_fd;
	/* fd used for the fcntl() lock that serializes accept() */
	int lock_fd;

	/* entry point run by every preforked child */
	prefork_main_fn_t *main_fn;
	/* opaque pointer handed through to main_fn */
	void *private_data;

	/* number of slots in 'pool' */
	int pool_size;
	/* per-worker slots; mmap'ed MAP_SHARED so parent and children
	 * see each other's status updates */
	struct pf_worker_data *pool;

	int allowed_clients;
};
43 int prefork_pool_destructor(struct prefork_pool *pfp)
45 munmap(pfp->pool, pfp->pool_size * sizeof(struct pf_worker_data));
46 return 0;
49 bool prefork_create_pool(struct tevent_context *ev_ctx,
50 TALLOC_CTX *mem_ctx, int listen_fd,
51 int min_children, int max_children,
52 prefork_main_fn_t *main_fn, void *private_data,
53 struct prefork_pool **pf_pool)
55 struct prefork_pool *pfp;
56 pid_t pid;
57 time_t now = time(NULL);
58 size_t data_size;
59 int ret;
60 int i;
62 pfp = talloc(mem_ctx, struct prefork_pool);
63 if (!pfp) {
64 DEBUG(1, ("Out of memory!\n"));
65 return false;
67 pfp->listen_fd = listen_fd;
68 pfp->main_fn = main_fn;
69 pfp->private_data = private_data;
71 pfp->lock_fd = create_unlink_tmp(NULL);
72 if (pfp->lock_fd == -1) {
73 DEBUG(1, ("Failed to create prefork lock fd!\n"));
74 talloc_free(pfp);
75 return false;
78 pfp->pool_size = max_children;
79 data_size = sizeof(struct pf_worker_data) * max_children;
81 pfp->pool = mmap(NULL, data_size, PROT_READ|PROT_WRITE,
82 MAP_SHARED|MAP_ANONYMOUS, -1, 0);
83 if (pfp->pool == MAP_FAILED) {
84 DEBUG(1, ("Failed to mmap memory for prefork pool!\n"));
85 talloc_free(pfp);
86 return false;
88 talloc_set_destructor(pfp, prefork_pool_destructor);
90 for (i = 0; i < min_children; i++) {
92 pfp->pool[i].allowed_clients = 1;
93 pfp->pool[i].started = now;
95 pid = sys_fork();
96 switch (pid) {
97 case -1:
98 DEBUG(1, ("Failed to prefork child n. %d !\n", i));
99 break;
101 case 0: /* THE CHILD */
103 pfp->pool[i].status = PF_WORKER_IDLE;
104 ret = pfp->main_fn(ev_ctx, &pfp->pool[i],
105 pfp->listen_fd, pfp->lock_fd,
106 pfp->private_data);
107 exit(ret);
109 default: /* THE PARENT */
110 pfp->pool[i].pid = pid;
111 break;
115 *pf_pool = pfp;
116 return true;
/* Provide the new max children number in new_max
 * (must be larger than current max).
 * Returns: 0 if all fine
 *          ENOSPC if mremap fails to expand
 *          EINVAL if new_max is invalid
 */
int prefork_expand_pool(struct prefork_pool *pfp, int new_max)
{
	struct pf_worker_data *pool;
	size_t old_size;
	size_t new_size;

	/* only growing is supported; shrinking would orphan live slots */
	if (new_max <= pfp->pool_size) {
		return EINVAL;
	}

	old_size = sizeof(struct pf_worker_data) * pfp->pool_size;
	new_size = sizeof(struct pf_worker_data) * new_max;

	/* flags == 0 (no MREMAP_MAYMOVE): the mapping is shared with the
	 * children at a fixed address, so it must grow in place or fail;
	 * this also means pool == pfp->pool on success */
	pool = mremap(pfp->pool, old_size, new_size, 0);
	if (pool == MAP_FAILED) {
		DEBUG(3, ("Failed to mremap memory for prefork pool!\n"));
		return ENOSPC;
	}

	/* make sure the new slots start out zeroed,
	 * i.e. status == PF_WORKER_NONE */
	memset(&pool[pfp->pool_size], 0, new_size - old_size);

	pfp->pool_size = new_max;

	return 0;
}
151 int prefork_add_children(struct tevent_context *ev_ctx,
152 struct prefork_pool *pfp,
153 int num_children)
155 pid_t pid;
156 time_t now = time(NULL);
157 int ret;
158 int i, j;
160 for (i = 0, j = 0; i < pfp->pool_size && j < num_children; i++) {
162 if (pfp->pool[i].status != PF_WORKER_NONE) {
163 continue;
166 pfp->pool[i].allowed_clients = 1;
167 pfp->pool[i].started = now;
169 pid = sys_fork();
170 switch (pid) {
171 case -1:
172 DEBUG(1, ("Failed to prefork child n. %d !\n", j));
173 break;
175 case 0: /* THE CHILD */
177 pfp->pool[i].status = PF_WORKER_IDLE;
178 ret = pfp->main_fn(ev_ctx, &pfp->pool[i],
179 pfp->listen_fd, pfp->lock_fd,
180 pfp->private_data);
182 pfp->pool[i].status = PF_WORKER_EXITING;
183 exit(ret);
185 default: /* THE PARENT */
186 pfp->pool[i].pid = pid;
187 j++;
188 break;
192 DEBUG(5, ("Added %d children!\n", j));
194 return j;
/* pairs a pool slot index with the worker's start time, so slots can
 * be ranked by age without moving the pool entries themselves */
struct prefork_oldest {
	int num;
	time_t started;
};

/* qsort comparator, sorts in inverse order: later start times come
 * first, earlier (older) ones last */
static int prefork_sort_oldest(const void *ap, const void *bp)
{
	const struct prefork_oldest *lhs = (const struct prefork_oldest *)ap;
	const struct prefork_oldest *rhs = (const struct prefork_oldest *)bp;

	if (lhs->started < rhs->started) {
		return 1;
	}
	if (lhs->started > rhs->started) {
		return -1;
	}
	return 0;
}
217 int prefork_retire_children(struct prefork_pool *pfp,
218 int num_children, time_t age_limit)
220 time_t now = time(NULL);
221 struct prefork_oldest *oldest;
222 int i, j;
224 oldest = talloc_array(pfp, struct prefork_oldest, pfp->pool_size);
225 if (!oldest) {
226 return -1;
229 for (i = 0; i < pfp->pool_size; i++) {
230 oldest[i].num = i;
231 if (pfp->pool[i].status == PF_WORKER_IDLE) {
232 oldest[i].started = pfp->pool[i].started;
233 } else {
234 oldest[i].started = now;
238 qsort(oldest, pfp->pool_size,
239 sizeof(struct prefork_oldest),
240 prefork_sort_oldest);
242 for (i = 0, j = 0; i < pfp->pool_size && j < num_children; i++) {
243 if (pfp->pool[i].status == PF_WORKER_IDLE &&
244 pfp->pool[i].started <= age_limit) {
245 /* tell the child it's time to give up */
246 DEBUG(5, ("Retiring pid %d!\n", pfp->pool[i].pid));
247 pfp->pool[i].cmds = PF_SRV_MSG_EXIT;
248 kill(pfp->pool[i].pid, SIGHUP);
249 j++;
253 return j;
256 int prefork_count_active_children(struct prefork_pool *pfp, int *total)
258 int i, a, t;
260 a = 0;
261 t = 0;
262 for (i = 0; i < pfp->pool_size; i++) {
263 if (pfp->pool[i].status == PF_WORKER_NONE) {
264 continue;
267 t++;
269 if (pfp->pool[i].num_clients == 0) {
270 continue;
273 a++;
276 *total = t;
277 return a;
280 /* to be used to finally mark a children as dead, so that it's slot can
281 * be reused */
282 bool prefork_mark_pid_dead(struct prefork_pool *pfp, pid_t pid)
284 int i;
286 for (i = 0; i < pfp->pool_size; i++) {
287 if (pfp->pool[i].pid == pid) {
288 if (pfp->pool[i].status != PF_WORKER_EXITING) {
289 DEBUG(2, ("pid %d terminated abnormally!\n",
290 (int)pid));
293 /* reset all fields,
294 * this makes status = PF_WORK_NONE */
295 memset(&pfp->pool[i], 0,
296 sizeof(struct pf_worker_data));
298 return true;
302 return false;
305 void prefork_increase_allowed_clients(struct prefork_pool *pfp, int max)
307 int i;
309 for (i = 0; i < pfp->pool_size; i++) {
310 if (pfp->pool[i].status == PF_WORKER_NONE) {
311 continue;
314 if (pfp->pool[i].allowed_clients < max) {
315 pfp->pool[i].allowed_clients++;
/* Reset every slot's allowed-clients counter back to the default of 1. */
void prefork_reset_allowed_clients(struct prefork_pool *pfp)
{
	int i;

	for (i = 0; i < pfp->pool_size; i++) {
		pfp->pool[i].allowed_clients = 1;
	}
}
/* ==== Functions used by children ==== */

/* set to 1 by the SIGALRM handler below; checked by the lock
 * grab/release loops to detect a timeout */
static SIG_ATOMIC_T pf_alarm;

static void pf_alarm_cb(int signum)
{
	pf_alarm = 1;
}
340 * Parameters:
341 * pf - the worker shared data structure
342 * lock_fd - the file descriptor used for locking
343 * timeout - expressed in seconds:
344 * -1 never timeouts,
345 * 0 timeouts immediately
346 * N seconds before timing out
348 * Returns values:
349 * negative errno on fatal error
350 * 0 on success to acquire lock
351 * -1 on timeout/lock held by other
352 * -2 on server msg to terminate
353 * ERRNO on other errors
356 static int prefork_grab_lock(struct pf_worker_data *pf,
357 int lock_fd, int timeout)
359 struct flock lock;
360 int op;
361 int ret;
363 if (pf->cmds == PF_SRV_MSG_EXIT) {
364 return -2;
367 pf_alarm = 0;
369 if (timeout > 0) {
370 CatchSignal(SIGALRM, pf_alarm_cb);
371 alarm(timeout);
374 if (timeout == 0) {
375 op = F_SETLK;
376 } else {
377 op = F_SETLKW;
380 ret = 0;
381 do {
382 ZERO_STRUCT(lock);
383 lock.l_type = F_WRLCK;
384 lock.l_whence = SEEK_SET;
386 ret = fcntl(lock_fd, op, &lock);
387 if (ret == 0) break;
389 ret = errno;
391 if (pf->cmds == PF_SRV_MSG_EXIT) {
392 ret = -2;
393 goto done;
396 switch (ret) {
397 case EINTR:
398 break;
400 case EACCES:
401 case EAGAIN:
402 /* lock held by other proc */
403 ret = -1;
404 goto done;
405 default:
406 goto done;
409 if (pf_alarm == 1) {
410 /* timed out */
411 ret = -1;
412 goto done;
414 } while (timeout != 0);
416 if (ret != 0) {
417 /* We have the Lock */
418 pf->status = PF_WORKER_ACCEPTING;
421 done:
422 if (timeout > 0) {
423 alarm(0);
424 CatchSignal(SIGALRM, SIG_IGN);
427 if (ret > 0) {
428 DEBUG(1, ("Failed to get lock (%d, %s)!\n",
429 ret, strerror(ret)));
431 return ret;
435 * Parameters:
436 * pf - the worker shared data structure
437 * lock_fd - the file descriptor used for locking
438 * timeout - expressed in seconds:
439 * -1 never timeouts,
440 * 0 timeouts immediately
441 * N seconds before timing out
443 * Returns values:
444 * negative errno on fatal error
445 * 0 on success to release lock
446 * -1 on timeout
447 * ERRNO on error
450 static int prefork_release_lock(struct pf_worker_data *pf,
451 int lock_fd, int timeout)
453 struct flock lock;
454 int op;
455 int ret;
457 pf_alarm = 0;
459 if (timeout > 0) {
460 CatchSignal(SIGALRM, pf_alarm_cb);
461 alarm(timeout);
464 if (timeout == 0) {
465 op = F_SETLK;
466 } else {
467 op = F_SETLKW;
470 do {
471 ZERO_STRUCT(lock);
472 lock.l_type = F_UNLCK;
473 lock.l_whence = SEEK_SET;
475 ret = fcntl(lock_fd, op, &lock);
476 if (ret == 0) break;
478 ret = errno;
480 if (ret != EINTR) {
481 goto done;
484 if (pf_alarm == 1) {
485 /* timed out */
486 ret = -1;
487 goto done;
489 } while (timeout != 0);
491 done:
492 if (timeout > 0) {
493 alarm(0);
494 CatchSignal(SIGALRM, SIG_IGN);
497 if (ret > 0) {
498 DEBUG(1, ("Failed to release lock (%d, %s)!\n",
499 ret, strerror(ret)));
501 return ret;
504 /* returns:
505 * negative errno on error
506 * -2 if server commands to terminate
507 * 0 if all ok
508 * ERRNO on other errors
511 int prefork_wait_for_client(struct pf_worker_data *pf,
512 int lock_fd, int listen_fd,
513 struct sockaddr *addr,
514 socklen_t *addrlen, int *fd)
516 int ret;
517 int sd = -1;
518 int err;
520 ret = prefork_grab_lock(pf, lock_fd, -1);
521 if (ret != 0) {
522 return ret;
525 err = 0;
526 do {
527 sd = accept(listen_fd, addr, addrlen);
529 if (sd != -1) break;
531 if (errno == EINTR) {
532 if (pf->cmds == PF_SRV_MSG_EXIT) {
533 err = -2;
535 } else {
536 err = errno;
539 } while ((sd == -1) && (err == 0));
541 /* return lock now, even if the accept failed.
542 * if it takes more than 10 seconds we are in deep trouble */
543 ret = prefork_release_lock(pf, lock_fd, 2);
544 if (ret != 0) {
545 /* we were unable to release the lock!! */
546 DEBUG(0, ("Terminating due to fatal failure!\n"));
548 /* Just exit we cannot hold the whole server, better to error
549 * on this one client and hope it was a transiet problem */
550 err = -2;
553 if (err != 0) {
554 if (sd != -1) {
555 close(sd);
556 sd = -1;
558 return err;
561 pf->status = PF_WORKER_BUSY;
562 pf->num_clients++;
563 *fd = sd;
564 return 0;
/* ==== async code ==== */

/* which lock operation is being performed */
#define PF_ASYNC_LOCK_GRAB 0x01
#define PF_ASYNC_LOCK_RELEASE 0x02
#define PF_ASYNC_ACTION_MASK 0x03
/* set once the operation has completed (successfully or not),
 * used by prefork_lock_send to decide whether to post immediately */
#define PF_ASYNC_LOCK_DONE 0x04

/* state of one async lock grab/release request */
struct pf_lock_state {
	struct pf_worker_data *pf;
	int lock_fd;
	/* PF_ASYNC_* bits */
	int flags;
};
580 static void prefork_lock_handler(struct tevent_context *ev,
581 struct tevent_timer *te,
582 struct timeval curtime, void *pvt);
584 static struct tevent_req *prefork_lock_send(TALLOC_CTX *mem_ctx,
585 struct tevent_context *ev,
586 struct pf_worker_data *pf,
587 int lock_fd, int action)
589 struct tevent_req *req;
590 struct pf_lock_state *state;
592 req = tevent_req_create(mem_ctx, &state, struct pf_lock_state);
593 if (!req) {
594 return NULL;
597 state->pf = pf;
598 state->lock_fd = lock_fd;
599 state->flags = action;
601 /* try once immediately */
602 prefork_lock_handler(ev, NULL, tevent_timeval_zero(), req);
603 if (state->flags & PF_ASYNC_LOCK_DONE) {
604 tevent_req_post(req, ev);
607 return req;
610 static void prefork_lock_handler(struct tevent_context *ev,
611 struct tevent_timer *te,
612 struct timeval curtime, void *pvt)
614 struct tevent_req *req;
615 struct pf_lock_state *state;
616 int ret;
618 req = talloc_get_type_abort(pvt, struct tevent_req);
619 state = tevent_req_data(req, struct pf_lock_state);
621 switch (state->flags & PF_ASYNC_ACTION_MASK) {
622 case PF_ASYNC_LOCK_GRAB:
623 ret = prefork_grab_lock(state->pf, state->lock_fd, 0);
624 break;
625 case PF_ASYNC_LOCK_RELEASE:
626 ret = prefork_release_lock(state->pf, state->lock_fd, 0);
627 break;
628 default:
629 ret = EINVAL;
630 break;
633 switch (ret) {
634 case 0:
635 state->flags |= PF_ASYNC_LOCK_DONE;
636 tevent_req_done(req);
637 return;
638 case -1:
639 te = tevent_add_timer(ev, state,
640 tevent_timeval_current_ofs(1, 0),
641 prefork_lock_handler, req);
642 tevent_req_nomem(te, req);
643 return;
644 case -2:
645 /* server tells us to stop */
646 state->flags |= PF_ASYNC_LOCK_DONE;
647 tevent_req_error(req, -2);
648 return;
649 default:
650 state->flags |= PF_ASYNC_LOCK_DONE;
651 tevent_req_error(req, ret);
652 return;
/*
 * Collect the result of a prefork_lock_send request.
 * Returns 0 on success, otherwise the error set on the request.
 */
static int prefork_lock_recv(struct tevent_req *req)
{
	int err = 0;

	/* on error, err receives the request's unix error code */
	if (!tevent_req_is_unix_error(req, &err)) {
		err = 0;
	}

	tevent_req_received(req);
	return err;
}
/* state of one async lock+accept+unlock cycle */
struct pf_listen_state {
	struct tevent_context *ev;
	struct pf_worker_data *pf;

	int lock_fd;
	int listen_fd;

	/* filled in by accept() with the client's address */
	struct sockaddr *addr;
	socklen_t *addrlen;

	/* accepted client socket, -1 until accept succeeds */
	int accept_fd;

	int error;
};
683 static void prefork_listen_lock_done(struct tevent_req *subreq);
684 static void prefork_listen_accept_handler(struct tevent_context *ev,
685 struct tevent_fd *fde,
686 uint16_t flags, void *pvt);
687 static void prefork_listen_release_done(struct tevent_req *subreq);
689 struct tevent_req *prefork_listen_send(TALLOC_CTX *mem_ctx,
690 struct tevent_context *ev,
691 struct pf_worker_data *pf,
692 int lock_fd, int listen_fd,
693 struct sockaddr *addr,
694 socklen_t *addrlen)
696 struct tevent_req *req, *subreq;
697 struct pf_listen_state *state;
699 req = tevent_req_create(mem_ctx, &state, struct pf_listen_state);
700 if (!req) {
701 return NULL;
704 state->ev = ev;
705 state->pf = pf;
706 state->lock_fd = lock_fd;
707 state->listen_fd = listen_fd;
708 state->addr = addr;
709 state->addrlen = addrlen;
710 state->accept_fd = -1;
711 state->error = 0;
713 subreq = prefork_lock_send(state, state->ev, state->pf,
714 state->lock_fd, PF_ASYNC_LOCK_GRAB);
715 if (tevent_req_nomem(subreq, req)) {
716 return tevent_req_post(req, ev);
719 tevent_req_set_callback(subreq, prefork_listen_lock_done, req);
720 return req;
723 static void prefork_listen_lock_done(struct tevent_req *subreq)
725 struct tevent_req *req;
726 struct pf_listen_state *state;
727 struct tevent_fd *fde;
728 int ret;
730 req = tevent_req_callback_data(subreq, struct tevent_req);
731 state = tevent_req_data(req, struct pf_listen_state);
733 ret = prefork_lock_recv(subreq);
734 if (ret != 0) {
735 tevent_req_error(req, ret);
736 return;
739 /* next step, accept */
740 fde = tevent_add_fd(state->ev, state,
741 state->listen_fd, TEVENT_FD_READ,
742 prefork_listen_accept_handler, req);
743 tevent_req_nomem(fde, req);
746 static void prefork_listen_accept_handler(struct tevent_context *ev,
747 struct tevent_fd *fde,
748 uint16_t flags, void *pvt)
750 struct pf_listen_state *state;
751 struct tevent_req *req, *subreq;
752 int err = 0;
753 int sd = -1;
755 req = talloc_get_type_abort(pvt, struct tevent_req);
756 state = tevent_req_data(req, struct pf_listen_state);
758 sd = accept(state->listen_fd, state->addr, state->addrlen);
759 if (sd == -1) {
760 if (errno == EINTR) {
761 /* keep trying */
762 return;
764 err = errno;
765 DEBUG(6, ("Accept failed! (%d, %s)\n", err, strerror(err)));
769 /* do not track the listen fd anymore */
770 talloc_free(fde);
771 if (err) {
772 tevent_req_error(req, err);
773 return;
776 state->accept_fd = sd;
778 /* release lock now */
779 subreq = prefork_lock_send(state, state->ev, state->pf,
780 state->lock_fd, PF_ASYNC_LOCK_RELEASE);
781 if (tevent_req_nomem(subreq, req)) {
782 return;
784 tevent_req_set_callback(subreq, prefork_listen_release_done, req);
787 static void prefork_listen_release_done(struct tevent_req *subreq)
789 struct tevent_req *req;
790 int ret;
792 req = tevent_req_callback_data(subreq, struct tevent_req);
794 ret = prefork_lock_recv(subreq);
795 if (ret != 0) {
796 tevent_req_error(req, ret);
797 return;
800 tevent_req_done(req);
803 int prefork_listen_recv(struct tevent_req *req, int *fd)
805 struct pf_listen_state *state;
806 int ret;
808 state = tevent_req_data(req, struct pf_listen_state);
810 if (tevent_req_is_unix_error(req, &ret)) {
811 if (state->accept_fd != -1) {
812 close(state->accept_fd);
814 } else {
815 *fd = state->accept_fd;
816 ret = 0;
817 state->pf->status = PF_WORKER_BUSY;
818 state->pf->num_clients++;
821 tevent_req_received(req);
822 return ret;