build: make the "rpm" rule work once again
[iwhd.git] / mpipe.c
blob46b3ad229d268f18072bbe7f2f2e3871f6ad3a55
1 /* Copyright (C) 2010 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
16 #include <config.h>
17 #include <assert.h>
19 #include "iwh.h"
20 #include "mpipe.h"
22 void
23 pipe_init_shared (pipe_shared *ps, void *owner, unsigned short ncons)
25 ps->owner = owner;
26 pthread_mutex_init(&ps->lock,NULL);
27 pthread_cond_init(&ps->prod_cond,NULL);
28 pthread_cond_init(&ps->cons_cond,NULL);
29 pipe_reset(ps,ncons);
32 void
33 pipe_reset (pipe_shared *ps, unsigned short ncons)
35 ps->data_ptr = NULL;
36 ps->data_len = 0;
37 ps->sequence = 0; /* TBD: randomize? */
38 ps->cons_total = ncons;
39 ps->cons_done = 0;
40 ps->cons_error = 0;
41 ps->cons_init_done = 0;
42 ps->cons_init_error = 0;
43 ps->prod_state = PROD_INIT;
46 pipe_private *
47 pipe_init_private (pipe_shared *ps)
49 pipe_private *pp;
51 pp = malloc(sizeof(*pp));
52 if (pp) {
53 pp->shared = ps;
55 * The producer might already have posted #1, so we can't use
56 * ps->sequence+1. This precludes consumers joining
57 * mid-stream, but that was never a goal anyway.
59 pp->sequence = 1;
60 pp->offset = 0;
62 return pp;
65 int
66 pipe_cons_wait (pipe_private *pp)
68 pipe_shared *ps = pp->shared;
69 int rc;
71 pp->offset = 0;
72 pthread_mutex_lock(&ps->lock);
74 while (ps->sequence != pp->sequence) {
75 DPRINTF("consumer about to wait for %lu\n",pp->sequence);
76 pthread_cond_wait(&ps->cons_cond,&ps->lock);
77 DPRINTF("consumer done waiting\n");
80 rc = (ps->data_len != 0);
81 if (!rc) {
82 DPRINTF("consumer saw producer is done\n");
83 if (++ps->cons_done + ps->cons_error >= ps->cons_total) {
84 pthread_cond_signal(&ps->prod_cond);
86 rc = 0;
89 pthread_mutex_unlock(&ps->lock);
90 return rc;
93 void
94 pipe_cons_signal (pipe_private *pp, int error)
96 pipe_shared *ps = pp->shared;
98 pthread_mutex_lock(&ps->lock);
99 ++pp->sequence;
100 pp->offset = 0;
102 if (error) {
103 ++ps->cons_error;
105 else {
106 ++ps->cons_done;
108 if (ps->cons_done + ps->cons_error >= ps->cons_total) {
109 DPRINTF("consumer signal, total %u done %u error %u\n",
110 ps->cons_total, ps->cons_done, ps->cons_error);
111 pthread_cond_signal(&ps->prod_cond);
113 pthread_mutex_unlock(&ps->lock);
116 void
117 pipe_cons_siginit (pipe_shared *ps, int error)
119 pthread_mutex_lock(&ps->lock);
120 assert ((ps->cons_init_done + ps->cons_init_error) < ps->cons_total);
121 if (error) {
122 ++ps->cons_init_error;
124 else {
125 ++ps->cons_init_done;
127 pthread_cond_broadcast(&ps->prod_cond);
128 DPRINTF("consumer init signal (total %u done %u error %u)\n",
129 ps->cons_total,ps->cons_done,ps->cons_error);
130 pthread_mutex_unlock(&ps->lock);
134 * Return the number of bad children, or -1 if some other error.
137 pipe_prod_wait_init (pipe_shared *ps)
139 pthread_mutex_lock(&ps->lock);
140 DPRINTF("producer initializing (total %u done %u error %u)\n",
141 ps->cons_total, ps->cons_init_done, ps->cons_init_error);
142 while (ps->cons_init_done + ps->cons_init_error < ps->cons_total) {
143 pthread_cond_broadcast(&ps->cons_cond);
144 pthread_cond_wait(&ps->prod_cond,&ps->lock);
145 DPRINTF(" after sleep (total %u done %u error %u)\n",
146 ps->cons_total,ps->cons_init_done,ps->cons_init_error);
148 pthread_mutex_unlock(&ps->lock);
149 return ps->cons_error;
152 void
153 pipe_prod_signal (pipe_shared *ps, void *ptr, size_t total)
155 pthread_mutex_lock(&ps->lock);
156 if (ps->cons_error >= ps->cons_total) {
157 DPRINTF("producer posting %zu bytes as %ld, no sinks"
158 " (total %u error %u)\n",
159 total,ps->sequence+1, ps->cons_total,ps->cons_error);
160 pthread_mutex_unlock(&ps->lock);
161 return;
163 ps->data_ptr = ptr;
164 ps->data_len = total;
165 ps->cons_done = ps->cons_error;
166 ++ps->sequence;
167 DPRINTF("producer posting %zu bytes as %ld (total %u error %u)\n",
168 total,ps->sequence, ps->cons_total,ps->cons_error);
169 while (ps->cons_done + ps->cons_error < ps->cons_total) {
170 pthread_cond_broadcast(&ps->cons_cond);
171 pthread_cond_wait(&ps->prod_cond,&ps->lock);
172 DPRINTF("%u children yet to read (total %u done %u error %u)\n",
173 ps->cons_total - (ps->cons_done + ps->cons_error),
174 ps->cons_total,ps->cons_done,ps->cons_error);
176 pthread_mutex_unlock(&ps->lock);
179 void
180 pipe_prod_siginit (pipe_shared *ps, int err)
182 pthread_mutex_lock(&ps->lock);
183 assert (ps->prod_state == PROD_INIT);
184 ps->prod_state = (err >= 0) ? PROD_RUNNING : PROD_ERROR;
185 pthread_cond_broadcast(&ps->cons_cond);
186 pthread_mutex_unlock(&ps->lock);
190 pipe_cons_wait_init (pipe_shared *ps)
192 pthread_mutex_lock(&ps->lock);
193 DPRINTF("consumer initing\n");
194 while (ps->prod_state == PROD_INIT) {
195 pthread_cond_broadcast(&ps->prod_cond);
196 pthread_cond_wait(&ps->cons_cond,&ps->lock);
197 DPRINTF(" after sleep (state = %u)\n",ps->prod_state);
199 pthread_mutex_unlock(&ps->lock);
200 return (ps->prod_state == PROD_ERROR);
203 void
204 pipe_prod_finish (pipe_shared *ps)
206 pthread_mutex_lock(&ps->lock);
207 ps->data_len = 0;
208 ps->cons_done = ps->cons_error;
209 ++ps->sequence;
210 DPRINTF("waiting for %u children (total %u error %u)\n",
211 ps->cons_total - (ps->cons_done + ps->cons_error),
212 ps->cons_total,ps->cons_error);
213 while (ps->cons_done + ps->cons_error < ps->cons_total) {
214 pthread_cond_broadcast(&ps->cons_cond);
215 pthread_cond_wait(&ps->prod_cond,&ps->lock);
216 DPRINTF("%u children left (total %u done %u error %u)\n",
217 ps->cons_total - (ps->cons_done + ps->cons_error),
218 ps->cons_total,ps->cons_done,ps->cons_error);
220 pthread_mutex_unlock(&ps->lock);
221 DPRINTF("producer finished with sequence %ld\n",ps->sequence);