proxy.c: declare functions and file-scoped variables static
[iwhd.git] / mpipe.c
blob28bce3b03fce0230d272c6da118a4fce173ebebf
1 /* Copyright (C) 2010 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
16 #include <config.h>
18 #include "iwh.h"
19 #include "mpipe.h"
21 void
22 pipe_init_shared (pipe_shared *ps, void *owner, unsigned short ncons)
24 ps->owner = owner;
25 pthread_mutex_init(&ps->lock,NULL);
26 pthread_cond_init(&ps->prod_cond,NULL);
27 pthread_cond_init(&ps->cons_cond,NULL);
28 ps->data_ptr = NULL;
29 ps->data_len = 0;
30 ps->sequence = 0; /* TBD: randomize? */
31 ps->in_init = 1;
32 ps->cons_total = ncons;
33 ps->cons_done = 0;
34 ps->cons_error = 0;
37 pipe_private *
38 pipe_init_private (pipe_shared *ps)
40 pipe_private *pp;
42 pp = malloc(sizeof(*pp));
43 if (pp) {
44 pp->shared = ps;
45 pp->sequence = ps->sequence + 1;
46 pp->offset = 0;
48 return pp;
51 int
52 pipe_cons_wait (pipe_private *pp)
54 pipe_shared *ps = pp->shared;
55 int rc;
57 pthread_mutex_lock(&ps->lock);
59 while (ps->sequence != pp->sequence) {
60 DPRINTF("consumer about to wait for %lu\n",pp->sequence);
61 pthread_cond_wait(&ps->cons_cond,&ps->lock);
62 DPRINTF("consumer done waiting\n");
65 rc = (ps->data_len != 0);
66 if (!rc) {
67 DPRINTF("consumer saw producer is done\n");
68 if (++ps->cons_done + ps->cons_error >= ps->cons_total) {
69 pthread_cond_signal(&ps->prod_cond);
71 rc = 0;
74 pthread_mutex_unlock(&ps->lock);
75 return rc;
78 void
79 pipe_cons_signal (pipe_private *pp, int error)
81 pipe_shared *ps = pp->shared;
83 pthread_mutex_lock(&ps->lock);
84 ++pp->sequence;
85 pp->offset = 0;
87 if (error)
88 ++ps->cons_error;
89 else
90 ++ps->cons_done;
91 if (ps->cons_done + ps->cons_error >= ps->cons_total) {
92 DPRINTF("consumer signal, done %u total %u\n",
93 ps->cons_done,ps->cons_total);
94 pthread_cond_signal(&ps->prod_cond);
96 pthread_mutex_unlock(&ps->lock);
99 void
100 pipe_cons_siginit (pipe_shared *ps, int error)
102 pthread_mutex_lock(&ps->lock);
103 if (!ps->in_init) {
104 pthread_mutex_unlock(&ps->lock);
105 return;
107 if (error)
108 ++ps->cons_error;
109 else
110 ++ps->cons_done;
111 pthread_cond_broadcast(&ps->prod_cond);
112 DPRINTF("consumer init signal (total %u done %u error %u)\n",
113 ps->cons_total,ps->cons_done,ps->cons_error);
114 ps->in_init = 0;
115 pthread_mutex_unlock(&ps->lock);
119 * Return the number of bad children, or -1 if some other error.
122 pipe_prod_wait_init (pipe_shared *ps)
124 pthread_mutex_lock(&ps->lock);
125 DPRINTF("producer initializing (total %u error %u)\n",
126 ps->cons_total,ps->cons_error);
127 while (ps->cons_done + ps->cons_error < ps->cons_total) {
128 pthread_cond_broadcast(&ps->cons_cond);
129 pthread_cond_wait(&ps->prod_cond,&ps->lock);
130 DPRINTF("%u children yet to poll (total %u done %u error %u)\n",
131 ps->cons_total - (ps->cons_done + ps->cons_error),
132 ps->cons_total,ps->cons_done,ps->cons_error);
134 pthread_mutex_unlock(&ps->lock);
135 return ps->cons_error;
138 void
139 pipe_prod_signal (pipe_shared *ps, void *ptr, size_t total)
141 pthread_mutex_lock(&ps->lock);
142 if (ps->cons_error >= ps->cons_total) {
143 DPRINTF("producer posting %zu bytes as %ld, no sinks"
144 " (total %u error %u)\n",
145 total,ps->sequence+1, ps->cons_total,ps->cons_error);
146 pthread_mutex_unlock(&ps->lock);
147 return;
149 ps->data_ptr = ptr;
150 ps->data_len = total;
151 ps->cons_done = ps->cons_error;
152 ++ps->sequence;
153 DPRINTF("producer posting %zu bytes as %ld (total %u error %u)\n",
154 total,ps->sequence, ps->cons_total,ps->cons_error);
155 while (ps->cons_done + ps->cons_error < ps->cons_total) {
156 pthread_cond_broadcast(&ps->cons_cond);
157 pthread_cond_wait(&ps->prod_cond,&ps->lock);
158 DPRINTF("%u children yet to read (total %u done %u error %u)\n",
159 ps->cons_total - (ps->cons_done + ps->cons_error),
160 ps->cons_total,ps->cons_done,ps->cons_error);
162 pthread_mutex_unlock(&ps->lock);
165 void
166 pipe_prod_finish (pipe_shared *ps)
168 pthread_mutex_lock(&ps->lock);
169 ps->data_len = 0;
170 ps->cons_done = ps->cons_error;
171 ++ps->sequence;
172 DPRINTF("waiting for %u children (total %u error %u)\n",
173 ps->cons_total - (ps->cons_done + ps->cons_error),
174 ps->cons_total,ps->cons_error);
175 while (ps->cons_done + ps->cons_error < ps->cons_total) {
176 pthread_cond_broadcast(&ps->cons_cond);
177 pthread_cond_wait(&ps->prod_cond,&ps->lock);
178 DPRINTF("%u children left (total %u done %u error %u)\n",
179 ps->cons_total - (ps->cons_done + ps->cons_error),
180 ps->cons_total,ps->cons_done,ps->cons_error);
182 pthread_mutex_unlock(&ps->lock);