Cleaned up lock mess monster.
[frac.git] / cf.c
blob4879144b4efb7a63e9ff0ea06253f582bbb563e9
1 // Demand channels. See squint paper by McIlroy.
2 //
3 // TODO: Free unread channels on exit.
4 // TODO: Handle messy thread problems. What happens if a thread quits
5 // but then another tries to signal and read its channel?
6 // TODO: What if the continued fraction terminates?
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <pthread.h>
10 #include <gmp.h>
12 struct channel_s {
13 void *data;
14 struct channel_s *next;
16 typedef struct channel_s channel_t[1];
17 typedef struct channel_s *channel_ptr;
19 struct cf_s {
20 // Each continued fraction is a separate thread.
21 pthread_t thread;
23 // Signal demand to compute next term.
24 pthread_cond_t demand;
26 // Demand channel.
27 pthread_cond_t read_cond;
28 pthread_mutex_t chan_mu;
29 channel_ptr chan, next;
31 int quitflag;
32 void *data;
35 typedef struct cf_s *cf_t;
37 void *cf_data(cf_t cf) {
38 return cf->data;
41 int cf_wait(cf_t cf) {
42 pthread_mutex_lock(&cf->chan_mu);
43 if (cf->chan) {
44 // If there is still unread output, wait for 'demand' signal.
45 pthread_cond_wait(&cf->demand, &cf->chan_mu);
47 pthread_mutex_unlock(&cf->chan_mu);
48 if (cf->quitflag) {
49 return 0;
51 return 1;
54 void cf_free(cf_t cf) {
55 cf->quitflag = 1;
56 pthread_cond_signal(&cf->demand);
57 pthread_join(cf->thread, NULL);
58 pthread_cond_destroy(&cf->demand);
59 free(cf);
62 void cf_put(cf_t cf, mpz_t z) {
63 // TODO: Block or something if there's a large backlog on the queue.
64 channel_ptr cnew = malloc(sizeof(*cnew));
65 if (!mpz_sgn(z)) {
66 unsigned char *uc = malloc(4);
67 uc[0] = uc[1] = uc[2] = uc[3] = 0;
68 cnew->data=uc;
69 } else {
70 size_t count = (mpz_sizeinbase(z, 2) + 8 - 1) / 8;
71 unsigned char *uc = malloc(count + 4);
72 cnew->data = uc;
73 uc[0] = count >> (8 * 3);
74 uc[1] = (count >> (8 * 2)) & 255;
75 uc[2] = (count >> 8) & 255;
76 uc[3] = count & 255;
77 mpz_export(uc + 4, NULL, 1, 1, 1, 0, z);
79 cnew->next = NULL;
80 pthread_mutex_lock(&cf->chan_mu);
81 if (cf->chan) {
82 cf->next->next = cnew;
83 } else {
84 // Channel is empty. Now that we're populating it, send signal
85 // in case someone is waiting for data.
86 cf->chan = cnew;
87 pthread_cond_signal(&cf->read_cond);
89 cf->next = cnew;
90 pthread_mutex_unlock(&cf->chan_mu);
93 void cf_get(mpz_t z, cf_t cf) {
94 pthread_mutex_lock(&cf->chan_mu);
95 if (!cf->chan) {
96 // If channel is empty, send demand signal and wait for read signal.
97 pthread_cond_signal(&cf->demand);
98 pthread_cond_wait(&cf->read_cond, &cf->chan_mu);
100 channel_ptr c = cf->chan;
101 cf->chan = c->next;
102 pthread_mutex_unlock(&cf->chan_mu);
103 unsigned char *uc = c->data;
104 size_t count = uc[3]
105 + (uc[2] << 8)
106 + (uc[1] << (8 * 2))
107 + (uc[0] << (8 * 3));
108 if (count) mpz_import(z, count, 1, 1, 1, 0, uc + 4);
109 else mpz_set_ui(z, 0);
110 free(c->data);
111 free(c);
114 cf_t cf_new(void *(*func)(cf_t), void *data) {
115 cf_t cf = malloc(sizeof(*cf));
116 pthread_attr_t attr;
117 pthread_cond_init(&cf->demand, NULL);
118 pthread_cond_init(&cf->read_cond, NULL);
119 pthread_mutex_init(&cf->chan_mu, NULL);
120 pthread_attr_init(&attr);
121 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
122 cf->chan = NULL;
123 cf->next = NULL;
124 cf->quitflag = 0;
125 cf->data = data;
126 pthread_create(&cf->thread, &attr, (void*(*)(void *)) func, cf);
127 pthread_attr_destroy(&attr);
128 return cf;