Fix some memory leaks.
[beanstalkd.git] / beanstalkd.c
blobda2f163a23be16d7731d1dc073fb1d7dfdc39d88
1 /* beanstalk - fast, general-purpose work queue */
3 /* Copyright (C) 2007 Keith Rarick and Philotic Inc.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include "config.h"
21 #include <signal.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <errno.h>
25 #include <sys/stat.h>
26 #include <sys/resource.h>
27 #include <sys/socket.h>
28 #include <netinet/in.h>
29 #include <arpa/inet.h>
30 #include <sys/types.h>
31 #include <unistd.h>
32 #include <pwd.h>
33 #include <event.h>
34 #include <limits.h>
36 #include "net.h"
37 #include "util.h"
38 #include "prot.h"
39 #include "binlog.h"
/* Run-time settings; opts() overwrites them from the command line. */
static char *user;               /* -u: account handed to su(); NULL means no switch */
static int detach;               /* -d: nonzero makes main() call daemonize() */
static int port = 11300;         /* -p: port passed to make_server_socket() */
static struct in_addr host_addr; /* -l: listen address; main() presets INADDR_ANY */
/* Re-point descriptor `fd` at /dev/null, opened with `flags`.
 *
 * `fd` is closed and /dev/null is opened immediately afterwards; the
 * descriptor returned by open() is required to be `fd` itself. Any other
 * result (a failed open() included) is reported through twarn() and the
 * process exits with status 1. */
static void
nullfd(int fd, int flags)
{
    int got;

    close(fd);
    got = open("/dev/null", flags);
    if (got == fd) return;

    twarn("open(\"/dev/null\")");
    exit(1);
}
/* Fork and let only the child carry on.
 *
 * The parent exits with status 0. If fork() fails the process exits with
 * status 1. Only the child returns from this function. */
static void
dfork()
{
    pid_t child = fork();

    if (child == -1) {
        exit(1);
    }
    if (child != 0) {
        exit(0);
    }
}
/* Put the process in the background.
 *
 * In order: change directory to "/", point fds 0, 1 and 2 at /dev/null,
 * clear the umask, fork, start a new session with setsid(), and fork once
 * more. If chdir() fails, a warning is logged and none of the remaining
 * steps are performed. */
static void
daemonize()
{
    int r;

    r = chdir("/");
    if (r) {
        /* ISO C does not allow `return <expression>;` in a void function,
         * so warn first and then return plainly. */
        twarn("chdir");
        return;
    }

    nullfd(0, O_RDONLY);
    nullfd(1, O_WRONLY);
    nullfd(2, O_WRONLY);
    umask(0);
    dfork();
    setsid();
    dfork();
}
/* Switch the process to the account named `user`.
 *
 * The account is looked up with getpwnam(); the group id is set first and
 * the user id second. Every failure is fatal:
 *   exit 32 - getpwnam() set errno
 *   exit 33 - no such user
 *   exit 34 - setgid() or setuid() failed
 *
 * NOTE(review): supplementary groups are left untouched here (there is no
 * setgroups()/initgroups() call) -- confirm whether they should be reset
 * when the server is started as root. */
static void
su(const char *user)
{
    int r;
    struct passwd *pwent;

    /* getpwnam() returns NULL both on error and when the name is unknown;
     * clearing errno beforehand lets the two cases be told apart. */
    errno = 0;
    pwent = getpwnam(user);
    if (errno) twarn("getpwnam(\"%s\")", user), exit(32);
    if (!pwent) twarnx("getpwnam(\"%s\"): no such user", user), exit(33);

    /* gid_t and uid_t need not be int, so cast to a type that has a
     * matching printf conversion instead of passing them to %d. */
    r = setgid(pwent->pw_gid);
    if (r == -1) {
        twarn("setgid(%lu \"%s\")", (unsigned long) pwent->pw_gid, user);
        exit(34);
    }

    r = setuid(pwent->pw_uid);
    if (r == -1) {
        twarn("setuid(%lu \"%s\")", (unsigned long) pwent->pw_uid, user);
        exit(34);
    }
}
/* Signal handler that set_sig_handlers() installs for SIGINT and SIGTERM:
 * shut the binlog down, then exit with status 0. The signal number is not
 * used. */
