1 /* Copyright (C) 2010 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
/*
 * pipe_init_shared: prepare the synchronization objects embedded in *ps.
 *
 * The statements visible in this excerpt initialize ps->lock and the two
 * condition variables ps->prod_cond and ps->cons_cond, each with default
 * attributes (NULL).  The return values of the pthread_*_init calls are
 * not checked.
 *
 * NOTE(review): owner and ncons are not used by any statement visible
 * here; presumably they are stored or forwarded by lines that are not
 * part of this excerpt -- confirm against the full file.
 */
23 pipe_init_shared (pipe_shared
*ps
, void *owner
, unsigned short ncons
)
26 pthread_mutex_init(&ps
->lock
,NULL
);
27 pthread_cond_init(&ps
->prod_cond
,NULL
);
28 pthread_cond_init(&ps
->cons_cond
,NULL
);
/*
 * pipe_reset: put the shared pipe state back to its starting values.
 *
 * Visible assignments: ps->sequence becomes 0, ps->cons_total becomes
 * ncons, the init-phase counters ps->cons_init_done and
 * ps->cons_init_error are cleared, and ps->prod_state returns to
 * PROD_INIT.  None of the visible statements takes ps->lock.
 *
 * NOTE(review): other fields used elsewhere in this file (data_len,
 * cons_done, cons_error) are not reset by any line visible here;
 * presumably that happens on lines missing from this excerpt -- confirm.
 */
33 pipe_reset (pipe_shared
*ps
, unsigned short ncons
)
37 ps
->sequence
= 0; /* TBD: randomize? */
38 ps
->cons_total
= ncons
;
41 ps
->cons_init_done
= 0;
42 ps
->cons_init_error
= 0;
43 ps
->prod_state
= PROD_INIT
;
/*
 * pipe_init_private: create the per-consumer state attached to *ps.
 *
 * The only statement visible in this excerpt allocates pp with malloc,
 * sized from the pointed-to object (sizeof(*pp)).
 *
 * NOTE(review): the declaration of pp, any check of the malloc result,
 * the field initialization and the return statement are not visible
 * here -- confirm against the full file before relying on them.
 */
47 pipe_init_private (pipe_shared
*ps
)
51 pp
= malloc(sizeof(*pp
));
55 * The producer might already have posted #1, so we can't use
56 * ps->sequence+1. This precludes consumers joining
57 * mid-stream, but that was never a goal anyway.
/*
 * pipe_cons_wait: consumer-side wait for the chunk this consumer expects.
 *
 * From the statements visible in this excerpt:
 *  - takes ps->lock, then sleeps on ps->cons_cond, re-checking in a loop,
 *    until the shared ps->sequence equals this consumer's pp->sequence;
 *  - sets rc to nonzero when ps->data_len is nonzero;
 *  - on the producer-is-done path, counts this consumer in ps->cons_done
 *    and signals ps->prod_cond once cons_done + cons_error has reached
 *    cons_total;
 *  - releases ps->lock.
 *
 * NOTE(review): the declaration of rc, the test that selects the
 * producer-is-done path (presumably rc == 0) and the return statement
 * (presumably rc) are not visible here -- confirm against the full file.
 */
