1 /* Copyright (C) 2010 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
22 pipe_init_shared (pipe_shared
*ps
, void *owner
, unsigned short ncons
)
25 pthread_mutex_init(&ps
->lock
,NULL
);
26 pthread_cond_init(&ps
->prod_cond
,NULL
);
27 pthread_cond_init(&ps
->cons_cond
,NULL
);
30 ps
->sequence
= 0; /* TBD: randomize? */
32 ps
->cons_total
= ncons
;
38 pipe_init_private (pipe_shared
*ps
)
42 pp
= malloc(sizeof(*pp
));
45 pp
->sequence
= ps
->sequence
+ 1;
52 pipe_cons_wait (pipe_private
*pp
)
54 pipe_shared
*ps
= pp
->shared
;
57 pthread_mutex_lock(&ps
->lock
);
59 while (ps
->sequence
!= pp
->sequence
) {
60 DPRINTF("consumer about to wait for %lu\n",pp
->sequence
);
61 pthread_cond_wait(&ps
->cons_cond
,&ps
->lock
);
62 DPRINTF("consumer done waiting\n");
65 rc
= (ps
->data_len
!= 0);
67 DPRINTF("consumer saw producer is done\n");
68 if (++ps
->cons_done
+ ps
->cons_error
>= ps
->cons_total
) {
69 pthread_cond_signal(&ps
->prod_cond
);
74 pthread_mutex_unlock(&ps
->lock
);
79 pipe_cons_signal (pipe_private
*pp
, int error
)
81 pipe_shared
*ps
= pp
->shared
;
83 pthread_mutex_lock(&ps
->lock
);
91 if (ps
->cons_done
+ ps
->cons_error
>= ps
->cons_total
) {
92 DPRINTF("consumer signal, done %u total %u\n",
93 ps
->cons_done
,ps
->cons_total
);
94 pthread_cond_signal(&ps
->prod_cond
);
96 pthread_mutex_unlock(&ps
->lock
);
100 pipe_cons_siginit (pipe_shared
*ps
, int error
)
102 pthread_mutex_lock(&ps
->lock
);
104 pthread_mutex_unlock(&ps
->lock
);
111 pthread_cond_broadcast(&ps
->prod_cond
);
112 DPRINTF("consumer init signal (total %u done %u error %u)\n",
113 ps
->cons_total
,ps
->cons_done
,ps
->cons_error
);
115 pthread_mutex_unlock(&ps
->lock
);
119 * Return the number of bad children, or -1 if some other error.
122 pipe_prod_wait_init (pipe_shared
*ps
)
124 pthread_mutex_lock(&ps
->lock
);
125 DPRINTF("producer initializing (total %u error %u)\n",
126 ps
->cons_total
,ps
->cons_error
);
127 while (ps
->cons_done
+ ps
->cons_error
< ps
->cons_total
) {
128 pthread_cond_broadcast(&ps
->cons_cond
);
129 pthread_cond_wait(&ps
->prod_cond
,&ps
->lock
);
130 DPRINTF("%u children yet to poll (total %u done %u error %u)\n",
131 ps
->cons_total
- (ps
->cons_done
+ ps
->cons_error
),
132 ps
->cons_total
,ps
->cons_done
,ps
->cons_error
);
134 pthread_mutex_unlock(&ps
->lock
);
135 return ps
->cons_error
;
139 pipe_prod_signal (pipe_shared
*ps
, void *ptr
, size_t total
)
141 pthread_mutex_lock(&ps
->lock
);
142 if (ps
->cons_error
>= ps
->cons_total
) {
143 DPRINTF("producer posting %zu bytes as %ld, no sinks"
144 " (total %u error %u)\n",
145 total
,ps
->sequence
+1, ps
->cons_total
,ps
->cons_error
);
146 pthread_mutex_unlock(&ps
->lock
);
150 ps
->data_len
= total
;
151 ps
->cons_done
= ps
->cons_error
;
153 DPRINTF("producer posting %zu bytes as %ld (total %u error %u)\n",
154 total
,ps
->sequence
, ps
->cons_total
,ps
->cons_error
);
155 while (ps
->cons_done
+ ps
->cons_error
< ps
->cons_total
) {
156 pthread_cond_broadcast(&ps
->cons_cond
);
157 pthread_cond_wait(&ps
->prod_cond
,&ps
->lock
);
158 DPRINTF("%u children yet to read (total %u done %u error %u)\n",
159 ps
->cons_total
- (ps
->cons_done
+ ps
->cons_error
),
160 ps
->cons_total
,ps
->cons_done
,ps
->cons_error
);
162 pthread_mutex_unlock(&ps
->lock
);
166 pipe_prod_finish (pipe_shared
*ps
)
168 pthread_mutex_lock(&ps
->lock
);
170 ps
->cons_done
= ps
->cons_error
;
172 DPRINTF("waiting for %u children (total %u error %u)\n",
173 ps
->cons_total
- (ps
->cons_done
+ ps
->cons_error
),
174 ps
->cons_total
,ps
->cons_error
);
175 while (ps
->cons_done
+ ps
->cons_error
< ps
->cons_total
) {
176 pthread_cond_broadcast(&ps
->cons_cond
);
177 pthread_cond_wait(&ps
->prod_cond
,&ps
->lock
);
178 DPRINTF("%u children left (total %u done %u error %u)\n",
179 ps
->cons_total
- (ps
->cons_done
+ ps
->cons_error
),
180 ps
->cons_total
,ps
->cons_done
,ps
->cons_error
);
182 pthread_mutex_unlock(&ps
->lock
);