/* ---- all-jobs hash table state ----
 * Every job is kept in a chained hash table keyed by its 64-bit id so it
 * can be found again by id (see job_find).  Bucket counts come from a
 * primes[] table (declared elsewhere) indexed by cur_prime, and the table
 * is resized by rehash(). */

/* Next job id to hand out; ids start at 1 and only grow (see
 * make_job_with_id, which bumps this past any explicitly supplied id). */
6 static uint64 next_id
= 1;
/* Index into primes[] selecting the current bucket count. */
8 static int cur_prime
= 0;
/* Statically allocated initial bucket array (primes[0] == 12289 buckets),
 * so the table is usable without any heap allocation at startup; rehash
 * must never free this array (it checks for it explicitly). */
10 static Job
*all_jobs_init
[12289] = {0};
/* Current bucket array: starts as the static array above, replaced by
 * calloc'd arrays when rehash() resizes. */
11 static Job
**all_jobs
= all_jobs_init
;
/* Number of buckets currently in all_jobs. */
12 static size_t all_jobs_cap
= 12289; /* == primes[0] */
/* Number of jobs currently stored in the table. */
13 static size_t all_jobs_used
= 0;
/* Set when an upscaling rehash failed to allocate a bigger bucket array;
 * while set, further upscaling attempts are suppressed (see rehash). */
15 static int hash_table_was_oom
= 0;

/* Resize the table one primes[] step: up when the arg is nonzero,
 * down when it is zero. */
17 static void rehash(int);
/* Map a job id to its bucket index in all_jobs (simple modulo hash;
 * bucket counts are prime, which keeps the distribution reasonable).
 * NOTE(review): return type and braces are on lines elided from this view. */
20 _get_job_hash_index(uint64 job_id
)
22 return job_id
% all_jobs_cap
;
/* (fragment: the enclosing function's header is elided from this view —
 * this is the routine that inserts job j into the all-jobs hash table) */
/* Chain j onto the head of its bucket. */
30 index
= _get_job_hash_index(j
->r
.id
);
32 j
->ht_next
= all_jobs
[index
];
/* Grow the table once the average chain length would exceed 4
 * (used > cap * 4). */
36 /* accept a load factor of 4 */
37 if (all_jobs_used
> (all_jobs_cap
<< 2)) rehash(1);
/* Resize the all-jobs hash table one step along the primes[] table:
 * up when is_upscaling is nonzero, down otherwise.  Bails out at either
 * end of the primes table, and refuses to upscale again while a previous
 * upscale allocation failure is outstanding.  On allocation failure the
 * previous table/capacity/count are restored and hash_table_was_oom is
 * set.  NOTE(review): several interior lines of this function (including
 * the statement that advances cur_prime, the re-insertion of jobs inside
 * the loop, and the free of the old array) are elided from this view. */
41 rehash(int is_upscaling
)
/* Snapshot the old table parameters so they can be restored on failure
 * and used to walk the old buckets. */
44 size_t old_cap
= all_jobs_cap
, old_used
= all_jobs_used
, i
;
45 int old_prime
= cur_prime
;
/* Direction of the resize: +1 grows, -1 shrinks. */
46 int d
= is_upscaling
? 1 : -1;
/* Guard both ends of the primes[] table. */
48 if (cur_prime
+ d
>= NUM_PRIMES
) return;
49 if (cur_prime
+ d
< 0) return;
/* Don't retry growing while a previous grow already failed. */
50 if (is_upscaling
&& hash_table_was_oom
) return;
/* Adopt the new capacity and a fresh zeroed bucket array.
 * NOTE(review): cur_prime appears to be advanced by d on an elided line
 * before this use of primes[cur_prime] — confirm against full source. */
54 all_jobs_cap
= primes
[cur_prime
];
55 all_jobs
= calloc(all_jobs_cap
, sizeof(Job
*));
/* Allocation failed: warn, remember the OOM, and roll back to the old
 * table state (the old bucket array is still intact). */
57 twarnx("Failed to allocate %zu new hash buckets", all_jobs_cap
);
58 hash_table_was_oom
= 1;
59 cur_prime
= old_prime
;
61 all_jobs_cap
= old_cap
;
62 all_jobs_used
= old_used
;
/* Success: a fresh table means any earlier OOM condition is cleared. */
66 hash_table_was_oom
= 0;
/* Re-insert every job from the old buckets into the new table
 * (re-insertion statements elided from this view). */
