document the new binlog stats
[beanstalkd.git] / job.c
blob3f5fb9286ddb136f315a4061ff30aa7160b46b95
1 #include <stdint.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include "dat.h"
6 static uint64 next_id = 1;
8 static int cur_prime = 0;
10 static job all_jobs_init[12289] = {0};
11 static job *all_jobs = all_jobs_init;
12 static size_t all_jobs_cap = 12289; /* == primes[0] */
13 static size_t all_jobs_used = 0;
15 static int hash_table_was_oom = 0;
17 static void rehash();
19 static int
20 _get_job_hash_index(uint64 job_id)
22 return job_id % all_jobs_cap;
25 static void
26 store_job(job j)
28 int index = 0;
30 index = _get_job_hash_index(j->r.id);
32 j->ht_next = all_jobs[index];
33 all_jobs[index] = j;
34 all_jobs_used++;
36 /* accept a load factor of 4 */
37 if (all_jobs_used > (all_jobs_cap << 2)) rehash();
40 static void
41 rehash()
43 job *old = all_jobs;
44 size_t old_cap = all_jobs_cap, old_used = all_jobs_used, i;
46 if (cur_prime >= NUM_PRIMES) return;
47 if (hash_table_was_oom) return;
49 all_jobs_cap = primes[++cur_prime];
50 all_jobs = calloc(all_jobs_cap, sizeof(job));
51 if (!all_jobs) {
52 twarnx("Failed to allocate %d new hash buckets", all_jobs_cap);
53 hash_table_was_oom = 1;
54 --cur_prime;
55 all_jobs = old;
56 all_jobs_cap = old_cap;
57 all_jobs_used = old_used;
58 return;
60 all_jobs_used = 0;
62 for (i = 0; i < old_cap; i++) {
63 while (old[i]) {
64 job j = old[i];
65 old[i] = j->ht_next;
66 j->ht_next = NULL;
67 store_job(j);
70 if (old != all_jobs_init) {
71 free(old);
75 job
76 job_find(uint64 job_id)
78 job jh = NULL;
79 int index = _get_job_hash_index(job_id);
81 for (jh = all_jobs[index]; jh && jh->r.id != job_id; jh = jh->ht_next);
83 return jh;
86 job
87 allocate_job(int body_size)
89 job j;
91 j = malloc(sizeof(struct job) + body_size);
92 if (!j) return twarnx("OOM"), (job) 0;
94 memset(j, 0, sizeof(struct job));
95 j->r.created_at = nanoseconds();
96 j->r.body_size = body_size;
97 j->next = j->prev = j; /* not in a linked list */
98 return j;
102 make_job_with_id(uint pri, int64 delay, int64 ttr,
103 int body_size, tube tube, uint64 id)
105 job j;
107 j = allocate_job(body_size);
108 if (!j) return twarnx("OOM"), (job) 0;
110 if (id) {
111 j->r.id = id;
112 if (id >= next_id) next_id = id + 1;
113 } else {
114 j->r.id = next_id++;
116 j->r.pri = pri;
117 j->r.delay = delay;
118 j->r.ttr = ttr;
120 store_job(j);
122 TUBE_ASSIGN(j->tube, tube);
124 return j;
127 static void
128 job_hash_free(job j)
130 job *slot;
132 slot = &all_jobs[_get_job_hash_index(j->r.id)];
133 while (*slot && *slot != j) slot = &(*slot)->ht_next;
134 if (*slot) {
135 *slot = (*slot)->ht_next;
136 --all_jobs_used;
140 void
141 job_free(job j)
143 if (j) {
144 TUBE_ASSIGN(j->tube, NULL);
145 if (j->r.state != Copy) job_hash_free(j);
148 free(j);
151 void
152 job_setheappos(void *j, int pos)
154 ((job)j)->heap_index = pos;
158 job_pri_less(void *ax, void *bx)
160 job a = ax, b = bx;
161 if (a->r.pri < b->r.pri) return 1;
162 if (a->r.pri > b->r.pri) return 0;
163 return a->r.id < b->r.id;
167 job_delay_less(void *ax, void *bx)
169 job a = ax, b = bx;
170 if (a->r.deadline_at < b->r.deadline_at) return 1;
171 if (a->r.deadline_at > b->r.deadline_at) return 0;
172 return a->r.id < b->r.id;
176 job_copy(job j)
178 job n;
180 if (!j) return NULL;
182 n = malloc(sizeof(struct job) + j->r.body_size);
183 if (!n) return twarnx("OOM"), (job) 0;
185 memcpy(n, j, sizeof(struct job) + j->r.body_size);
186 n->next = n->prev = n; /* not in a linked list */
188 n->file = NULL; /* copies do not have refcnt on the wal */
190 n->tube = 0; /* Don't use memcpy for the tube, which we must refcount. */
191 TUBE_ASSIGN(n->tube, j->tube);
193 /* Mark this job as a copy so it can be appropriately freed later on */
194 n->r.state = Copy;
196 return n;
199 const char *
200 job_state(job j)
202 if (j->r.state == Ready) return "ready";
203 if (j->r.state == Reserved) return "reserved";
204 if (j->r.state == Buried) return "buried";
205 if (j->r.state == Delayed) return "delayed";
206 return "invalid";
210 job_list_any_p(job head)
212 return head->next != head || head->prev != head;
216 job_remove(job j)
218 if (!j) return NULL;
219 if (!job_list_any_p(j)) return NULL; /* not in a doubly-linked list */
221 j->next->prev = j->prev;
222 j->prev->next = j->next;
224 j->prev = j->next = j;
226 return j;
229 void
230 job_insert(job head, job j)
232 if (job_list_any_p(j)) return; /* already in a linked list */
234 j->prev = head->prev;
235 j->next = head;
236 head->prev->next = j;
237 head->prev = j;
240 uint64
241 total_jobs()
243 return next_id - 1;
246 /* for unit tests */
247 size_t
248 get_all_jobs_used()
250 return all_jobs_used;
253 void
254 job_init()
256 all_jobs = calloc(all_jobs_cap, sizeof(job));
257 if (!all_jobs) {
258 twarnx("Failed to allocate %d hash buckets", all_jobs_cap);