1 // Demand channels. See squint paper by McIlroy.
3 // TODO: Free unread channels on exit.
4 // TODO: Handle messy thread problems. What happens if a thread quits
5 // but then another tries to signal and read its channel?
6 // TODO: What if the continued fraction terminates?
14 struct channel_s
*next
;
16 typedef struct channel_s channel_t
[1];
17 typedef struct channel_s
*channel_ptr
;
20 // Each continued fraction is a separate thread.
23 // Signal demand to compute next term.
24 pthread_cond_t demand
;
27 pthread_cond_t read_cond
;
28 pthread_mutex_t chan_mu
;
29 channel_ptr chan
, next
;
35 typedef struct cf_s
*cf_t
;
37 void *cf_data(cf_t cf
) {
41 int cf_wait(cf_t cf
) {
42 pthread_mutex_lock(&cf
->chan_mu
);
44 // If there is still unread output, wait for 'demand' signal.
45 pthread_cond_wait(&cf
->demand
, &cf
->chan_mu
);
47 pthread_mutex_unlock(&cf
->chan_mu
);
54 void cf_free(cf_t cf
) {
56 pthread_cond_signal(&cf
->demand
);
57 pthread_join(cf
->thread
, NULL
);
58 pthread_cond_destroy(&cf
->demand
);
62 void cf_put(cf_t cf
, mpz_t z
) {
63 // TODO: Block or something if there's a large backlog on the queue.
64 channel_ptr cnew
= malloc(sizeof(*cnew
));
66 unsigned char *uc
= malloc(4);
67 uc
[0] = uc
[1] = uc
[2] = uc
[3] = 0;
70 size_t count
= (mpz_sizeinbase(z
, 2) + 8 - 1) / 8;
71 unsigned char *uc
= malloc(count
+ 4);
73 uc
[0] = count
>> (8 * 3);
74 uc
[1] = (count
>> (8 * 2)) & 255;
75 uc
[2] = (count
>> 8) & 255;
77 mpz_export(uc
+ 4, NULL
, 1, 1, 1, 0, z
);
80 pthread_mutex_lock(&cf
->chan_mu
);
82 cf
->next
->next
= cnew
;
84 // Channel is empty. Now that we're populating it, send signal
85 // in case someone is waiting for data.
87 pthread_cond_signal(&cf
->read_cond
);
90 pthread_mutex_unlock(&cf
->chan_mu
);
93 void cf_get(mpz_t z
, cf_t cf
) {
94 pthread_mutex_lock(&cf
->chan_mu
);
96 // If channel is empty, send demand signal and wait for read signal.
97 pthread_cond_signal(&cf
->demand
);
98 pthread_cond_wait(&cf
->read_cond
, &cf
->chan_mu
);
100 channel_ptr c
= cf
->chan
;
102 pthread_mutex_unlock(&cf
->chan_mu
);
103 unsigned char *uc
= c
->data
;
107 + (uc
[0] << (8 * 3));
108 if (count
) mpz_import(z
, count
, 1, 1, 1, 0, uc
+ 4);
109 else mpz_set_ui(z
, 0);
114 cf_t
cf_new(void *(*func
)(cf_t
), void *data
) {
115 cf_t cf
= malloc(sizeof(*cf
));
117 pthread_cond_init(&cf
->demand
, NULL
);
118 pthread_cond_init(&cf
->read_cond
, NULL
);
119 pthread_mutex_init(&cf
->chan_mu
, NULL
);
120 pthread_attr_init(&attr
);
121 pthread_attr_setdetachstate(&attr
, PTHREAD_CREATE_JOINABLE
);
126 pthread_create(&cf
->thread
, &attr
, (void*(*)(void *)) func
, cf
);
127 pthread_attr_destroy(&attr
);