refactor
[beanstalkd.git] / beanstalkd.c
blob8f281ef57d4f92d74e1f0647c65e5bfb70c419f6
1 /* beanstalk - fast, general-purpose work queue */
3 /* Copyright (C) 2007 Keith Rarick and Philotic Inc.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include "t.h"
21 #include <signal.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <errno.h>
25 #include <sys/stat.h>
26 #include <sys/resource.h>
27 #include <sys/socket.h>
28 #include <netinet/in.h>
29 #include <unistd.h>
30 #include <pwd.h>
31 #include <event.h>
32 #include <limits.h>
33 #include <fcntl.h>
35 #include "sd-daemon.h"
36 #include "version.h"
37 #include "dat.h"
/* Command-line settings; opts() overwrites these from argv. */
static char *user = NULL;       /* -u USER; NULL leaves the process user alone */
static int detach = 0;          /* set to 1 by -d */
static char *port = "11300";    /* -p PORT */
static char *host_addr = NULL;  /* -l ADDR; NULL when -l was not given */
/* Point descriptor fd at /dev/null, opened with the given open(2) flags.
 * fd is closed first and open() is then expected to return that same
 * number; any other result (including -1) is reported and the process
 * exits with status 1. */
static void
nullfd(int fd, int flags)
{
    int got;

    close(fd);
    got = open("/dev/null", flags);
    if (got == fd) {
        return;
    }

    twarn("open(\"/dev/null\")");
    exit(1);
}
/* Fork and carry on only in the child. The parent exits with status 0;
 * if fork() fails the process exits with status 1. */
static void
dfork()
{
    pid_t pid = fork();

    switch (pid) {
    case -1:
        exit(1);
    case 0:
        /* child: return to the caller */
        return;
    default:
        exit(0);
    }
}
/* Detach from the invoking environment: change directory to "/", rebind
 * stdin/stdout/stderr to /dev/null, clear the umask, then fork, start a new
 * session with setsid(), and fork once more.
 *
 * If chdir() fails a warning is logged and the function returns without
 * performing any of the remaining steps. */
static void
daemonize()
{
    int r;

    r = chdir("/");
    if (r) {
        /* A void function must not "return <expression>;" in ISO C, so the
         * warning and the return are separate statements. */
        twarn("chdir");
        return;
    }

    nullfd(0, O_RDONLY);
    nullfd(1, O_WRONLY);
    nullfd(2, O_WRONLY);
    umask(0);
    dfork();
    setsid();
    dfork();
}
/* Switch the process to the given user: look the name up with getpwnam(),
 * then set the group id followed by the user id from that entry.
 *
 * Any failure is fatal: exit status 32 if getpwnam() reports an error, 33 if
 * the user does not exist, 34 if setgid() or setuid() fails.
 *
 * NOTE(review): only the primary gid and the uid are changed here; the
 * supplementary group list is not touched by this function. */
static void
su(const char *user)
{
    int r;
    struct passwd *pwent;

    /* getpwnam() returns NULL both on error and for "no such user"; errno
     * is cleared first so the two cases can be told apart. */
    errno = 0;
    pwent = getpwnam(user);
    if (errno) {
        twarn("getpwnam(\"%s\")", user);
        exit(32);
    }
    if (!pwent) {
        twarnx("getpwnam(\"%s\"): no such user", user);
        exit(33);
    }

    /* The group is changed first, while the process still has the
     * privilege to do so. gid_t/uid_t are not int, so they are cast to a
     * type that matches the conversion specifier. */
    r = setgid(pwent->pw_gid);
    if (r == -1) {
        twarn("setgid(%lu \"%s\")", (unsigned long)pwent->pw_gid, user);
        exit(34);
    }

    r = setuid(pwent->pw_uid);
    if (r == -1) {
        twarn("setuid(%lu \"%s\")", (unsigned long)pwent->pw_uid, user);
        exit(34);
    }
}
/* Signal handler: shut the binlog down and exit with status 0.
 * The signal number is not used. */
