1 /* job.c - a job in the queue */
3 /* Copyright (C) 2007 Keith Rarick and Philotic Inc.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
26 static unsigned long long int next_id
= 1;
28 static job
*all_jobs
=NULL
;
31 _get_job_hash_index(unsigned long long int job_id
)
33 return job_id
% NUM_JOB_BUCKETS
;
41 index
= _get_job_hash_index(j
->id
);
43 j
->ht_next
= all_jobs
[index
];
51 job_find(unsigned long long int job_id
)
54 int index
= _get_job_hash_index(job_id
);
56 for (jh
= all_jobs
[index
]; jh
&& jh
->id
!= job_id
; jh
= jh
->ht_next
);
62 allocate_job(int body_size
)
66 j
= malloc(sizeof(struct job
) + body_size
);
67 if (!j
) return twarnx("OOM"), NULL
;
70 j
->state
= JOB_STATE_INVALID
;
71 j
->creation
= time(NULL
);
72 j
->timeout_ct
= j
->release_ct
= j
->bury_ct
= j
->kick_ct
= 0;
73 j
->body_size
= body_size
;
74 j
->next
= j
->prev
= j
; /* not in a linked list */
82 make_job(unsigned int pri
, unsigned int delay
, unsigned int ttr
, int body_size
,
87 j
= allocate_job(body_size
);
88 if (!j
) return twarnx("OOM"), NULL
;
95 if (store_job(j
) == NULL
) {
101 TUBE_ASSIGN(j
->tube
, tube
);
109 int index
=_get_job_hash_index(j
->id
);
110 job jh
= all_jobs
? all_jobs
[index
] : NULL
;
114 /* Special case the first */
115 all_jobs
[index
] = jh
->ht_next
;
118 while (jh
->ht_next
&& jh
->ht_next
!= j
) jh
= jh
->ht_next
;
121 jh
->ht_next
= jh
->ht_next
->ht_next
;
131 TUBE_ASSIGN(j
->tube
, NULL
);
139 job_pri_cmp(job a
, job b
)
141 if (a
->pri
== b
->pri
) {
142 /* we can't just subtract because id has too many bits */
143 if (a
->id
> b
->id
) return 1;
144 if (a
->id
< b
->id
) return -1;
147 return a
->pri
- b
->pri
;
151 job_delay_cmp(job a
, job b
)
153 if (a
->deadline
== b
->deadline
) {
154 /* we can't just subtract because id has too many bits */
155 if (a
->id
> b
->id
) return 1;
156 if (a
->id
< b
->id
) return -1;
159 return a
->deadline
- b
->deadline
;
169 n
= malloc(sizeof(struct job
) + j
->body_size
);
170 if (!n
) return twarnx("OOM"), NULL
;
172 memcpy(n
, j
, sizeof(struct job
) + j
->body_size
);
173 n
->next
= n
->prev
= n
; /* not in a linked list */
175 n
->tube
= 0; /* Don't use memcpy for the tube, which we must refcount. */
176 TUBE_ASSIGN(n
->tube
, j
->tube
);
184 if (j
->state
== JOB_STATE_READY
) return "ready";
185 if (j
->state
== JOB_STATE_RESERVED
) return "reserved";
186 if (j
->state
== JOB_STATE_BURIED
) return "buried";
187 if (j
->state
== JOB_STATE_DELAYED
) return "delayed";
192 job_list_any_p(job head
)
194 return head
->next
!= head
|| head
->prev
!= head
;
201 if (!job_list_any_p(j
)) return NULL
; /* not in a doubly-linked list */
203 j
->next
->prev
= j
->prev
;
204 j
->prev
->next
= j
->next
;
206 j
->prev
= j
->next
= j
;
212 job_insert(job head
, job j
)
214 if (job_list_any_p(j
)) return; /* already in a linked list */
216 j
->prev
= head
->prev
;
218 head
->prev
->next
= j
;
222 unsigned long long int
231 all_jobs
= calloc(NUM_JOB_BUCKETS
, sizeof(job
));
233 twarnx("Failed to allocate %d hash buckets", NUM_JOB_BUCKETS
);