Update protocol doc -- can now delete ready jobs.
[beanstalkd.git] / pq.c
blob2d54bfc9eb9853cac61a78f0e793bb5d91c3b154
1 /* pq.c - priority queue */
3 /* Copyright (C) 2007 Keith Rarick and Philotic Inc.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include <string.h>
23 #include "tube.h" /* hack to make cpp happy */
24 #include "pq.h"
26 void
27 pq_init(pq q, job_cmp_fn cmp)
29 if (!q) return;
31 q->cap = 0;
32 q->used = 0;
33 q->cmp = cmp;
34 q->heap = NULL;
36 return;
39 void
40 pq_clear(pq q)
42 free(q->heap);
43 pq_init(q, q->cmp);
46 static void
47 pq_grow(pq q)
49 job *nheap;
50 unsigned int ncap = q->cap << 1 ? : 1;
52 nheap = malloc(ncap * sizeof(job));
53 if (!nheap) return;
55 if (q->heap) memcpy(nheap, q->heap, q->used * sizeof(job));
56 free(q->heap);
57 q->heap = nheap;
58 q->cap = ncap;
61 static void
62 swap(pq q, unsigned int a, unsigned int b)
64 job j;
66 j = q->heap[a];
67 q->heap[a] = q->heap[b];
68 q->heap[b] = j;
70 q->heap[a]->heap_index = a;
71 q->heap[b]->heap_index = b;
74 #define PARENT(i) (((i-1))>>1)
75 #define CHILD_LEFT(i) (((i)<<1)+1)
76 #define CHILD_RIGHT(i) (((i)<<1)+2)
78 static int
79 cmp(pq q, unsigned int a, unsigned int b)
81 return q->cmp(q->heap[a], q->heap[b]);
84 static void
85 bubble_up(pq q, unsigned int k)
87 int p;
89 if (k == 0) return;
90 p = PARENT(k);
91 if (cmp(q, p, k) <= 0) return;
92 swap(q, k, p);
93 bubble_up(q, p);
96 static void
97 bubble_down(pq q, unsigned int k)
99 int l, r, s;
101 l = CHILD_LEFT(k);
102 r = CHILD_RIGHT(k);
104 s = k;
105 if (l < q->used && cmp(q, l, k) < 0) s = l;
106 if (r < q->used && cmp(q, r, s) < 0) s = r;
107 if (s == k) return; /* already satisfies the heap property */
109 swap(q, k, s);
110 bubble_down(q, s);
113 /* assumes there is at least one item in the queue */
114 static void
115 delete_min(pq q)
117 q->heap[0] = q->heap[--q->used];
118 q->heap[0]->heap_index = 0;
119 if (q->used) bubble_down(q, 0);
123 pq_give(pq q, job j)
125 int k;
127 if (q->used >= q->cap) pq_grow(q);
128 if (q->used >= q->cap) return 0;
130 k = q->used++;
131 q->heap[k] = j;
132 j->heap_index = k;
133 bubble_up(q, k);
135 return 1;
139 pq_take(pq q)
141 job j;
143 if (q->used == 0) return NULL;
145 j = q->heap[0];
146 delete_min(q);
147 return j;
151 pq_peek(pq q)
153 if (q->used == 0) return NULL;
154 return q->heap[0];
158 pq_remove(pq q, job j)
160 unsigned long long int id;
161 unsigned int pri;
163 if (q->used == 0) return NULL;
164 if (q->heap[j->heap_index] != j) return NULL;
166 id = j->id;
167 j->id = 0;
168 pri = j->pri;
169 j->pri = 0;
171 bubble_up(q, j->heap_index);
173 j->id = id;
174 j->pri = pri;
176 /* can't happen */
177 if (q->heap[0] != j) return NULL;
179 delete_min(q);
181 return j;