void
exit_cleanly(int sig)
{
    (void)sig;

    binlog_shutdown();
    exit(0);
}
106 static void
107 set_sig_handlers()
109 int r;
110 struct sigaction sa;
112 sa.sa_handler = SIG_IGN;
113 sa.sa_flags = 0;
114 r = sigemptyset(&sa.sa_mask);
115 if (r == -1) twarn("sigemptyset()"), exit(111);
117 r = sigaction(SIGPIPE, &sa, 0);
118 if (r == -1) twarn("sigaction(SIGPIPE)"), exit(111);
120 sa.sa_handler = enter_drain_mode;
121 r = sigaction(SIGUSR1, &sa, 0);
122 if (r == -1) twarn("sigaction(SIGUSR1)"), exit(111);
124 sa.sa_handler = exit_cleanly;
125 r = sigaction(SIGINT, &sa, 0);
126 if (r == -1) twarn("sigaction(SIGINT)"), exit(111);
128 sa.sa_handler = exit_cleanly;
129 r = sigaction(SIGTERM, &sa, 0);
130 if (r == -1) twarn("sigaction(SIGTERM)"), exit(111);
133 /* This is a workaround for a mystifying workaround in libevent's epoll
134 * implementation. The epoll_init() function creates an epoll fd with space to
135 * handle RLIMIT_NOFILE - 1 fds, accompanied by the following puzzling comment:
136 * "Solaris is somewhat retarded - it's important to drop backwards
137 * compatibility when making changes. So, don't dare to put rl.rlim_cur here."
138 * This is presumably to work around a bug in Solaris, but it has the
139 * unfortunate side-effect of causing epoll_ctl() (and, therefore, event_add())
140 * to fail for a valid fd if we have hit the limit of open fds. That makes it
141 * hard to provide reasonable behavior in that situation. So, let's reduce the
142 * real value of RLIMIT_NOFILE by one, after epoll_init() has run. */
/* Lower the soft RLIMIT_NOFILE limit by one. A failure to read or write
 * the limit is logged and the process exits with status 2. */
static void
nudge_fd_limit()
{
    struct rlimit lim;

    if (getrlimit(RLIMIT_NOFILE, &lim) != 0) {
        twarn("getrlimit(RLIMIT_NOFILE)");
        exit(2);
    }

    lim.rlim_cur -= 1;

    if (setrlimit(RLIMIT_NOFILE, &lim) != 0) {
        twarn("setrlimit(RLIMIT_NOFILE)");
        exit(2);
    }
}
158 static void
159 usage(char *msg, char *arg)
161 if (arg) warnx("%s: %s", msg, arg);
162 fprintf(stderr, "Use: %s [OPTIONS]\n"
163 "\n"
164 "Options:\n"
165 " -d detach\n"
166 " -b DIR binlog directory (must be absolute path if used with -d)\n"
167 " -f MS fsync at most once every MS milliseconds"
168 " (use -f 0 for \"always fsync\")\n"
169 " -F never fsync (default)\n"
170 " -l ADDR listen on address (default is 0.0.0.0)\n"
171 " -p PORT listen on port (default is 11300)\n"
172 " -u USER become user and group\n"
173 " -z BYTES set the maximum job size in bytes (default is %d)\n"
174 " -s BYTES set the size of each binlog file (default is %d)\n"
175 " (will be rounded up to a multiple of 512 bytes)\n"
176 " -v show version information\n"
177 " -h show this help\n",
178 progname, JOB_DATA_SIZE_LIMIT_DEFAULT, BINLOG_SIZE_LIMIT_DEFAULT);
179 exit(arg ? 5 : 0);
/* Parse str as a non-negative decimal integer and return it as a size_t.
 *
 * The whole string must be consumed by the number. Input that is empty,
 * has trailing characters, contains a minus sign, or does not fit in a
 * size_t is rejected by calling usage("invalid size", str), which exits. */
