/* File-scope state for the job hash table (buckets indexed by job id). */

/* Kept strictly above every caller-supplied id seen so far: the visible
 * code raises it to id + 1 whenever id >= next_id. */
6 static uint64 next_id
= 1;
/* Index into primes[] for the bucket count in use; the growth path picks
 * the next capacity with primes[++cur_prime]. */
8 static int cur_prime
= 0;
/* Statically allocated, zero-initialised bucket array used until the table
 * first grows. Each bucket holds a `job` that heads a chain linked through
 * ht_next. */
10 static job all_jobs_init
[12289] = {0};
/* Current bucket array. Starts at all_jobs_init and is later pointed at
 * calloc'd storage. */
11 static job
*all_jobs
= all_jobs_init
;
/* Number of buckets in all_jobs; also the modulus that maps a job id to
 * its bucket. */
12 static size_t all_jobs_cap
= 12289; /* == primes[0] */
/* Compared against all_jobs_cap << 2 (load factor 4) to decide when to
 * grow. Presumably the number of stored jobs -- the increment/decrement
 * sites are not visible in this excerpt; confirm. */
13 static size_t all_jobs_used
= 0;
/* Set to 1 right after a failed bucket allocation is reported; while it is
 * non-zero the growth path returns early instead of retrying. */
15 static int hash_table_was_oom
= 0;
/* Map a job id to a bucket index: the id modulo the current bucket count.
 * The mapping therefore changes whenever all_jobs_cap changes. */