68 for (i
= 0; i
< old_cap
; i
++) {
/* Free the old bucket array — but never the static initial array. */
76 if (old
!= all_jobs_init
) {
/* Look up a job by id: hash to its bucket, then walk the ht_next chain
 * until the id matches or the chain ends.
 * NOTE(review): the loop body and the return statement are on lines
 * elided from this view. */
82 job_find(uint64 job_id
)
84 int index
= _get_job_hash_index(job_id
);
85 Job
*jh
= all_jobs
[index
];
/* Advance along the chain while the id doesn't match. */
87 while (jh
&& jh
->r
.id
!= job_id
)
/* Allocate a Job header plus body_size bytes of body storage in a single
 * malloc, zero the header, stamp the creation time, and point j->body
 * just past the Job struct.
 * NOTE(review): the declaration of j and the malloc-failure handling are
 * on lines elided from this view. */
94 allocate_job(int body_size
)
98 j
= malloc(sizeof(Job
) + body_size
);
/* Zero only the Job header; the trailing body bytes are left for the
 * caller to fill. */
104 memset(j
, 0, sizeof(Job
));
105 j
->r
.created_at
= nanoseconds();
106 j
->r
.body_size
= body_size
;
/* The body lives immediately after the Job struct in the same
 * allocation, so freeing the job frees the body too. */
107 j
->body
= (char *)j
+ sizeof(Job
);
/* Build a job with a caller-chosen id (used e.g. when replaying jobs
 * whose ids already exist), allocating storage via allocate_job and
 * taking a refcounted reference on the destination tube.
 * NOTE(review): most of this function's body (setting pri/delay/ttr,
 * the id==0 fresh-id path, hash insertion, return) is elided here. */
113 make_job_with_id(uint32 pri
, int64 delay
, int64 ttr
,
114 int body_size
, Tube
*tube
, uint64 id
)
118 j
= allocate_job(body_size
);
/* Keep next_id strictly ahead of any explicitly supplied id so future
 * auto-assigned ids never collide. */
126 if (id
>= next_id
) next_id
= id
+ 1;
/* TUBE_ASSIGN manages the tube's refcount while storing the pointer. */
136 TUBE_ASSIGN(j
->tube
, tube
);
/* Remove j from the all-jobs hash table: walk its bucket chain with a
 * pointer-to-pointer until slot points at j, splice it out, then shrink
 * the table if it has become very sparse.
 * NOTE(review): the declaration of slot and surrounding lines are elided
 * from this view. */
142 job_hash_free(Job
*j
)
/* Start at the bucket head and follow ht_next links by address so the
 * unlink is a single pointer store. */
146 slot
= &all_jobs
[_get_job_hash_index(j
->r
.id
)];
147 while (*slot
&& *slot
!= j
) slot
= &(*slot
)->ht_next
;
/* NOTE(review): if j were not present, *slot would be NULL here and this
 * dereference would crash — an elided line may guard this; as written it
 * assumes j is always in the table. */
149 *slot
= (*slot
)->ht_next
;
/* Shrink once occupancy drops below cap/16 (load factor < 1/16). */
153 // Downscale when the hashmap is too sparse
154 if (all_jobs_used
< (all_jobs_cap
>> 4)) rehash(0);
/* (fragment: enclosing function header elided — this appears to be job
 * teardown/free) Drop the refcounted tube reference, then unlink the job
 * from the hash table — except for Copy jobs, which presumably were
 * never inserted into the table (see job_copy) — TODO confirm against
 * the full source. */
161 TUBE_ASSIGN(j
->tube
, NULL
);
162 if (j
->r
.state
!= Copy
) job_hash_free(j
);
/* Heap callback: record the job's current index within its heap so it
 * can later be removed/updated in place.  j is void* to match the
 * generic heap callback signature.
 * NOTE(review): return type and braces are on lines elided here. */
169 job_setpos(void *j
, size_t pos
)
171 ((Job
*)j
)->heap_index
= pos
;
/* Heap comparator: returns 1 when ja orders before jb.  Lower pri wins;
 * equal priorities are tie-broken by lower (older) job id, giving FIFO
 * order within a priority.
 * NOTE(review): the casts of ja/jb into Job *a, *b are on elided lines. */
175 job_pri_less(void *ja
, void *jb
)
179 if (a
->r
.pri
< b
->r
.pri
) return 1;
180 if (a
->r
.pri
> b
->r
.pri
) return 0;
/* Tie-break on id to make the ordering total and deterministic. */
181 return a
->r
.id
< b
->r
.id
;
/* Heap comparator for delayed jobs: earlier deadline_at orders first;
 * equal deadlines are tie-broken by lower (older) job id.
 * NOTE(review): the casts of ja/jb into Job *a, *b are on elided lines. */
185 job_delay_less(void *ja
, void *jb
)
189 if (a
->r
.deadline_at
< b
->r
.deadline_at
) return 1;
190 if (a
->r
.deadline_at
> b
->r
.deadline_at
) return 0;
/* Tie-break on id to make the ordering total and deterministic. */
191 return a
->r
.id
< b
->r
.id
;
/* (fragment: enclosing function header elided — this is the job-copy
 * routine) Duplicate a job: allocate header+body in one block, memcpy
 * the whole thing, then fix up the fields a copy must not share: the
 * WAL file pointer is cleared (copies hold no WAL refcount) and the tube
 * pointer is re-taken through TUBE_ASSIGN so its refcount stays correct.
 * NOTE(review): the malloc-failure check and the lines that mark the
 * copy's state are elided from this view. */
200 Job
*n
= malloc(sizeof(Job
) + j
->r
.body_size
);
/* Copy header and body in one shot; pointer fields are patched below. */
206 memcpy(n
, j
, sizeof(Job
) + j
->r
.body_size
);
209 n
->file
= NULL
; /* copies do not have refcnt on the wal */
/* Clear the copied tube pointer first so TUBE_ASSIGN doesn't try to
 * release a reference the copy never owned. */
211 n
->tube
= 0; /* Don't use memcpy for the tube, which we must refcount. */
212 TUBE_ASSIGN(n
->tube
, j
->tube
);
214 /* Mark this job as a copy so it can be appropriately freed later on */
/* (fragment: enclosing function header elided — maps a job's state enum
 * to its display name; the fall-through return for any other state is
 * also elided from this view.) */
223 if (j
->r
.state
== Ready
) return "ready";
224 if (j
->r
.state
== Reserved
) return "reserved";
225 if (j
->r
.state
== Buried
) return "buried";
226 if (j
->r
.state
== Delayed
) return "delayed";
230 // job_list_reset detaches head from the list,
231 // marking the list starting in head pointing to itself.
/* NOTE(review): return type and body are elided from this view —
 * presumably sets head->prev = head->next = head (the empty state
 * job_list_is_empty tests for). */
233 job_list_reset(Job
*head
)
/* A circular doubly-linked list is empty exactly when head links to
 * itself in both directions — the state job_list_reset establishes.
 * NOTE(review): return type and braces are on lines elided here. */
240 job_list_is_empty(Job
*head
)
242 return head
->next
== head
&& head
->prev
== head
;
/* Unlink j from its circular doubly-linked list by splicing its
 * neighbors together.  Returns NULL when j is not on any list (it links
 * to itself).
 * NOTE(review): the trailing statements (re-linking j to itself and
 * returning it) are on lines elided from this view. */
246 job_list_remove(Job
*j
)
249 if (job_list_is_empty(j
)) return NULL
; /* not in a doubly-linked list */
/* Splice j's neighbors around it. */
251 j
->next
->prev
= j
->prev
;
252 j
->prev
->next
= j
->next
;
/* Insert j just before head — i.e. at the tail of the circular list.
 * A job already on some list (not self-linked) is left untouched.
 * NOTE(review): two of the four splice statements (setting j->next and
 * head->prev) are on lines elided from this view. */
260 job_list_insert(Job
*head
, Job
*j
)
262 if (!job_list_is_empty(j
)) return; /* already in a linked list */
264 j
->prev
= head
->prev
;
266 head
->prev
->next
= j
;
/* (fragment: enclosing function header elided — reports the number of
 * jobs currently stored in the all-jobs hash table.) */
274 return all_jobs_used
;