Update comment.
[beanstalkd.git] / job.c
blobc5da776646803a53f8cbfce51895de5d2593a728
1 /* job.c - a job in the queue */
3 /* Copyright (C) 2007 Keith Rarick and Philotic Inc.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include <stdlib.h>
20 #include <string.h>
22 #include "tube.h"
23 #include "job.h"
24 #include "primes.h"
25 #include "util.h"
/* Id handed to the next job that is created without an explicit id. */
27 static unsigned long long int next_id = 1;
/* Index into primes[] of the bucket count currently in use. */
29 static int cur_prime = 0;
/* Hash table of all jobs: an array of bucket heads, chained via ht_next. */
31 static job *all_jobs = NULL;
/* Number of buckets in all_jobs. */
32 static size_t all_jobs_cap = 12289; /* == primes[0] */
/* Number of jobs currently stored in the hash table. */
33 static size_t all_jobs_used = 0;
/* Set once growing the table has failed to allocate; rehash() then gives up. */
35 static int hash_table_was_oom = 0;
/* Forward declaration: store_job() calls rehash() before it is defined. */
37 static void rehash();
39 static int
40 _get_job_hash_index(unsigned long long int job_id)
42 return job_id % all_jobs_cap;
45 static void
46 store_job(job j)
48 int index = 0;
50 index = _get_job_hash_index(j->id);
52 j->ht_next = all_jobs[index];
53 all_jobs[index] = j;
54 all_jobs_used++;
56 /* accept a load factor of 4 */
57 if (all_jobs_used > (all_jobs_cap << 2)) rehash();
60 static void
61 rehash()
63 job *old = all_jobs;
64 size_t old_cap = all_jobs_cap, old_used = all_jobs_used, i;
66 if (cur_prime >= NUM_PRIMES) return;
67 if (hash_table_was_oom) return;
69 all_jobs_cap = primes[++cur_prime];
70 all_jobs = calloc(all_jobs_cap, sizeof(job));
71 if (!all_jobs) {
72 twarnx("Failed to allocate %d new hash buckets", all_jobs_cap);
73 hash_table_was_oom = 1;
74 --cur_prime;
75 all_jobs = old;
76 all_jobs_cap = old_cap;
77 all_jobs_used = old_used;
78 return;
80 all_jobs_used = 0;
82 for (i = 0; i < old_cap; i++) {
83 while (old[i]) {
84 job j = old[i];
85 old[i] = j->ht_next;
86 j->ht_next = NULL;
87 store_job(j);
92 job
93 job_find(unsigned long long int job_id)
95 job jh = NULL;
96 int index = _get_job_hash_index(job_id);
98 for (jh = all_jobs[index]; jh && jh->id != job_id; jh = jh->ht_next);
100 return jh;
104 allocate_job(int body_size)
106 job j;
108 j = malloc(sizeof(struct job) + body_size);
109 if (!j) return twarnx("OOM"), (job) 0;
111 j->id = 0;
112 j->state = JOB_STATE_INVALID;
113 j->creation = time(NULL);
114 j->reserve_ct = j->timeout_ct = j->release_ct = j->bury_ct = j->kick_ct = 0;
115 j->body_size = body_size;
116 j->next = j->prev = j; /* not in a linked list */
117 j->ht_next = NULL;
118 j->tube = NULL;
119 j->binlog = NULL;
120 j->heap_index = 0;
121 j->reserved_binlog_space = 0;
123 return j;
127 make_job_with_id(unsigned int pri, unsigned int delay, unsigned int ttr,
128 int body_size, tube tube, unsigned long long id)
130 job j;
132 j = allocate_job(body_size);
133 if (!j) return twarnx("OOM"), (job) 0;
135 if (id) {
136 j->id = id;
137 if (id >= next_id) next_id = id + 1;
138 } else {
139 j->id = next_id++;
141 j->pri = pri;
142 j->delay = delay;
143 j->ttr = ttr;
145 store_job(j);
147 TUBE_ASSIGN(j->tube, tube);
149 return j;
152 static void
153 job_hash_free(job j)
155 job *slot;
157 slot = &all_jobs[_get_job_hash_index(j->id)];
158 while (*slot && *slot != j) slot = &(*slot)->ht_next;
159 if (*slot) {
160 *slot = (*slot)->ht_next;
161 --all_jobs_used;
165 void
166 job_free(job j)
168 if (j) {
169 TUBE_ASSIGN(j->tube, NULL);
170 if (j->state != JOB_STATE_COPY) job_hash_free(j);
173 free(j);
176 /* We can't substrct any of these values because there are too many bits */
178 job_pri_cmp(job a, job b)
180 if (a->pri > b->pri) return 1;
181 if (a->pri < b->pri) return -1;
182 if (a->id > b->id) return 1;
183 if (a->id < b->id) return -1;
184 return 0;
187 /* We can't substrct any of these values because there are too many bits */
189 job_delay_cmp(job a, job b)
191 if (a->deadline > b->deadline) return 1;
192 if (a->deadline < b->deadline) return -1;
193 if (a->id > b->id) return 1;
194 if (a->id < b->id) return -1;
195 return 0;
199 job_copy(job j)
201 job n;
203 if (!j) return NULL;
205 n = malloc(sizeof(struct job) + j->body_size);
206 if (!n) return twarnx("OOM"), (job) 0;
208 memcpy(n, j, sizeof(struct job) + j->body_size);
209 n->next = n->prev = n; /* not in a linked list */
211 n->binlog = NULL; /* copies do not have refcnt on the binlog */
213 n->tube = 0; /* Don't use memcpy for the tube, which we must refcount. */
214 TUBE_ASSIGN(n->tube, j->tube);
216 /* Mark this job as a copy so it can be appropriately freed later on */
217 n->state = JOB_STATE_COPY;
219 return n;
222 const char *
223 job_state(job j)
225 if (j->state == JOB_STATE_READY) return "ready";
226 if (j->state == JOB_STATE_RESERVED) return "reserved";
227 if (j->state == JOB_STATE_BURIED) return "buried";
228 if (j->state == JOB_STATE_DELAYED) return "delayed";
229 return "invalid";
233 job_list_any_p(job head)
235 return head->next != head || head->prev != head;
239 job_remove(job j)
241 if (!j) return NULL;
242 if (!job_list_any_p(j)) return NULL; /* not in a doubly-linked list */
244 j->next->prev = j->prev;
245 j->prev->next = j->next;
247 j->prev = j->next = j;
249 return j;
252 void
253 job_insert(job head, job j)
255 if (job_list_any_p(j)) return; /* already in a linked list */
257 j->prev = head->prev;
258 j->next = head;
259 head->prev->next = j;
260 head->prev = j;
263 unsigned long long int
264 total_jobs()
266 return next_id - 1;
269 /* for unit tests */
270 size_t
271 get_all_jobs_used()
273 return all_jobs_used;
276 void
277 job_init()
279 all_jobs = calloc(all_jobs_cap, sizeof(job));
280 if (!all_jobs) {
281 twarnx("Failed to allocate %d hash buckets", all_jobs_cap);