66 pipe_cons_wait (pipe_private
*pp
)
68 pipe_shared
*ps
= pp
->shared
;
72 pthread_mutex_lock(&ps
->lock
);
74 while (ps
->sequence
!= pp
->sequence
) {
75 DPRINTF("consumer about to wait for %lu\n",pp
->sequence
);
76 pthread_cond_wait(&ps
->cons_cond
,&ps
->lock
);
77 DPRINTF("consumer done waiting\n");
80 rc
= (ps
->data_len
!= 0);
82 DPRINTF("consumer saw producer is done\n");
83 if (++ps
->cons_done
+ ps
->cons_error
>= ps
->cons_total
) {
84 pthread_cond_signal(&ps
->prod_cond
);
89 pthread_mutex_unlock(&ps
->lock
);
/*
 * pipe_cons_signal: consumer-side notification to the producer.
 *
 * From the statements visible in this excerpt: takes ps->lock (ps is
 * pp->shared); if cons_done + cons_error has reached cons_total, logs the
 * three counters and signals ps->prod_cond; then releases ps->lock.
 *
 * NOTE(review): no visible statement reads the error parameter or updates
 * a counter; presumably the missing lines between the lock and the test
 * bump cons_done or cons_error depending on error -- confirm against the
 * full file.
 */
94 pipe_cons_signal (pipe_private
*pp
, int error
)
96 pipe_shared
*ps
= pp
->shared
;
98 pthread_mutex_lock(&ps
->lock
);
108 if (ps
->cons_done
+ ps
->cons_error
>= ps
->cons_total
) {
109 DPRINTF("consumer signal, total %u done %u error %u\n",
110 ps
->cons_total
, ps
->cons_done
, ps
->cons_error
);
111 pthread_cond_signal(&ps
->prod_cond
);
113 pthread_mutex_unlock(&ps
->lock
);
/*
 * pipe_cons_siginit: a consumer reports the outcome of its initialization.
 *
 * From the statements visible in this excerpt, all under ps->lock:
 *  - asserts that not every consumer has reported yet
 *    (cons_init_done + cons_init_error < cons_total);
 *  - increments ps->cons_init_error or ps->cons_init_done;
 *  - broadcasts ps->prod_cond and logs the counters.
 *
 * NOTE(review): the branch choosing between the two increments is not
 * visible here; presumably it tests the error parameter -- confirm.
 * NOTE(review): the log line is labelled as an init signal but prints
 * cons_done and cons_error rather than cons_init_done and
 * cons_init_error; check whether that is intended.
 */
117 pipe_cons_siginit (pipe_shared
*ps
, int error
)
119 pthread_mutex_lock(&ps
->lock
);
120 assert ((ps
->cons_init_done
+ ps
->cons_init_error
) < ps
->cons_total
);
122 ++ps
->cons_init_error
;
125 ++ps
->cons_init_done
;
127 pthread_cond_broadcast(&ps
->prod_cond
);
128 DPRINTF("consumer init signal (total %u done %u error %u)\n",
129 ps
->cons_total
,ps
->cons_done
,ps
->cons_error
);
130 pthread_mutex_unlock(&ps
->lock
);
134 * Return the number of bad children, or -1 if some other error.
137 pipe_prod_wait_init (pipe_shared
*ps
)
139 pthread_mutex_lock(&ps
->lock
);
140 DPRINTF("producer initializing (total %u done %u error %u)\n",
141 ps
->cons_total
, ps
->cons_init_done
, ps
->cons_init_error
);
142 while (ps
->cons_init_done
+ ps
->cons_init_error
< ps
->cons_total
) {
143 pthread_cond_broadcast(&ps
->cons_cond
);
144 pthread_cond_wait(&ps
->prod_cond
,&ps
->lock
);
145 DPRINTF(" after sleep (total %u done %u error %u)\n",
146 ps
->cons_total
,ps
->cons_init_done
,ps
->cons_init_error
);
148 pthread_mutex_unlock(&ps
->lock
);
149 return ps
->cons_error
;
/*
 * pipe_prod_signal: producer posts a chunk of total bytes and waits for
 * the consumers to finish with it.
 *
 * From the statements visible in this excerpt, under ps->lock:
 *  - if cons_error has reached cons_total (no working sinks), logs that
 *    fact and releases the lock;
 *  - otherwise records total in ps->data_len, seeds ps->cons_done from
 *    ps->cons_error, then repeatedly broadcasts cons_cond and sleeps on
 *    prod_cond until cons_done + cons_error reaches cons_total;
 *  - releases ps->lock.
 *
 * NOTE(review): the early return after the no-sinks unlock, any use of
 * ptr, and the update of ps->sequence are not visible here; presumably
 * they are on lines missing from this excerpt -- confirm.
 * NOTE(review): cons_done is seeded with cons_error and the wait loop
 * then tests cons_done + cons_error, so on the visible statements each
 * failed consumer counts twice toward cons_total; check whether the
 * producer can stop waiting before every healthy consumer has read.
 * NOTE(review): ps->sequence is printed with %ld here but pp->sequence
 * is printed with %lu in pipe_cons_wait; the field type is not visible,
 * so verify the format specifiers.
 */
153 pipe_prod_signal (pipe_shared
*ps
, void *ptr
, size_t total
)
155 pthread_mutex_lock(&ps
->lock
);
156 if (ps
->cons_error
>= ps
->cons_total
) {
157 DPRINTF("producer posting %zu bytes as %ld, no sinks"
158 " (total %u error %u)\n",
159 total
,ps
->sequence
+1, ps
->cons_total
,ps
->cons_error
);
160 pthread_mutex_unlock(&ps
->lock
);
164 ps
->data_len
= total
;
165 ps
->cons_done
= ps
->cons_error
;
167 DPRINTF("producer posting %zu bytes as %ld (total %u error %u)\n",
168 total
,ps
->sequence
, ps
->cons_total
,ps
->cons_error
);
169 while (ps
->cons_done
+ ps
->cons_error
< ps
->cons_total
) {
170 pthread_cond_broadcast(&ps
->cons_cond
);
171 pthread_cond_wait(&ps
->prod_cond
,&ps
->lock
);
172 DPRINTF("%u children yet to read (total %u done %u error %u)\n",
173 ps
->cons_total
- (ps
->cons_done
+ ps
->cons_error
),
174 ps
->cons_total
,ps
->cons_done
,ps
->cons_error
);
176 pthread_mutex_unlock(&ps
->lock
);
180 pipe_prod_siginit (pipe_shared
*ps
, int err
)
182 pthread_mutex_lock(&ps
->lock
);
183 assert (ps
->prod_state
== PROD_INIT
);
184 ps
->prod_state
= (err
>= 0) ? PROD_RUNNING
: PROD_ERROR
;
185 pthread_cond_broadcast(&ps
->cons_cond
);
186 pthread_mutex_unlock(&ps
->lock
);
190 pipe_cons_wait_init (pipe_shared
*ps
)
192 pthread_mutex_lock(&ps
->lock
);
193 DPRINTF("consumer initing\n");
194 while (ps
->prod_state
== PROD_INIT
) {
195 pthread_cond_broadcast(&ps
->prod_cond
);
196 pthread_cond_wait(&ps
->cons_cond
,&ps
->lock
);
197 DPRINTF(" after sleep (state = %u)\n",ps
->prod_state
);
199 pthread_mutex_unlock(&ps
->lock
);
200 return (ps
->prod_state
== PROD_ERROR
);
/*
 * pipe_prod_finish: producer waits for the consumers at end of stream.
 *
 * From the statements visible in this excerpt, under ps->lock:
 *  - seeds ps->cons_done from ps->cons_error and logs how many children
 *    are still outstanding;
 *  - repeatedly broadcasts cons_cond and sleeps on prod_cond until
 *    cons_done + cons_error reaches cons_total;
 *  - releases ps->lock, then logs ps->sequence.
 *
 * NOTE(review): pipe_cons_wait treats data_len == 0 as the done marker,
 * but no visible line here clears data_len or advances sequence;
 * presumably that happens on lines missing from this excerpt -- confirm.
 * NOTE(review): cons_done is seeded with cons_error and the loop tests
 * cons_done + cons_error, so on the visible statements each failed
 * consumer counts twice toward cons_total; check whether that is
 * intended.
 * NOTE(review): the final log reads ps->sequence after the lock has been
 * released.
 */
204 pipe_prod_finish (pipe_shared
*ps
)
206 pthread_mutex_lock(&ps
->lock
);
208 ps
->cons_done
= ps
->cons_error
;
210 DPRINTF("waiting for %u children (total %u error %u)\n",
211 ps
->cons_total
- (ps
->cons_done
+ ps
->cons_error
),
212 ps
->cons_total
,ps
->cons_error
);
213 while (ps
->cons_done
+ ps
->cons_error
< ps
->cons_total
) {
214 pthread_cond_broadcast(&ps
->cons_cond
);
215 pthread_cond_wait(&ps
->prod_cond
,&ps
->lock
);
216 DPRINTF("%u children left (total %u done %u error %u)\n",
217 ps
->cons_total
- (ps
->cons_done
+ ps
->cons_error
),
218 ps
->cons_total
,ps
->cons_done
,ps
->cons_error
);
220 pthread_mutex_unlock(&ps
->lock
);
221 DPRINTF("producer finished with sequence %ld\n",ps
->sequence
);