static size_t
parse_size_t(char *str)
{
    const char *p;
    char *end;
    unsigned long long value;
    size_t size;

    /* strtoull() accepts a leading '-' and silently negates the result,
     * turning e.g. "-1" into a huge value; refuse any minus sign. */
    for (p = str; *p; p++) {
        if (*p == '-') usage("invalid size", str);
    }

    errno = 0;
    value = strtoull(str, &end, 10);
    size = (size_t)value;

    if (end == str          /* no digits at all */
        || *end != '\0'     /* trailing garbage */
        || errno == ERANGE  /* does not fit in unsigned long long */
        || size != value) { /* does not fit in size_t */
        usage("invalid size", str);
    }
    return size;
}
/* Return arg unchanged when it is present. When arg is NULL, report that
 * option opt is missing its argument through usage(). */
static char *
require_arg(char *opt, char *arg)
{
    if (arg == NULL) {
        usage("option requires an argument", opt);
    }
    return arg;
}
/* Warn that option opt (with value arg) is being ignored, but only when
 * sd_listen_fds(0) reports at least one inherited listen descriptor. */
static void
warn_systemd_ignored_option(char *opt, char *arg)
{
    if (sd_listen_fds(0) <= 0) {
        return;
    }
    warnx("inherited listen fd; ignoring option: %s %s", opt, arg);
}
208 static void
209 opts(int argc, char **argv)
211 int i;
213 for (i = 1; i < argc; ++i) {
214 if (argv[i][0] != '-') usage("unknown option", argv[i]);
215 if (argv[i][1] == 0 || argv[i][2] != 0) usage("unknown option",argv[i]);
216 switch (argv[i][1]) {
217 case 'd':
218 detach = 1;
219 break;
220 case 'p':
221 port = require_arg("-p", argv[++i]);
222 warn_systemd_ignored_option("-p", argv[i]);
223 break;
224 case 'l':
225 host_addr = require_arg("-l", argv[++i]);
226 warn_systemd_ignored_option("-l", argv[i]);
227 break;
228 case 'z':
229 job_data_size_limit = parse_size_t(require_arg("-z",
230 argv[++i]));
231 break;
232 case 's':
233 binlog_size_limit = parse_size_t(require_arg("-s", argv[++i]));
234 break;
235 case 'f':
236 fsync_throttle_ms = parse_size_t(require_arg("-f", argv[++i]));
237 enable_fsync = 1;
238 break;
239 case 'F':
240 enable_fsync = 0;
241 break;
242 case 'u':
243 user = require_arg("-u", argv[++i]);
244 break;
245 case 'b':
246 binlog_dir = require_arg("-b", argv[++i]);
247 break;
248 case 'h':
249 usage(NULL, NULL);
250 case 'v':
251 printf("beanstalkd %s\n", VERSION);
252 exit(0);
253 default:
254 usage("unknown option", argv[i]);
260 main(int argc, char **argv)
262 int r, l;
263 struct event_base *ev_base;
264 struct job binlog_jobs = {};
266 progname = argv[0];
267 opts(argc, argv);
269 if (detach && binlog_dir) {
270 if (binlog_dir[0] != '/') {
271 warnx("The -b option requires an absolute path when used with -d.");
272 usage("Path is not absolute", binlog_dir);
276 job_init();
277 prot_init();
279 /* We want to make sure that only one beanstalkd tries to use the binlog
280 * directory at a time. So acquire a lock now and never release it. */
281 if (binlog_dir) {
282 r = binlog_lock();
283 if (!r) twarnx("failed to lock binlog dir %s", binlog_dir), exit(10);
286 r = make_server_socket(host_addr, port);
287 if (r == -1) twarnx("make_server_socket()"), exit(111);
288 l = r;
290 if (user) su(user);
291 ev_base = event_init();
292 set_sig_handlers();
293 nudge_fd_limit();
295 binlog_jobs.prev = binlog_jobs.next = &binlog_jobs;
296 binlog_init(&binlog_jobs);
297 prot_replay_binlog(&binlog_jobs);
299 if (detach) {
300 daemonize();
301 event_reinit(ev_base);
304 r = listen(l, 1024);
305 if (r == -1) twarn("listen()");
306 accept_handler = (evh)h_accept;
307 unbrake();
308 event_dispatch();
309 twarnx("event_dispatch error");
310 binlog_shutdown();
311 return 0;