add changelog for 1.13
[beanstalkd.git] / job.c
blobea551ce8f2708fa0d9c23899e8edcafb9c8376ce
1 #include "dat.h"
2 #include <stdint.h>
3 #include <stdlib.h>
4 #include <string.h>
/* Next job id to hand out; monotonically increasing, advanced in make_job_with_id. */
static uint64 next_id = 1;

/* Index into the primes[] table selecting the current bucket count. */
static int cur_prime = 0;

/* Statically allocated initial hash table, so job storage works with no malloc. */
static Job *all_jobs_init[12289] = {0};
static Job **all_jobs = all_jobs_init;
static size_t all_jobs_cap = 12289; /* == primes[0] */
static size_t all_jobs_used = 0;

/* Set when growing the table failed; cleared by a later successful rehash. */
static int hash_table_was_oom = 0;

static void rehash(int);
19 static int
20 _get_job_hash_index(uint64 job_id)
22 return job_id % all_jobs_cap;
25 static void
26 store_job(Job *j)
28 int index = 0;
30 index = _get_job_hash_index(j->r.id);
32 j->ht_next = all_jobs[index];
33 all_jobs[index] = j;
34 all_jobs_used++;
36 /* accept a load factor of 4 */
37 if (all_jobs_used > (all_jobs_cap << 2)) rehash(1);
40 static void
41 rehash(int is_upscaling)
43 Job **old = all_jobs;
44 size_t old_cap = all_jobs_cap, old_used = all_jobs_used, i;
45 int old_prime = cur_prime;
46 int d = is_upscaling ? 1 : -1;
48 if (cur_prime + d >= NUM_PRIMES) return;
49 if (cur_prime + d < 0) return;
50 if (is_upscaling && hash_table_was_oom) return;
52 cur_prime += d;
54 all_jobs_cap = primes[cur_prime];
55 all_jobs = calloc(all_jobs_cap, sizeof(Job *));
56 if (!all_jobs) {
57 twarnx("Failed to allocate %zu new hash buckets", all_jobs_cap);
58 hash_table_was_oom = 1;
59 cur_prime = old_prime;
60 all_jobs = old;
61 all_jobs_cap = old_cap;
62 all_jobs_used = old_used;
63 return;
65 all_jobs_used = 0;
66 hash_table_was_oom = 0;
68 for (i = 0; i < old_cap; i++) {
69 while (old[i]) {
70 Job *j = old[i];
71 old[i] = j->ht_next;
72 j->ht_next = NULL;
73 store_job(j);
76 if (old != all_jobs_init) {
77 free(old);
81 Job *
82 job_find(uint64 job_id)
84 int index = _get_job_hash_index(job_id);
85 Job *jh = all_jobs[index];
87 while (jh && jh->r.id != job_id)
88 jh = jh->ht_next;
90 return jh;
93 Job *
94 allocate_job(int body_size)
96 Job *j;
98 j = malloc(sizeof(Job) + body_size);
99 if (!j) {
100 twarnx("OOM");
101 return (Job *) 0;
104 memset(j, 0, sizeof(Job));
105 j->r.created_at = nanoseconds();
106 j->r.body_size = body_size;
107 j->body = (char *)j + sizeof(Job);
108 job_list_reset(j);
109 return j;
112 Job *
113 make_job_with_id(uint32 pri, int64 delay, int64 ttr,
114 int body_size, Tube *tube, uint64 id)
116 Job *j;
118 j = allocate_job(body_size);
119 if (!j) {
120 twarnx("OOM");
121 return (Job *) 0;
124 if (id) {
125 j->r.id = id;
126 if (id >= next_id) next_id = id + 1;
127 } else {
128 j->r.id = next_id++;
130 j->r.pri = pri;
131 j->r.delay = delay;
132 j->r.ttr = ttr;
134 store_job(j);
136 TUBE_ASSIGN(j->tube, tube);
138 return j;
141 static void
142 job_hash_free(Job *j)
144 Job **slot;
146 slot = &all_jobs[_get_job_hash_index(j->r.id)];
147 while (*slot && *slot != j) slot = &(*slot)->ht_next;
148 if (*slot) {
149 *slot = (*slot)->ht_next;
150 --all_jobs_used;
153 // Downscale when the hashmap is too sparse
154 if (all_jobs_used < (all_jobs_cap >> 4)) rehash(0);
157 void
158 job_free(Job *j)
160 if (j) {
161 TUBE_ASSIGN(j->tube, NULL);
162 if (j->r.state != Copy) job_hash_free(j);
165 free(j);
168 void
169 job_setpos(void *j, size_t pos)
171 ((Job *)j)->heap_index = pos;
175 job_pri_less(void *ja, void *jb)
177 Job *a = (Job *)ja;
178 Job *b = (Job *)jb;
179 if (a->r.pri < b->r.pri) return 1;
180 if (a->r.pri > b->r.pri) return 0;
181 return a->r.id < b->r.id;
185 job_delay_less(void *ja, void *jb)
187 Job *a = ja;
188 Job *b = jb;
189 if (a->r.deadline_at < b->r.deadline_at) return 1;
190 if (a->r.deadline_at > b->r.deadline_at) return 0;
191 return a->r.id < b->r.id;
194 Job *
195 job_copy(Job *j)
197 if (!j)
198 return NULL;
200 Job *n = malloc(sizeof(Job) + j->r.body_size);
201 if (!n) {
202 twarnx("OOM");
203 return (Job *) 0;
206 memcpy(n, j, sizeof(Job) + j->r.body_size);
207 job_list_reset(n);
209 n->file = NULL; /* copies do not have refcnt on the wal */
211 n->tube = 0; /* Don't use memcpy for the tube, which we must refcount. */
212 TUBE_ASSIGN(n->tube, j->tube);
214 /* Mark this job as a copy so it can be appropriately freed later on */
215 n->r.state = Copy;
217 return n;
220 const char *
221 job_state(Job *j)
223 if (j->r.state == Ready) return "ready";
224 if (j->r.state == Reserved) return "reserved";
225 if (j->r.state == Buried) return "buried";
226 if (j->r.state == Delayed) return "delayed";
227 return "invalid";
230 // job_list_reset detaches head from the list,
231 // marking the list starting in head pointing to itself.
232 void
233 job_list_reset(Job *head)
235 head->prev = head;
236 head->next = head;
240 job_list_is_empty(Job *head)
242 return head->next == head && head->prev == head;
245 Job *
246 job_list_remove(Job *j)
248 if (!j) return NULL;
249 if (job_list_is_empty(j)) return NULL; /* not in a doubly-linked list */
251 j->next->prev = j->prev;
252 j->prev->next = j->next;
254 job_list_reset(j);
256 return j;
259 void
260 job_list_insert(Job *head, Job *j)
262 if (!job_list_is_empty(j)) return; /* already in a linked list */
264 j->prev = head->prev;
265 j->next = head;
266 head->prev->next = j;
267 head->prev = j;
270 /* for unit tests */
271 size_t
272 get_all_jobs_used()
274 return all_jobs_used;