20 _get_job_hash_index(uint64 job_id
)
22 return job_id
% all_jobs_cap
;
30 index
= _get_job_hash_index(j
->r
.id
);
32 j
->ht_next
= all_jobs
[index
];
36 /* accept a load factor of 4 */
37 if (all_jobs_used
> (all_jobs_cap
<< 2)) rehash();
44 size_t old_cap
= all_jobs_cap
, old_used
= all_jobs_used
, i
;
46 if (cur_prime
>= NUM_PRIMES
) return;
47 if (hash_table_was_oom
) return;
49 all_jobs_cap
= primes
[++cur_prime
];
50 all_jobs
= calloc(all_jobs_cap
, sizeof(job
));
52 twarnx("Failed to allocate %d new hash buckets", all_jobs_cap
);
53 hash_table_was_oom
= 1;
56 all_jobs_cap
= old_cap
;
57 all_jobs_used
= old_used
;
62 for (i
= 0; i
< old_cap
; i
++) {
70 if (old
!= all_jobs_init
) {
/* Look up a job by id: hash the id to a bucket, then walk that bucket's
 * ht_next chain. What is done with jh after the loop is not visible in
 * this excerpt. */
76 job_find(uint64 job_id
)
79 int index
= _get_job_hash_index(job_id
);
/* Empty-bodied loop: on exit jh is the first chain entry whose r.id equals
 * job_id, or null when the chain is exhausted. */
81 for (jh
= all_jobs
[index
]; jh
&& jh
->r
.id
!= job_id
; jh
= jh
->ht_next
);
/* Allocate a job with body_size extra bytes following the struct.
 * On allocation failure logs "OOM" and yields a null job. The success
 * return is not visible in this excerpt.
 * NOTE(review): body_size is a signed int added to a size_t; a negative
 * value would convert to a very large request -- confirm callers reject
 * negative sizes. */
87 allocate_job(int body_size
)
91 j
= malloc(sizeof(struct job
) + body_size
);
92 if (!j
) return twarnx("OOM"), (job
) 0;
/* Only the struct itself is zeroed; the trailing body bytes are left
 * uninitialised. */
94 memset(j
, 0, sizeof(struct job
));
95 j
->r
.created_at
= nanoseconds();
96 j
->r
.body_size
= body_size
;
/* Self-linked next/prev is the "not in any list" state that
 * job_list_any_p tests for. */
97 j
->next
= j
->prev
= j
; /* not in a linked list */
/* Create a job through allocate_job(body_size) and attach it to `tube`.
 * Yields a null job when allocation fails. How pri, delay and ttr are
 * stored, and how the job's own id is chosen, is not visible in this
 * excerpt. */
102 make_job_with_id(uint pri
, int64 delay
, int64 ttr
,
103 int body_size
, tube tube
, uint64 id
)
107 j
= allocate_job(body_size
);
/* allocate_job's visible failure path already logs "OOM", so a failure is
 * reported twice. */
108 if (!j
) return twarnx("OOM"), (job
) 0;
/* Keep next_id strictly above any id supplied by the caller. */
112 if (id
>= next_id
) next_id
= id
+ 1;
/* The tube pointer is set through TUBE_ASSIGN rather than by plain
 * assignment (presumably for reference counting -- confirm against the
 * macro's definition). */
122 TUBE_ASSIGN(j
->tube
, tube
);
132 slot
= &all_jobs
[_get_job_hash_index(j
->r
.id
)];
133 while (*slot
&& *slot
!= j
) slot
= &(*slot
)->ht_next
;
135 *slot
= (*slot
)->ht_next
;
144 TUBE_ASSIGN(j
->tube
, NULL
);
145 if (j
->r
.state
!= Copy
) job_hash_free(j
);
/* Store pos in the job's heap_index field. The job arrives as void * and
 * is cast to `job` before the write (presumably so this can serve as a
 * generic heap callback -- confirm at the registration site). */
152 job_setheappos(void *j
, int pos
)
154 ((job
)j
)->heap_index
= pos
;
/* Ordering predicate over two jobs passed as void *.
 * Returns 1 when a's r.pri is smaller than b's, 0 when it is larger; for
 * equal pri the job with the smaller r.id orders first.
 * a and b are declared on a line not shown in this excerpt; presumably
 * they are ax and bx viewed as jobs. */
158 job_pri_less(void *ax
, void *bx
)
161 if (a
->r
.pri
< b
->r
.pri
) return 1;
162 if (a
->r
.pri
> b
->r
.pri
) return 0;
/* Equal priority: break the tie on id. */
163 return a
->r
.id
< b
->r
.id
;
/* Ordering predicate over two jobs passed as void *.
 * Returns 1 when a's r.deadline_at is earlier than b's, 0 when it is
 * later; for equal deadlines the job with the smaller r.id orders first.
 * a and b are declared on a line not shown in this excerpt; presumably
 * they are ax and bx viewed as jobs. */
167 job_delay_less(void *ax
, void *bx
)
170 if (a
->r
.deadline_at
< b
->r
.deadline_at
) return 1;
171 if (a
->r
.deadline_at
> b
->r
.deadline_at
) return 0;
/* Equal deadline: break the tie on id. */
172 return a
->r
.id
< b
->r
.id
;
182 n
= malloc(sizeof(struct job
) + j
->r
.body_size
);
183 if (!n
) return twarnx("OOM"), (job
) 0;
185 memcpy(n
, j
, sizeof(struct job
) + j
->r
.body_size
);
186 n
->next
= n
->prev
= n
; /* not in a linked list */
188 n
->file
= NULL
; /* copies do not have refcnt on the wal */
190 n
->tube
= 0; /* Don't use memcpy for the tube, which we must refcount. */
191 TUBE_ASSIGN(n
->tube
, j
->tube
);
193 /* Mark this job as a copy so it can be appropriately freed later on */
202 if (j
->r
.state
== Ready
) return "ready";
203 if (j
->r
.state
== Reserved
) return "reserved";
204 if (j
->r
.state
== Buried
) return "buried";
205 if (j
->r
.state
== Delayed
) return "delayed";
/* Non-zero when head is linked to at least one other node, i.e. its next
 * or its prev does not point back at head itself. A self-linked node (the
 * state allocate_job leaves a fresh job in) yields 0. */
210 job_list_any_p(job head
)
212 return head
->next
!= head
|| head
->prev
!= head
;
219 if (!job_list_any_p(j
)) return NULL
; /* not in a doubly-linked list */
221 j
->next
->prev
= j
->prev
;
222 j
->prev
->next
= j
->next
;
224 j
->prev
= j
->next
= j
;
/* Link j into the circular list headed by `head`, directly after the
 * current last node (head->prev). Does nothing when j is already linked
 * into some list. Only two of the pointer updates are visible in this
 * excerpt; the remaining ones (j->next, head->prev) are not shown. */
230 job_insert(job head
, job j
)
232 if (job_list_any_p(j
)) return; /* already in a linked list */
/* j's predecessor becomes the old last node... */
234 j
->prev
= head
->prev
;
/* ...and that node's successor becomes j. */
236 head
->prev
->next
= j
;
250 return all_jobs_used
;
256 all_jobs
= calloc(all_jobs_cap
, sizeof(job
));
258 twarnx("Failed to allocate %d hash buckets", all_jobs_cap
);