1 // Demand channels. See squint paper by McIlroy.
3 // TODO: Free unread channels on exit.
4 // TODO: Handle messy thread problems. What happens if a thread quits
5 // but then another tries to signal and read its channel?
6 // TODO: What if the continued fraction terminates?
14 struct channel_s
*next
;
16 typedef struct channel_s channel_t
[1];
17 typedef struct channel_s
*channel_ptr
;
20 // Each continued fraction is a separate thread.
23 // Signal 'demand' to compute next term.
24 // 'ack' acknowledges that the signal was received,
25 // to prevent a new 'demand' signal while processing a previous one.
26 pthread_cond_t demand
, ack
;
27 pthread_mutex_t demand_mu
, ack_mu
;
30 pthread_cond_t read_cond
;
31 pthread_mutex_t chan_mu
;
32 channel_ptr chan
, next
;
38 typedef struct cf_s
*cf_t
;
40 void *cf_data(cf_t cf
) {
44 int cf_wait(cf_t cf
) {
46 // Wait for 'demand' signal.
47 pthread_cond_wait(&cf
->demand
, &cf
->demand_mu
);
48 // Acknowledge it, allowing future 'demand' signals.
49 pthread_mutex_lock(&cf
->ack_mu
);
50 pthread_cond_signal(&cf
->ack
);
51 pthread_mutex_unlock(&cf
->ack_mu
);
53 pthread_mutex_unlock(&cf
->demand_mu
);
56 pthread_mutex_lock(&cf
->chan_mu
);
57 // If there is still unread output, don't compute yet.
59 pthread_mutex_unlock(&cf
->chan_mu
);
62 pthread_mutex_unlock(&cf
->chan_mu
);
67 void cf_signal(cf_t cf
) {
68 pthread_mutex_lock(&cf
->demand_mu
);
69 pthread_cond_signal(&cf
->demand
);
70 pthread_mutex_unlock(&cf
->demand_mu
);
71 pthread_cond_wait(&cf
->ack
, &cf
->ack_mu
);
74 void cf_free(cf_t cf
) {
76 pthread_mutex_lock(&cf
->demand_mu
);
77 pthread_cond_signal(&cf
->demand
);
78 pthread_mutex_unlock(&cf
->demand_mu
);
79 pthread_cond_wait(&cf
->ack
, &cf
->ack_mu
);
80 pthread_join(cf
->thread
, NULL
);
81 pthread_mutex_destroy(&cf
->demand_mu
);
82 pthread_mutex_destroy(&cf
->ack_mu
);
83 pthread_cond_destroy(&cf
->demand
);
84 pthread_cond_destroy(&cf
->ack
);
88 void cf_put(cf_t cf
, mpz_t z
) {
89 // TODO: Block or something if there's a large backlog on the queue.
90 channel_ptr cnew
= malloc(sizeof(*cnew
));
91 size_t count
= (mpz_sizeinbase(z
, 2) + 8 - 1) / 8;
92 unsigned char *uc
= malloc(count
+ 4);
94 uc
[0] = count
>> (8 * 3);
95 uc
[1] = (count
>> (8 * 2)) & 255;
96 uc
[2] = (count
>> 8) & 255;
98 mpz_export(uc
+ 4, NULL
, 1, 1, 1, 0, z
);
100 pthread_mutex_lock(&cf
->chan_mu
);
102 cf
->next
->next
= cnew
;
104 // Channel is empty so send signal in case someone is waiting for it.
106 pthread_cond_signal(&cf
->read_cond
);
109 pthread_mutex_unlock(&cf
->chan_mu
);
112 void cf_get(mpz_t z
, cf_t cf
) {
113 pthread_mutex_lock(&cf
->chan_mu
);
115 // If channel is empty, send demand signal and wait for ready-to-read
118 pthread_cond_wait(&cf
->read_cond
, &cf
->chan_mu
);
120 channel_ptr c
= cf
->chan
;
122 unsigned char *uc
= c
->data
;
126 + (uc
[0] << (8 * 3));
127 mpz_import(z
, count
, 1, 1, 1, 0, uc
+ 4);
130 pthread_mutex_unlock(&cf
->chan_mu
);
133 cf_t
cf_new(void *(*func
)(cf_t
), void *data
) {
134 cf_t cf
= malloc(sizeof(*cf
));
136 pthread_cond_init(&cf
->demand
, NULL
);
137 pthread_mutex_init(&cf
->demand_mu
, NULL
);
138 pthread_cond_init(&cf
->ack
, NULL
);
139 pthread_mutex_init(&cf
->ack_mu
, NULL
);
140 pthread_cond_init(&cf
->read_cond
, NULL
);
141 pthread_mutex_init(&cf
->chan_mu
, NULL
);
142 pthread_mutex_lock(&cf
->demand_mu
);
143 pthread_mutex_lock(&cf
->ack_mu
);
144 pthread_attr_init(&attr
);
145 pthread_attr_setdetachstate(&attr
, PTHREAD_CREATE_JOINABLE
);
150 pthread_create(&cf
->thread
, &attr
, (void*(*)(void *)) func
, cf
);
151 pthread_attr_destroy(&attr
);