void
exit_cleanly(int sig)
{
    (void) sig;

    binlog_shutdown();
    exit(0);
}
108 static void
109 set_sig_handlers()
111 int r;
112 struct sigaction sa;
114 sa.sa_handler = SIG_IGN;
115 sa.sa_flags = 0;
116 r = sigemptyset(&sa.sa_mask);
117 if (r == -1) twarn("sigemptyset()"), exit(111);
119 r = sigaction(SIGPIPE, &sa, 0);
120 if (r == -1) twarn("sigaction(SIGPIPE)"), exit(111);
122 sa.sa_handler = enter_drain_mode;
123 r = sigaction(SIGUSR1, &sa, 0);
124 if (r == -1) twarn("sigaction(SIGUSR1)"), exit(111);
126 sa.sa_handler = exit_cleanly;
127 r = sigaction(SIGINT, &sa, 0);
128 if (r == -1) twarn("sigaction(SIGINT)"), exit(111);
130 sa.sa_handler = exit_cleanly;
131 r = sigaction(SIGTERM, &sa, 0);
132 if (r == -1) twarn("sigaction(SIGTERM)"), exit(111);
/* Workaround for a workaround in libevent's epoll implementation.
 *
 * libevent's epoll_init() creates its epoll fd with room for
 * RLIMIT_NOFILE - 1 descriptors; a comment there attributes this to Solaris
 * compatibility, so it is presumably dodging a Solaris bug. The unfortunate
 * side effect is that epoll_ctl() -- and therefore event_add() -- fails for
 * a perfectly valid fd once we have hit the limit of open fds, which makes
 * it hard to behave reasonably in that situation.
 *
 * So lower the real soft value of RLIMIT_NOFILE by one. This has to happen
 * after epoll_init() has run. Failure to read or write the limit is fatal
 * (exit status 2). */
static void
nudge_fd_limit()
{
    struct rlimit lim;

    if (getrlimit(RLIMIT_NOFILE, &lim) != 0) {
        twarn("getrlimit(RLIMIT_NOFILE)");
        exit(2);
    }

    lim.rlim_cur -= 1;

    if (setrlimit(RLIMIT_NOFILE, &lim) != 0) {
        twarn("setrlimit(RLIMIT_NOFILE)");
        exit(2);
    }
}
160 static void
161 usage(char *msg, char *arg)
163 if (arg) warnx("%s: %s", msg, arg);
164 fprintf(stderr, "Use: %s [OPTIONS]\n"
165 "\n"
166 "Options:\n"
167 " -d detach\n"
168 " -b DIR binlog directory\n"
169 " -f MS fsync at most once every MS milliseconds"
170 " (use -f 0 for \"always fsync\")\n"
171 " -F never fsync (default)\n"
172 " -l ADDR listen on address (default is 0.0.0.0)\n"
173 " -p PORT listen on port (default is 11300)\n"
174 " -u USER become user and group\n"
175 " -z BYTES set the maximum job size in bytes (default is %d)\n"
176 " -s BYTES set the size of each binlog file (default is %d)\n"
177 #ifndef HAVE_POSIX_FALLOCATE
178 " (will be rounded up to a multiple of 512 bytes)\n"
179 #endif
180 " -v show version information\n"
181 " -h show this help\n",
182 progname, JOB_DATA_SIZE_LIMIT_DEFAULT, BINLOG_SIZE_LIMIT_DEFAULT);
183 exit(arg ? 5 : 0);
/* Parse `str` as a non-negative decimal size.
 *
 * Leading white space and an optional '+' are accepted. An empty string,
 * a '-' sign, trailing characters, or a value that does not fit in size_t
 * are all rejected via usage("invalid size", str), which exits the
 * process.
 *
 * Returns the parsed value. */
static size_t
parse_size_t(char *str)
{
    const char *digits = str;
    char *end;
    unsigned long long value;

    /* strtoull() quietly negates "-N" into a huge unsigned value, so step
     * over the white space it would skip and refuse a minus sign here. */
    while (*digits == ' ' || (*digits >= '\t' && *digits <= '\r')) digits++;
    if (*digits == '-') usage("invalid size", str);

    errno = 0;
    value = strtoull(digits, &end, 10);
    if (end == digits) usage("invalid size", str);
    if (end[0] != 0) usage("invalid size", str);
    if (errno) usage("invalid size", str);

    /* Reject anything that would be truncated on conversion to size_t. */
    if ((unsigned long long) (size_t) value != value) {
        usage("invalid size", str);
    }

    return (size_t) value;
}
/* Return `arg` if there is one; otherwise complain about option `opt`
 * through usage(), which exits the process. */
