Patched memory leak.
[frac.git] / cf.c
blob51eca4f8f69062170b6fe460d7137f6a2caebb23
// Demand channels. See the "squint" paper: M. D. McIlroy,
// "Squinting at Power Series".
//
// TODO: Handle messy thread problems. What happens if a thread quits
// but another thread then tries to signal it and read its channel?
// TODO: What if the continued fraction terminates?
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <pthread.h>
9 #include <semaphore.h>
10 #include <gmp.h>
// One node of a singly linked queue of terms.
struct channel_s {
  // Payload of this node; cf_put() stores a heap-allocated GMP integer here.
  void *data;
  // Node that follows this one, or NULL if this is the last node.
  struct channel_s *next;
};
// Pointer-to-node handle used when walking or linking the queue.
typedef struct channel_s *channel_ptr;
// One-element array form, so a node can be declared by value and still be
// passed around as a pointer.
typedef struct channel_s channel_t[1];
19 struct cf_s {
20 // Each continued fraction is a separate thread.
21 pthread_t thread;
22 // Helgrind prints warnings for these condition variables.
23 // Rewrite with semaphores?
24 // When queue is empty, and there is demand for the next term.
25 sem_t demand_sem;
26 // When the queue was empty, and we just added to it.
27 pthread_cond_t read_cond;
28 pthread_mutex_t chan_mu;
29 channel_ptr chan, next;
31 int quitflag;
32 void *data;
35 typedef struct cf_s *cf_t;
37 void *cf_data(cf_t cf) {
38 return cf->data;
41 // A bit like cooperative multitasking. Continued fractions are expected
42 // to call this as often as practical, and on a return value of 0,
43 // to drop everything and stop.
44 int cf_wait(cf_t cf) {
45 for (;;) {
46 sem_wait(&cf->demand_sem);
47 // The wait is over!
48 if (cf->quitflag) {
49 return 0;
51 pthread_mutex_lock(&cf->chan_mu);
52 // ... but we keep waiting unless the channel is empty.
53 if (!cf->chan) break;
54 pthread_mutex_unlock(&cf->chan_mu);
55 // The channel could be emptied in the meantime, but that
56 // implies at least one sem_post() call, so we'll notice next iteration.
58 pthread_mutex_unlock(&cf->chan_mu);
59 return 1;
62 void cf_free(cf_t cf) {
63 // These two statements force a thread out of its next/current cf_wait.
64 cf->quitflag = 1;
65 sem_post(&cf->demand_sem);
67 pthread_join(cf->thread, NULL);
68 pthread_mutex_lock(&cf->chan_mu);
69 channel_ptr c = cf->chan;
70 while (c) {
71 channel_ptr cnext = c->next;
72 mpz_clear(c->data);
73 free(c->data);
74 free(c);
75 c = cnext;
77 pthread_mutex_unlock(&cf->chan_mu);
78 sem_destroy(&cf->demand_sem);
79 free(cf);
82 void cf_put(cf_t cf, mpz_t z) {
83 // TODO: Block or something if there's a large backlog on the queue.
84 channel_ptr cnew = malloc(sizeof(*cnew));
85 mpz_ptr znew = malloc(sizeof(*znew));
86 mpz_init(znew);
87 mpz_set(znew, z);
88 cnew->data = znew;
89 cnew->next = NULL;
90 pthread_mutex_lock(&cf->chan_mu);
91 if (cf->chan) {
92 cf->next->next = cnew;
93 } else {
94 // Channel is empty. Now that we're populating it, send signal
95 // in case someone is waiting for data.
96 cf->chan = cnew;
97 pthread_cond_signal(&cf->read_cond);
99 cf->next = cnew;
100 pthread_mutex_unlock(&cf->chan_mu);
103 void cf_put_int(cf_t cf, int n) {
104 mpz_t z;
105 mpz_init(z);
106 mpz_set_si(z, n);
107 cf_put(cf, z);
108 mpz_clear(z);
111 void cf_get(mpz_t z, cf_t cf) {
112 pthread_mutex_lock(&cf->chan_mu);
113 if (!cf->chan) {
114 // If channel is empty, send demand signal and wait for read signal.
115 sem_post(&cf->demand_sem);
116 pthread_cond_wait(&cf->read_cond, &cf->chan_mu);
118 channel_ptr c = cf->chan;
119 cf->chan = c->next;
120 pthread_mutex_unlock(&cf->chan_mu);
121 mpz_ptr znew = c->data;
122 mpz_set(z, znew);
123 mpz_clear(znew);
124 free(c->data);
125 free(c);
128 cf_t cf_new(void *(*func)(cf_t), void *data) {
129 cf_t cf = malloc(sizeof(*cf));
130 cf->chan = NULL;
131 cf->next = NULL;
132 cf->quitflag = 0;
133 cf->data = data;
134 pthread_attr_t attr;
135 pthread_attr_init(&attr);
136 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
137 pthread_mutex_init(&cf->chan_mu, NULL);
138 sem_init(&cf->demand_sem, 0, 0);
139 pthread_cond_init(&cf->read_cond, NULL);
140 pthread_create(&cf->thread, &attr, (void*(*)(void *)) func, cf);
141 pthread_attr_destroy(&attr);
142 return cf;