Fix crash in unit tests.
[beanstalkd.git] / job.c
bloba8001dda1911c425afef8dd11801ddf363f41f18
/* job.c - a job in the queue */

/* Copyright (C) 2007 Keith Rarick and Philotic Inc.

 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.

 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.

 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
19 #include <stdlib.h>
20 #include <string.h>
22 #include "tube.h"
23 #include "job.h"
24 #include "util.h"
26 static unsigned long long int next_id = 1;
28 static job *all_jobs=NULL;
30 static int
31 _get_job_hash_index(unsigned long long int job_id)
33 return job_id % NUM_JOB_BUCKETS;
36 static job
37 store_job(job j)
39 int index=0;
41 index = _get_job_hash_index(j->id);
43 j->ht_next = all_jobs[index];
45 all_jobs[index] = j;
47 return j;
50 job
51 job_find(unsigned long long int job_id)
53 job jh = NULL;
54 int index = _get_job_hash_index(job_id);
56 for (jh = all_jobs[index]; jh && jh->id != job_id; jh = jh->ht_next);
58 return jh;
61 job
62 allocate_job(int body_size)
64 job j;
66 j = malloc(sizeof(struct job) + body_size);
67 if (!j) return twarnx("OOM"), NULL;
69 j->id = 0;
70 j->state = JOB_STATE_INVALID;
71 j->creation = time(NULL);
72 j->timeout_ct = j->release_ct = j->bury_ct = j->kick_ct = 0;
73 j->body_size = body_size;
74 j->next = j->prev = j; /* not in a linked list */
75 j->ht_next = NULL;
76 j->tube = NULL;
78 return j;
81 job
82 make_job(unsigned int pri, unsigned int delay, unsigned int ttr, int body_size,
83 tube tube)
85 job j;
87 j = allocate_job(body_size);
88 if (!j) return twarnx("OOM"), NULL;
90 j->id = next_id++;
91 j->pri = pri;
92 j->delay = delay;
93 j->ttr = ttr;
95 if (store_job(j) == NULL) {
96 job_free (j);
97 twarnx("OOM");
98 return NULL;
101 TUBE_ASSIGN(j->tube, tube);
103 return j;
106 static void
107 job_hash_free(job j)
109 int index=_get_job_hash_index(j->id);
110 job jh = all_jobs ? all_jobs[index] : NULL;
112 if (jh) {
113 if (jh == j) {
114 /* Special case the first */
115 all_jobs[index] = jh->ht_next;
116 } else {
117 job tmp;
118 while (jh->ht_next && jh->ht_next != j) jh = jh->ht_next;
119 if (jh->ht_next) {
120 tmp = jh->ht_next;
121 jh->ht_next = jh->ht_next->ht_next;
127 void
128 job_free(job j)
130 if (j) {
131 TUBE_ASSIGN(j->tube, NULL);
132 job_hash_free(j);
135 free(j);
139 job_pri_cmp(job a, job b)
141 if (a->pri == b->pri) {
142 /* we can't just subtract because id has too many bits */
143 if (a->id > b->id) return 1;
144 if (a->id < b->id) return -1;
145 return 0;
147 return a->pri - b->pri;
151 job_delay_cmp(job a, job b)
153 if (a->deadline == b->deadline) {
154 /* we can't just subtract because id has too many bits */
155 if (a->id > b->id) return 1;
156 if (a->id < b->id) return -1;
157 return 0;
159 return a->deadline - b->deadline;
163 job_copy(job j)
165 job n;
167 if (!j) return NULL;
169 n = malloc(sizeof(struct job) + j->body_size);
170 if (!n) return twarnx("OOM"), NULL;
172 memcpy(n, j, sizeof(struct job) + j->body_size);
173 n->next = n->prev = n; /* not in a linked list */
175 n->tube = 0; /* Don't use memcpy for the tube, which we must refcount. */
176 TUBE_ASSIGN(n->tube, j->tube);
178 return n;
181 const char *
182 job_state(job j)
184 if (j->state == JOB_STATE_READY) return "ready";
185 if (j->state == JOB_STATE_RESERVED) return "reserved";
186 if (j->state == JOB_STATE_BURIED) return "buried";
187 if (j->state == JOB_STATE_DELAYED) return "delayed";
188 return "invalid";
192 job_list_any_p(job head)
194 return head->next != head || head->prev != head;
198 job_remove(job j)
200 if (!j) return NULL;
201 if (!job_list_any_p(j)) return NULL; /* not in a doubly-linked list */
203 j->next->prev = j->prev;
204 j->prev->next = j->next;
206 j->prev = j->next = j;
208 return j;
211 void
212 job_insert(job head, job j)
214 if (job_list_any_p(j)) return; /* already in a linked list */
216 j->prev = head->prev;
217 j->next = head;
218 head->prev->next = j;
219 head->prev = j;
222 unsigned long long int
223 total_jobs()
225 return next_id - 1;
228 void
229 job_init()
231 all_jobs = calloc(NUM_JOB_BUCKETS, sizeof(job));
232 if (!all_jobs) {
233 twarnx("Failed to allocate %d hash buckets", NUM_JOB_BUCKETS);