Merged signal and get.
[frac.git] / cf.c
blob9dc9773175cdcb7d8b31e3ad8613d0fed65437fd
1 // Demand channels. See squint paper by McIlroy.
2 //
3 // TODO: Free unread channels on exit.
4 // TODO: Handle messy thread problems. What happens if a thread quits
5 // but then another tries to signal and read its channel?
6 // TODO: What if the continued fraction terminates?
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <pthread.h>
10 #include <gmp.h>
12 struct channel_s {
13 void *data;
14 struct channel_s *next;
16 typedef struct channel_s channel_t[1];
17 typedef struct channel_s *channel_ptr;
19 struct cf_s {
20 // Each continued fraction is a separate thread.
21 pthread_t thread;
23 // Signal 'demand' to compute next term.
24 // 'ack' acknowledges that the signal was received,
25 // to prevent a new 'demand' signal while processing a previous one.
26 pthread_cond_t demand, ack;
27 pthread_mutex_t demand_mu, ack_mu;
29 // Demand channel.
30 pthread_cond_t read_cond;
31 pthread_mutex_t chan_mu;
32 channel_ptr chan, next;
34 int quitflag;
35 void *data;
38 typedef struct cf_s *cf_t;
40 void *cf_data(cf_t cf) {
41 return cf->data;
44 int cf_wait(cf_t cf) {
45 for (;;) {
46 // Wait for 'demand' signal.
47 pthread_cond_wait(&cf->demand, &cf->demand_mu);
48 // Acknowledge it, allowing future 'demand' signals.
49 pthread_mutex_lock(&cf->ack_mu);
50 pthread_cond_signal(&cf->ack);
51 pthread_mutex_unlock(&cf->ack_mu);
52 if (cf->quitflag) {
53 pthread_mutex_unlock(&cf->demand_mu);
54 return 0;
56 pthread_mutex_lock(&cf->chan_mu);
57 // If there is still unread output, don't compute yet.
58 if (cf->chan) {
59 pthread_mutex_unlock(&cf->chan_mu);
60 continue;
62 pthread_mutex_unlock(&cf->chan_mu);
63 return 1;
67 void cf_signal(cf_t cf) {
68 pthread_mutex_lock(&cf->demand_mu);
69 pthread_cond_signal(&cf->demand);
70 pthread_mutex_unlock(&cf->demand_mu);
71 pthread_cond_wait(&cf->ack, &cf->ack_mu);
74 void cf_free(cf_t cf) {
75 cf->quitflag = 1;
76 pthread_mutex_lock(&cf->demand_mu);
77 pthread_cond_signal(&cf->demand);
78 pthread_mutex_unlock(&cf->demand_mu);
79 pthread_cond_wait(&cf->ack, &cf->ack_mu);
80 pthread_join(cf->thread, NULL);
81 pthread_mutex_destroy(&cf->demand_mu);
82 pthread_mutex_destroy(&cf->ack_mu);
83 pthread_cond_destroy(&cf->demand);
84 pthread_cond_destroy(&cf->ack);
85 free(cf);
88 void cf_put(cf_t cf, mpz_t z) {
89 // TODO: Block or something if there's a large backlog on the queue.
90 channel_ptr cnew = malloc(sizeof(*cnew));
91 size_t count = (mpz_sizeinbase(z, 2) + 8 - 1) / 8;
92 unsigned char *uc = malloc(count + 4);
93 cnew->data = uc;
94 uc[0] = count >> (8 * 3);
95 uc[1] = (count >> (8 * 2)) & 255;
96 uc[2] = (count >> 8) & 255;
97 uc[3] = count & 255;
98 mpz_export(uc + 4, NULL, 1, 1, 1, 0, z);
99 cnew->next = NULL;
100 pthread_mutex_lock(&cf->chan_mu);
101 if (cf->chan) {
102 cf->next->next = cnew;
103 } else {
104 // Channel is empty so send signal in case someone is waiting for it.
105 cf->chan = cnew;
106 pthread_cond_signal(&cf->read_cond);
108 cf->next = cnew;
109 pthread_mutex_unlock(&cf->chan_mu);
112 void cf_get(mpz_t z, cf_t cf) {
113 pthread_mutex_lock(&cf->chan_mu);
114 if (!cf->chan) {
115 // If channel is empty, send demand signal and wait for ready-to-read
116 // signal.
117 cf_signal(cf);
118 pthread_cond_wait(&cf->read_cond, &cf->chan_mu);
120 channel_ptr c = cf->chan;
121 cf->chan = c->next;
122 unsigned char *uc = c->data;
123 size_t count = uc[3]
124 + (uc[2] << 8)
125 + (uc[1] << (8 * 2))
126 + (uc[0] << (8 * 3));
127 mpz_import(z, count, 1, 1, 1, 0, uc + 4);
128 free(c->data);
129 free(c);
130 pthread_mutex_unlock(&cf->chan_mu);
133 cf_t cf_new(void *(*func)(cf_t), void *data) {
134 cf_t cf = malloc(sizeof(*cf));
135 pthread_attr_t attr;
136 pthread_cond_init(&cf->demand, NULL);
137 pthread_mutex_init(&cf->demand_mu, NULL);
138 pthread_cond_init(&cf->ack, NULL);
139 pthread_mutex_init(&cf->ack_mu, NULL);
140 pthread_cond_init(&cf->read_cond, NULL);
141 pthread_mutex_init(&cf->chan_mu, NULL);
142 pthread_mutex_lock(&cf->demand_mu);
143 pthread_mutex_lock(&cf->ack_mu);
144 pthread_attr_init(&attr);
145 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
146 cf->chan = NULL;
147 cf->next = NULL;
148 cf->quitflag = 0;
149 cf->data = data;
150 pthread_create(&cf->thread, &attr, (void*(*)(void *)) func, cf);
151 pthread_attr_destroy(&attr);
152 return cf;