shrink the hashmap when it's too sparse
[beanstalkd.git] / job.c
blob32487da508110ad9985531a23ba6c5a97aee30d2
1 #include <stdint.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include "dat.h"
6 static uint64 next_id = 1;
8 static int cur_prime = 0;
10 static job all_jobs_init[12289] = {0};
11 static job *all_jobs = all_jobs_init;
12 static size_t all_jobs_cap = 12289; /* == primes[0] */
13 static size_t all_jobs_used = 0;
15 static int hash_table_was_oom = 0;
17 static void rehash();
19 static int
20 _get_job_hash_index(uint64 job_id)
22 return job_id % all_jobs_cap;
25 static void
26 store_job(job j)
28 int index = 0;
30 index = _get_job_hash_index(j->r.id);
32 j->ht_next = all_jobs[index];
33 all_jobs[index] = j;
34 all_jobs_used++;
36 /* accept a load factor of 4 */
37 if (all_jobs_used > (all_jobs_cap << 2)) rehash(1);
40 static void
41 rehash(int is_upscaling)
43 job *old = all_jobs;
44 size_t old_cap = all_jobs_cap, old_used = all_jobs_used, i;
45 int old_prime = cur_prime;
46 int d = is_upscaling ? 1 : -1;
48 if (cur_prime + d >= NUM_PRIMES) return;
49 if (cur_prime + d < 0) return;
50 if (is_upscaling && hash_table_was_oom) return;
52 cur_prime += d;
54 all_jobs_cap = primes[cur_prime];
55 all_jobs = calloc(all_jobs_cap, sizeof(job));
56 if (!all_jobs) {
57 twarnx("Failed to allocate %zu new hash buckets", all_jobs_cap);
58 hash_table_was_oom = 1;
59 cur_prime = old_prime;
60 all_jobs = old;
61 all_jobs_cap = old_cap;
62 all_jobs_used = old_used;
63 return;
65 all_jobs_used = 0;
66 hash_table_was_oom = 0;
68 for (i = 0; i < old_cap; i++) {
69 while (old[i]) {
70 job j = old[i];
71 old[i] = j->ht_next;
72 j->ht_next = NULL;
73 store_job(j);
76 if (old != all_jobs_init) {
77 free(old);
81 job
82 job_find(uint64 job_id)
84 job jh = NULL;
85 int index = _get_job_hash_index(job_id);
87 for (jh = all_jobs[index]; jh && jh->r.id != job_id; jh = jh->ht_next);
89 return jh;
92 job
93 allocate_job(int body_size)
95 job j;
97 j = malloc(sizeof(struct job) + body_size);
98 if (!j) return twarnx("OOM"), (job) 0;
100 memset(j, 0, sizeof(struct job));
101 j->r.created_at = nanoseconds();
102 j->r.body_size = body_size;
103 j->next = j->prev = j; /* not in a linked list */
104 return j;
108 make_job_with_id(uint pri, int64 delay, int64 ttr,
109 int body_size, tube tube, uint64 id)
111 job j;
113 j = allocate_job(body_size);
114 if (!j) return twarnx("OOM"), (job) 0;
116 if (id) {
117 j->r.id = id;
118 if (id >= next_id) next_id = id + 1;
119 } else {
120 j->r.id = next_id++;
122 j->r.pri = pri;
123 j->r.delay = delay;
124 j->r.ttr = ttr;
126 store_job(j);
128 TUBE_ASSIGN(j->tube, tube);
130 return j;
133 static void
134 job_hash_free(job j)
136 job *slot;
138 slot = &all_jobs[_get_job_hash_index(j->r.id)];
139 while (*slot && *slot != j) slot = &(*slot)->ht_next;
140 if (*slot) {
141 *slot = (*slot)->ht_next;
142 --all_jobs_used;
145 // Downscale when the hashmap is too sparse
146 if (all_jobs_used < (all_jobs_cap >> 4)) rehash(0);
149 void
150 job_free(job j)
152 if (j) {
153 TUBE_ASSIGN(j->tube, NULL);
154 if (j->r.state != Copy) job_hash_free(j);
157 free(j);
160 void
161 job_setheappos(void *j, int pos)
163 ((job)j)->heap_index = pos;
167 job_pri_less(void *ax, void *bx)
169 job a = ax, b = bx;
170 if (a->r.pri < b->r.pri) return 1;
171 if (a->r.pri > b->r.pri) return 0;
172 return a->r.id < b->r.id;
176 job_delay_less(void *ax, void *bx)
178 job a = ax, b = bx;
179 if (a->r.deadline_at < b->r.deadline_at) return 1;
180 if (a->r.deadline_at > b->r.deadline_at) return 0;
181 return a->r.id < b->r.id;
185 job_copy(job j)
187 job n;
189 if (!j) return NULL;
191 n = malloc(sizeof(struct job) + j->r.body_size);
192 if (!n) return twarnx("OOM"), (job) 0;
194 memcpy(n, j, sizeof(struct job) + j->r.body_size);
195 n->next = n->prev = n; /* not in a linked list */
197 n->file = NULL; /* copies do not have refcnt on the wal */
199 n->tube = 0; /* Don't use memcpy for the tube, which we must refcount. */
200 TUBE_ASSIGN(n->tube, j->tube);
202 /* Mark this job as a copy so it can be appropriately freed later on */
203 n->r.state = Copy;
205 return n;
208 const char *
209 job_state(job j)
211 if (j->r.state == Ready) return "ready";
212 if (j->r.state == Reserved) return "reserved";
213 if (j->r.state == Buried) return "buried";
214 if (j->r.state == Delayed) return "delayed";
215 return "invalid";
219 job_list_any_p(job head)
221 return head->next != head || head->prev != head;
225 job_remove(job j)
227 if (!j) return NULL;
228 if (!job_list_any_p(j)) return NULL; /* not in a doubly-linked list */
230 j->next->prev = j->prev;
231 j->prev->next = j->next;
233 j->prev = j->next = j;
235 return j;
238 void
239 job_insert(job head, job j)
241 if (job_list_any_p(j)) return; /* already in a linked list */
243 j->prev = head->prev;
244 j->next = head;
245 head->prev->next = j;
246 head->prev = j;
249 uint64
250 total_jobs()
252 return next_id - 1;
255 /* for unit tests */
256 size_t
257 get_all_jobs_used()
259 return all_jobs_used;