1 // Demand channels. See squint paper by McIlroy.
3 // TODO: Handle messy thread problems. What happens if a thread quits
4 // but then another tries to signal and read its channel?
5 // TODO: What if the continued fraction terminates?
14 struct channel_s
*next
;
16 typedef struct channel_s channel_t
[1];
17 typedef struct channel_s
*channel_ptr
;
20 // Each continued fraction is a separate thread.
22 // Helgrind prints warnings for these condition variables.
23 // Rewrite with semaphores?
24 // When queue is empty, and there is demand for the next term.
26 // When the queue was empty, and we just added to it.
27 pthread_cond_t read_cond
;
28 pthread_mutex_t chan_mu
;
29 channel_ptr chan
, next
;
31 // We break the CSP model slightly here: the sign of the continued
32 // fraction is read directly from a variable, not over channels.
33 // Only 'thread' may write to 'sign', and it should do so before the first
34 // cf_put(). Other threads should only read it after they have
35 // called cf_get() at least once.
41 typedef struct cf_s
*cf_t
;
43 void *cf_data(cf_t cf
) {
47 void cf_set_sign(cf_t cf
, int sign
) {
51 int cf_sign(cf_t cf
) {
55 int cf_flip_sign(cf_t cf
) {
56 return cf
->sign
= -cf
->sign
;
59 // A bit like cooperative multitasking. Continued fractions are expected
60 // to call this as often as practical, and on a return value of 0,
61 // to drop everything and stop.
62 int cf_wait(cf_t cf
) {
64 sem_wait(&cf
->demand_sem
);
69 pthread_mutex_lock(&cf
->chan_mu
);
70 // ... but we keep waiting unless the channel is empty.
72 pthread_mutex_unlock(&cf
->chan_mu
);
73 // The channel could be emptied in the meantime, but that
74 // implies at least one sem_post() call, so we'll notice next iteration.
76 pthread_mutex_unlock(&cf
->chan_mu
);
80 void cf_free(cf_t cf
) {
81 // These two statements force a thread out of its next/current cf_wait.
83 sem_post(&cf
->demand_sem
);
85 pthread_join(cf
->thread
, NULL
);
86 pthread_mutex_lock(&cf
->chan_mu
);
87 channel_ptr c
= cf
->chan
;
89 channel_ptr cnext
= c
->next
;
95 pthread_mutex_unlock(&cf
->chan_mu
);
96 sem_destroy(&cf
->demand_sem
);
100 void cf_put(cf_t cf
, mpz_t z
) {
101 // TODO: Block or something if there's a large backlog on the queue.
102 channel_ptr cnew
= malloc(sizeof(*cnew
));
103 mpz_ptr znew
= malloc(sizeof(*znew
));
108 pthread_mutex_lock(&cf
->chan_mu
);
110 cf
->next
->next
= cnew
;
112 // Channel is empty. Now that we're populating it, send signal
113 // in case someone is waiting for data.
115 pthread_cond_signal(&cf
->read_cond
);
118 pthread_mutex_unlock(&cf
->chan_mu
);
121 void cf_put_int(cf_t cf
, int n
) {
129 void cf_signal(cf_t cf
) {
130 sem_post(&cf
->demand_sem
);
132 void cf_wait_special(cf_t cf
) {
133 sem_wait(&cf
->demand_sem
);
136 void cf_get(mpz_t z
, cf_t cf
) {
137 pthread_mutex_lock(&cf
->chan_mu
);
139 // If channel is empty, send demand signal and wait for read signal.
140 sem_post(&cf
->demand_sem
);
141 pthread_cond_wait(&cf
->read_cond
, &cf
->chan_mu
);
143 channel_ptr c
= cf
->chan
;
145 pthread_mutex_unlock(&cf
->chan_mu
);
146 mpz_ptr znew
= c
->data
;
153 cf_t
cf_new(void *(*func
)(cf_t
), void *data
) {
154 cf_t cf
= malloc(sizeof(*cf
));
161 pthread_attr_init(&attr
);
162 pthread_attr_setdetachstate(&attr
, PTHREAD_CREATE_JOINABLE
);
163 pthread_mutex_init(&cf
->chan_mu
, NULL
);
164 sem_init(&cf
->demand_sem
, 0, 0);
165 pthread_cond_init(&cf
->read_cond
, NULL
);
166 pthread_create(&cf
->thread
, &attr
, (void*(*)(void *)) func
, cf
);
167 pthread_attr_destroy(&attr
);