static char *
require_arg(char *opt, char *arg)
{
    if (arg == NULL) {
        usage("option requires an argument", opt);
    }
    return arg;
}
/* Parse `portstr` as a decimal port number.
 *
 * The whole string must be consumed, strtol() must not report an error,
 * and the value must lie in 0..65535. Anything else is rejected via
 * usage("invalid port", portstr), which exits the process.
 *
 * Returns the port number. */
static int
parse_port(char *portstr)
{
    long n;
    char *end;

    errno = 0;
    n = strtol(portstr, &end, 10);
    if (end == portstr) usage("invalid port", portstr);
    if (end[0] != 0) usage("invalid port", portstr);
    if (errno) usage("invalid port", portstr);

    /* Check the range before narrowing to int: values outside 16 bits
     * (or negative ones) are not valid ports. */
    if (n < 0 || n > 65535) usage("invalid port", portstr);

    return (int) n;
}
/* Convert the address text `hoststr` with inet_aton().
 *
 * Returns the resulting address. Text that inet_aton() rejects is reported
 * via usage("invalid address", hoststr), which exits the process. */
static struct in_addr
parse_host(char *hoststr)
{
    struct in_addr parsed;

    if (inet_aton(hoststr, &parsed) == 0) {
        usage("invalid address", hoststr);
    }

    return parsed;
}
231 static void
232 opts(int argc, char **argv)
234 int i;
236 for (i = 1; i < argc; ++i) {
237 if (argv[i][0] != '-') usage("unknown option", argv[i]);
238 if (argv[i][1] == 0 || argv[i][2] != 0) usage("unknown option",argv[i]);
239 switch (argv[i][1]) {
240 case 'd':
241 detach = 1;
242 break;
243 case 'p':
244 port = parse_port(require_arg("-p", argv[++i]));
245 break;
246 case 'l':
247 host_addr = parse_host(require_arg("-l", argv[++i]));
248 break;
249 case 'z':
250 job_data_size_limit = parse_size_t(require_arg("-z",
251 argv[++i]));
252 break;
253 case 's':
254 binlog_size_limit = parse_size_t(require_arg("-s", argv[++i]));
255 break;
256 case 'f':
257 fsync_throttle_ms = parse_size_t(require_arg("-f", argv[++i]));
258 enable_fsync = 1;
259 break;
260 case 'F':
261 enable_fsync = 0;
262 break;
263 case 'u':
264 user = require_arg("-u", argv[++i]);
265 break;
266 case 'b':
267 binlog_dir = require_arg("-b", argv[++i]);
268 break;
269 case 'h':
270 usage(NULL, NULL);
271 case 'v':
272 printf("beanstalkd %s\n", VERSION);
273 exit(0);
274 default:
275 usage("unknown option", argv[i]);
281 main(int argc, char **argv)
283 int r;
284 struct event_base *ev_base;
285 struct job binlog_jobs = {};
287 host_addr.s_addr = INADDR_ANY;
289 progname = argv[0];
290 opts(argc, argv);
292 job_init();
293 prot_init();
295 /* We want to make sure that only one beanstalkd tries to use the binlog
296 * directory at a time. So acquire a lock now and never release it. */
297 if (binlog_dir) {
298 r = binlog_lock();
299 if (!r) twarnx("failed to lock binlog dir %s", binlog_dir), exit(10);
302 r = make_server_socket(host_addr, port);
303 if (r == -1) twarnx("make_server_socket()"), exit(111);
305 if (user) su(user);
306 ev_base = event_init();
307 set_sig_handlers();
308 nudge_fd_limit();
310 unbrake((evh) h_accept);
312 binlog_jobs.prev = binlog_jobs.next = &binlog_jobs;
313 binlog_init(&binlog_jobs);
314 prot_replay_binlog(&binlog_jobs);
316 if (detach) {
317 daemonize();
318 event_reinit(ev_base);
321 event_dispatch();
322 binlog_shutdown();
323 twarnx("got here for some reason");
324 return 0;