2bad1b5173ec8f16f59659684b97be1b3d5ddf13
[frac.git] / cf.c
blob2bad1b5173ec8f16f59659684b97be1b3d5ddf13
1 // Demand channels. See squint paper by McIlroy.
2 //
3 // TODO: Handle messy thread problems. What happens if a thread quits
4 // but then another tries to signal and read its channel?
5 // TODO: What if the continued fraction terminates?
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <pthread.h>
9 #include <semaphore.h>
10 #include <gmp.h>
12 struct channel_s {
13 void *data;
14 struct channel_s *next;
16 typedef struct channel_s channel_t[1];
17 typedef struct channel_s *channel_ptr;
19 struct cf_s {
20 // Each continued fraction is a separate thread.
21 pthread_t thread;
22 // Helgrind prints warnings for these condition variables.
23 // Rewrite with semaphores?
24 // When queue is empty, and there is demand for the next term.
25 sem_t demand_sem;
26 // When the queue was empty, and we just added to it.
27 pthread_cond_t read_cond;
28 pthread_mutex_t chan_mu;
29 channel_ptr chan, next;
31 // We break the CSP model slightly here: the sign of the continued
32 // fraction is read directly from a variable, not over channels.
33 // Only 'thread' may write to 'sign', and it should do so before the first
34 // cf_put(). Other threads should only read it after they have
35 // called cf_get() at least once.
36 int sign;
37 int quitflag;
38 void *data;
41 typedef struct cf_s *cf_t;
43 void *cf_data(cf_t cf) {
44 return cf->data;
47 int cf_sign(cf_t cf) {
48 return cf->sign;
51 int cf_flip_sign(cf_t cf) {
52 return cf->sign = -cf->sign;
55 // A bit like cooperative multitasking. Continued fractions are expected
56 // to call this as often as practical, and on a return value of 0,
57 // to drop everything and stop.
58 int cf_wait(cf_t cf) {
59 for (;;) {
60 sem_wait(&cf->demand_sem);
61 // The wait is over!
62 if (cf->quitflag) {
63 return 0;
65 pthread_mutex_lock(&cf->chan_mu);
66 // ... but we keep waiting unless the channel is empty.
67 if (!cf->chan) break;
68 pthread_mutex_unlock(&cf->chan_mu);
69 // The channel could be emptied in the meantime, but that
70 // implies at least one sem_post() call, so we'll notice next iteration.
72 pthread_mutex_unlock(&cf->chan_mu);
73 return 1;
76 void cf_free(cf_t cf) {
77 // These two statements force a thread out of its next/current cf_wait.
78 cf->quitflag = 1;
79 sem_post(&cf->demand_sem);
81 pthread_join(cf->thread, NULL);
82 pthread_mutex_lock(&cf->chan_mu);
83 channel_ptr c = cf->chan;
84 while (c) {
85 channel_ptr cnext = c->next;
86 mpz_clear(c->data);
87 free(c->data);
88 free(c);
89 c = cnext;
91 pthread_mutex_unlock(&cf->chan_mu);
92 sem_destroy(&cf->demand_sem);
93 free(cf);
96 void cf_put(cf_t cf, mpz_t z) {
97 // TODO: Block or something if there's a large backlog on the queue.
98 channel_ptr cnew = malloc(sizeof(*cnew));
99 mpz_ptr znew = malloc(sizeof(*znew));
100 mpz_init(znew);
101 mpz_set(znew, z);
102 cnew->data = znew;
103 cnew->next = NULL;
104 pthread_mutex_lock(&cf->chan_mu);
105 if (cf->chan) {
106 cf->next->next = cnew;
107 } else {
108 // Channel is empty. Now that we're populating it, send signal
109 // in case someone is waiting for data.
110 cf->chan = cnew;
111 pthread_cond_signal(&cf->read_cond);
113 cf->next = cnew;
114 pthread_mutex_unlock(&cf->chan_mu);
117 void cf_put_int(cf_t cf, int n) {
118 mpz_t z;
119 mpz_init(z);
120 mpz_set_si(z, n);
121 cf_put(cf, z);
122 mpz_clear(z);
125 void cf_get(mpz_t z, cf_t cf) {
126 pthread_mutex_lock(&cf->chan_mu);
127 if (!cf->chan) {
128 // If channel is empty, send demand signal and wait for read signal.
129 sem_post(&cf->demand_sem);
130 pthread_cond_wait(&cf->read_cond, &cf->chan_mu);
132 channel_ptr c = cf->chan;
133 cf->chan = c->next;
134 pthread_mutex_unlock(&cf->chan_mu);
135 mpz_ptr znew = c->data;
136 mpz_set(z, znew);
137 mpz_clear(znew);
138 free(c->data);
139 free(c);
142 cf_t cf_new(void *(*func)(cf_t), void *data) {
143 cf_t cf = malloc(sizeof(*cf));
144 cf->sign = 1;
145 cf->chan = NULL;
146 cf->next = NULL;
147 cf->quitflag = 0;
148 cf->data = data;
149 pthread_attr_t attr;
150 pthread_attr_init(&attr);
151 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
152 pthread_mutex_init(&cf->chan_mu, NULL);
153 sem_init(&cf->demand_sem, 0, 0);
154 pthread_cond_init(&cf->read_cond, NULL);
155 pthread_create(&cf->thread, &attr, (void*(*)(void *)) func, cf);
156 pthread_attr_destroy(&attr);
157 return cf;