/* job.c - a job in the queue */

/* Copyright (C) 2007 Keith Rarick and Philotic Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <stdint.h>
#include <sys/types.h>
#include <stdlib.h>
#include <sys/time.h>
#include <string.h>
#include <event.h>

#include "dat.h"

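/* Job bookkeeping: next_id is the next job id to hand out.  all_jobs is a
 * hash table of jobs keyed by id, with chains linked through ht_next.  It
 * starts out in the static all_jobs_init array of 12289 buckets (primes[0])
 * and grows through the primes[] table; after one failed attempt to grow,
 * hash_table_was_oom prevents further rehashing. */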
static uint64 next_id = 1;

static int cur_prime = 0;

static job all_jobs_init[12289] = {0};
static job *all_jobs = all_jobs_init;
static size_t all_jobs_cap = 12289; /* == primes[0] */
static size_t all_jobs_used = 0;

static int hash_table_was_oom = 0;

static void rehash();

static int
_get_job_hash_index(uint64 job_id)
{
    return job_id % all_jobs_cap;
}

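/* Insert j at the head of its hash chain and grow the table once the load
 * factor (jobs per bucket) exceeds 4. */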
static void
store_job(job j)
{
    int index = 0;

    index = _get_job_hash_index(j->id);

    j->ht_next = all_jobs[index];
    all_jobs[index] = j;
    all_jobs_used++;

    /* accept a load factor of 4 */
    if (all_jobs_used > (all_jobs_cap << 2)) rehash();
}

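/* Grow the hash table to the next prime in primes[] and re-insert every job.
 * If the new table cannot be allocated, the old one is restored and further
 * growth attempts are disabled. */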
static void
rehash()
{
    job *old = all_jobs;
    size_t old_cap = all_jobs_cap, old_used = all_jobs_used, i;

    if (cur_prime + 1 >= NUM_PRIMES) return; /* no larger prime to grow into */
    if (hash_table_was_oom) return;

    all_jobs_cap = primes[++cur_prime];
    all_jobs = calloc(all_jobs_cap, sizeof(job));
    if (!all_jobs) {
        twarnx("Failed to allocate %zu new hash buckets", all_jobs_cap);
        hash_table_was_oom = 1;
        --cur_prime;
        all_jobs = old;
        all_jobs_cap = old_cap;
        all_jobs_used = old_used;
        return;
    }
    all_jobs_used = 0;

    for (i = 0; i < old_cap; i++) {
        while (old[i]) {
            job j = old[i];
            old[i] = j->ht_next;
            j->ht_next = NULL;
            store_job(j);
        }
    }
    if (old != all_jobs_init) {
        free(old);
    }
}

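/* Look up a job by id; returns NULL if no such job is in the table. */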
job
job_find(uint64 job_id)
{
    job jh = NULL;
    int index = _get_job_hash_index(job_id);

    for (jh = all_jobs[index]; jh && jh->id != job_id; jh = jh->ht_next);

    return jh;
}

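/* Allocate a job with room for body_size bytes of body and reset all of its
 * bookkeeping fields.  The job is not given an id and is not stored in the
 * hash table yet; make_job_with_id does that. */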
static job
allocate_job(int body_size)
{
    job j;

    j = malloc(sizeof(struct job) + body_size);
    if (!j) return twarnx("OOM"), (job) 0;

    j->id = 0;
    j->state = JOB_STATE_INVALID;
    j->created_at = nanoseconds();
    j->reserve_ct = j->timeout_ct = j->release_ct = j->bury_ct = j->kick_ct = 0;
    j->body_size = body_size;
    j->next = j->prev = j; /* not in a linked list */
    j->ht_next = NULL;
    j->tube = NULL;
    j->binlog = NULL;
    j->heap_index = 0;
    j->reserved_binlog_space = 0;

    return j;
}

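/* Create a job with the given priority, delay, ttr, and body size.  A
 * nonzero id reuses that id and advances next_id past it (e.g. when
 * replaying existing jobs); id == 0 takes the next fresh id.  The job is
 * stored in the hash table and attached to tube. */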
job
make_job_with_id(uint pri, int64 delay, int64 ttr,
                 int body_size, tube tube, uint64 id)
{
    job j;

    j = allocate_job(body_size);
    if (!j) return twarnx("OOM"), (job) 0;

    if (id) {
        j->id = id;
        if (id >= next_id) next_id = id + 1;
    } else {
        j->id = next_id++;
    }
    j->pri = pri;
    j->delay = delay;
    j->ttr = ttr;

    store_job(j);

    TUBE_ASSIGN(j->tube, tube);

    return j;
}

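/* Unlink j from its hash chain, if it is there. */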
static void
job_hash_free(job j)
{
    job *slot;

    slot = &all_jobs[_get_job_hash_index(j->id)];
    while (*slot && *slot != j) slot = &(*slot)->ht_next;
    if (*slot) {
        *slot = (*slot)->ht_next;
        --all_jobs_used;
    }
}

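/* Release a job: drop its tube reference, remove it from the hash table
 * (copies were never stored there), and free its memory. */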
void
job_free(job j)
{
    if (j) {
        TUBE_ASSIGN(j->tube, NULL);
        if (j->state != JOB_STATE_COPY) job_hash_free(j);
    }

    free(j);
}

void
job_setheappos(void *j, int pos)
{
    ((job)j)->heap_index = pos;
}

/* We can't subtract any of these values: they have too many bits to
 * express the difference as an int. */
int
job_pri_cmp(void *ax, void *bx)
{
    job a = ax, b = bx;
    if (a->pri > b->pri) return 1;
    if (a->pri < b->pri) return -1;
    if (a->id > b->id) return 1;
    if (a->id < b->id) return -1;
    return 0;
}

/* We can't subtract any of these values: they have too many bits to
 * express the difference as an int. */
int
job_delay_cmp(void *ax, void *bx)
{
    job a = ax, b = bx;
    if (a->deadline_at > b->deadline_at) return 1;
    if (a->deadline_at < b->deadline_at) return -1;
    if (a->id > b->id) return 1;
    if (a->id < b->id) return -1;
    return 0;
}

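/* Duplicate a job, body included.  The copy is not linked into any list,
 * takes its own reference on the tube, and is marked JOB_STATE_COPY so
 * job_free knows not to touch the hash table. */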
job
job_copy(job j)
{
    job n;

    if (!j) return NULL;

    n = malloc(sizeof(struct job) + j->body_size);
    if (!n) return twarnx("OOM"), (job) 0;

    memcpy(n, j, sizeof(struct job) + j->body_size);
    n->next = n->prev = n; /* not in a linked list */

    n->binlog = NULL; /* copies do not have refcnt on the binlog */

    n->tube = 0; /* Don't use memcpy for the tube, which we must refcount. */
    TUBE_ASSIGN(n->tube, j->tube);

    /* Mark this job as a copy so it can be appropriately freed later on */
    n->state = JOB_STATE_COPY;

    return n;
}

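/* Return a human-readable name for j's current state. */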
const char *
job_state(job j)
{
    if (j->state == JOB_STATE_READY) return "ready";
    if (j->state == JOB_STATE_RESERVED) return "reserved";
    if (j->state == JOB_STATE_BURIED) return "buried";
    if (j->state == JOB_STATE_DELAYED) return "delayed";
    return "invalid";
}

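/* Jobs are kept on circular doubly-linked lists through their next/prev
 * pointers; an unlinked job points at itself.  job_list_any_p tests whether
 * head is linked to anything other than itself. */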
int
job_list_any_p(job head)
{
    return head->next != head || head->prev != head;
}

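/* Detach j from whatever list it is on and re-point it at itself.  Returns
 * j, or NULL if j is missing or not on a list. */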
job
job_remove(job j)
{
    if (!j) return NULL;
    if (!job_list_any_p(j)) return NULL; /* not in a doubly-linked list */

    j->next->prev = j->prev;
    j->prev->next = j->next;

    j->prev = j->next = j;

    return j;
}

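/* Splice j in just before head (i.e. at the tail of head's circular list),
 * unless j is already on some list. */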
void
job_insert(job head, job j)
{
    if (job_list_any_p(j)) return; /* already in a linked list */

    j->prev = head->prev;
    j->next = head;
    head->prev->next = j;
    head->prev = j;
}

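/* Illustrative use only (not part of this file): callers keep a sentinel
 * struct job as a list head, e.g.
 *
 *     struct job h = {0};
 *     h.next = h.prev = &h;     // empty list
 *     job_insert(&h, j);        // enqueue j at the tail
 *     job k = job_remove(j);    // detach it again
 */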
uint64
total_jobs()
{
    return next_id - 1;
}

/* for unit tests */
size_t
get_all_jobs_used()
{
    return all_jobs_used;
}

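/* Allocate the hash table used to index jobs by id. */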
void
job_init()
{
    all_jobs = calloc(all_jobs_cap, sizeof(job));
    if (!all_jobs) {
        twarnx("Failed to allocate %zu hash buckets", all_jobs_cap);
    }
}