1 /* job.c - a job in the queue */
3 /* Copyright (C) 2007 Keith Rarick and Philotic Inc.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include <sys/types.h>
/* Job id counter.  make_job_with_id raises it to id + 1 whenever it is
 * given an id that is >= the current value, so it stays above every id
 * seen so far. */
28 static uint64 next_id
= 1;
/* Index into primes[] of the current table capacity; the resize code
 * advances it with primes[++cur_prime] to pick the next capacity. */
30 static int cur_prime
= 0;
/* Statically allocated initial bucket array.  The resize code compares the
 * old table against it (old != all_jobs_init) -- presumably so this static
 * storage is never passed to free(); confirm against the full function. */
32 static job all_jobs_init
[12289] = {0};
/* Current bucket array; each bucket heads a chain linked through ht_next. */
33 static job
*all_jobs
= all_jobs_init
;
/* Number of buckets in all_jobs; also the modulus used by
 * _get_job_hash_index. */
34 static size_t all_jobs_cap
= 12289; /* == primes[0] */
/* Usage count for the table: compared against all_jobs_cap << 2 (a load
 * factor of 4) to decide when to rehash. */
35 static size_t all_jobs_used
= 0;
/* Set to 1 after a failed bucket allocation; while set, the resize code
 * returns early instead of trying to grow the table again. */
37 static int hash_table_was_oom
= 0;
42 _get_job_hash_index(uint64 job_id
)
44 return job_id
% all_jobs_cap
;
52 index
= _get_job_hash_index(j
->id
);
54 j
->ht_next
= all_jobs
[index
];
58 /* accept a load factor of 4 */
59 if (all_jobs_used
> (all_jobs_cap
<< 2)) rehash();
66 size_t old_cap
= all_jobs_cap
, old_used
= all_jobs_used
, i
;
68 if (cur_prime
>= NUM_PRIMES
) return;
69 if (hash_table_was_oom
) return;
71 all_jobs_cap
= primes
[++cur_prime
];
72 all_jobs
= calloc(all_jobs_cap
, sizeof(job
));
74 twarnx("Failed to allocate %d new hash buckets", all_jobs_cap
);
75 hash_table_was_oom
= 1;
78 all_jobs_cap
= old_cap
;
79 all_jobs_used
= old_used
;
84 for (i
= 0; i
< old_cap
; i
++) {
92 if (old
!= all_jobs_init
) {
98 job_find(uint64 job_id
)
101 int index
= _get_job_hash_index(job_id
);
103 for (jh
= all_jobs
[index
]; jh
&& jh
->id
!= job_id
; jh
= jh
->ht_next
);
109 allocate_job(int body_size
)
113 j
= malloc(sizeof(struct job
) + body_size
);
114 if (!j
) return twarnx("OOM"), (job
) 0;
117 j
->state
= JOB_STATE_INVALID
;
118 j
->created_at
= nanoseconds();
119 j
->reserve_ct
= j
->timeout_ct
= j
->release_ct
= j
->bury_ct
= j
->kick_ct
= 0;
120 j
->body_size
= body_size
;
121 j
->next
= j
->prev
= j
; /* not in a linked list */
126 j
->reserved_binlog_space
= 0;
132 make_job_with_id(uint pri
, int64 delay
, int64 ttr
,
133 int body_size
, tube tube
, uint64 id
)
137 j
= allocate_job(body_size
);
138 if (!j
) return twarnx("OOM"), (job
) 0;
142 if (id
>= next_id
) next_id
= id
+ 1;
152 TUBE_ASSIGN(j
->tube
, tube
);
162 slot
= &all_jobs
[_get_job_hash_index(j
->id
)];
163 while (*slot
&& *slot
!= j
) slot
= &(*slot
)->ht_next
;
165 *slot
= (*slot
)->ht_next
;
174 TUBE_ASSIGN(j
->tube
, NULL
);
175 if (j
->state
!= JOB_STATE_COPY
) job_hash_free(j
);
182 job_setheappos(void *j
, int pos
)
184 ((job
)j
)->heap_index
= pos
;
187 /* We can't simply subtract these values to compare them because they have too many bits */
189 job_pri_cmp(void *ax
, void *bx
)
192 if (a
->pri
> b
->pri
) return 1;
193 if (a
->pri
< b
->pri
) return -1;
194 if (a
->id
> b
->id
) return 1;
195 if (a
->id
< b
->id
) return -1;
199 /* We can't simply subtract these values to compare them because they have too many bits */
201 job_delay_cmp(void *ax
, void *bx
)
204 if (a
->deadline_at
> b
->deadline_at
) return 1;
205 if (a
->deadline_at
< b
->deadline_at
) return -1;
206 if (a
->id
> b
->id
) return 1;
207 if (a
->id
< b
->id
) return -1;
218 n
= malloc(sizeof(struct job
) + j
->body_size
);
219 if (!n
) return twarnx("OOM"), (job
) 0;
221 memcpy(n
, j
, sizeof(struct job
) + j
->body_size
);
222 n
->next
= n
->prev
= n
; /* not in a linked list */
224 n
->binlog
= NULL
; /* copies do not have refcnt on the binlog */
226 n
->tube
= 0; /* Don't use memcpy for the tube, which we must refcount. */
227 TUBE_ASSIGN(n
->tube
, j
->tube
);
229 /* Mark this job as a copy so it can be appropriately freed later on */
230 n
->state
= JOB_STATE_COPY
;
238 if (j
->state
== JOB_STATE_READY
) return "ready";
239 if (j
->state
== JOB_STATE_RESERVED
) return "reserved";
240 if (j
->state
== JOB_STATE_BURIED
) return "buried";
241 if (j
->state
== JOB_STATE_DELAYED
) return "delayed";
246 job_list_any_p(job head
)
248 return head
->next
!= head
|| head
->prev
!= head
;
255 if (!job_list_any_p(j
)) return NULL
; /* not in a doubly-linked list */
257 j
->next
->prev
= j
->prev
;
258 j
->prev
->next
= j
->next
;
260 j
->prev
= j
->next
= j
;
266 job_insert(job head
, job j
)
268 if (job_list_any_p(j
)) return; /* already in a linked list */
270 j
->prev
= head
->prev
;
272 head
->prev
->next
= j
;
286 return all_jobs_used
;
292 all_jobs
= calloc(all_jobs_cap
, sizeof(job
));
294 twarnx("Failed to allocate %d hash buckets", all_jobs_cap
);