/* usr.bin/dsynth/bulk.c — bulk (parallel) job infrastructure for dsynth. */
/*
 * Copyright (c) 2019 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@backplane.com>
 *
 * This code uses concepts and configuration based on 'synth', by
 * John R. Marino <draco@marino.st>, which was written in ada.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
#include "dsynth.h"
/*
 * Per-worker-thread slot.  One entry per thread in JobsAry[]; fields are
 * protected by BulkMutex.
 */
typedef struct job {
	pthread_t td;		/* worker thread id */
	pthread_cond_t cond;	/* signalled when work is assigned or on terminate */
	bulk_t *active;		/* bulk currently assigned to this worker (or NULL) */
	int terminate : 1;	/* request worker exit (checked in bulkthread loop) */
} job_t;
/*
 * Most of these globals are locked with BulkMutex
 */
static int BulkScanJob;		/* rotor for picking the next idle job slot */
static int BulkCurJobs;		/* number of bulks currently assigned to workers */
static int BulkMaxJobs;		/* number of worker threads started by initbulk() */
static job_t JobsAry[MAXBULK];	/* worker slots; only [0..BulkMaxJobs) in use */
static void (*BulkFunc)(bulk_t *bulk);	/* work callback set by initbulk() */
static bulk_t *BulkSubmit;	/* FIFO of submitted, not-yet-assigned bulks */
static bulk_t **BulkSubmitTail = &BulkSubmit;	/* tail pointer for O(1) append */
static bulk_t *BulkResponse;	/* FIFO of completed bulks awaiting getbulk() */
static bulk_t **BulkResponseTail = &BulkResponse;	/* tail pointer for O(1) append */
static pthread_cond_t BulkResponseCond;	/* signalled when a bulk is queued on BulkResponse */
static pthread_mutex_t BulkMutex;	/* protects all of the above */
/* Forward declarations for file-local helpers. */
static void bulkstart(void);
#if 0
static int readall(int fd, void *buf, size_t bytes);
static int writeall(int fd, const void *buf, size_t bytes);
#endif
static void *bulkthread(void *arg);
/*
 * Initialize for bulk scan operations.  Always paired with donebulk()
 *
 * Sets up the global queues, installs `func` as the per-bulk work callback,
 * and starts `jobs` worker threads (clamped to MAXBULK).
 */
void
initbulk(void (*func)(bulk_t *bulk), int jobs)
{
	int i;

	if (jobs > MAXBULK)
		jobs = MAXBULK;

	/* Must not be re-entered while a prior bulk run is still queued */
	ddassert(BulkSubmit == NULL);
	BulkCurJobs = 0;
	BulkMaxJobs = jobs;
	BulkFunc = func;
	BulkScanJob = 0;

	addbuildenv("__MAKE_CONF", "/dev/null",
		    BENV_ENVIRONMENT | BENV_PKGLIST);

	/*
	 * CCache is a horrible unreliable hack but... leave the
	 * mechanism in-place in case someone has a death wish.
	 */
	if (UseCCache) {
		addbuildenv("WITH_CCACHE_BUILD", "yes", BENV_MAKECONF);
		addbuildenv("CCACHE_DIR", CCachePath, BENV_MAKECONF);
	}

	pthread_mutex_init(&BulkMutex, NULL);
	pthread_cond_init(&BulkResponseCond, NULL);

	/* Hold the mutex while spawning so workers block until setup is done */
	pthread_mutex_lock(&BulkMutex);
	for (i = 0; i < jobs; ++i) {
		pthread_cond_init(&JobsAry[i].cond, NULL);
		pthread_create(&JobsAry[i].td, NULL, bulkthread, &JobsAry[i]);
	}
	pthread_mutex_unlock(&BulkMutex);
}
/*
 * Tear down a bulk run started by initbulk(): drain the submit queue,
 * ask all workers to terminate, join them, then drain the response queue
 * and remove the build environment entries added by initbulk().
 *
 * NOTE(review): freebulk() asserts state == UNLISTED, but bulks drained
 * here are still ONSUBMIT / ONRESPONSE — presumably ddassert is a
 * debug-only check; confirm against dsynth.h.
 */
void
donebulk(void)
{
	bulk_t *bulk;
	int i;

	/* Discard anything still waiting to be assigned */
	pthread_mutex_lock(&BulkMutex);
	while ((bulk = BulkSubmit) != NULL) {
		BulkSubmit = bulk->next;
		freebulk(bulk);
	}
	BulkSubmitTail = &BulkSubmit;

	/* Flag all workers and wake them so they notice terminate */
	for (i = 0; i < BulkMaxJobs; ++i) {
		JobsAry[i].terminate = 1;
		pthread_cond_signal(&JobsAry[i].cond);
	}
	pthread_mutex_unlock(&BulkMutex);
	for (i = 0; i < BulkMaxJobs; ++i) {
		pthread_join(JobsAry[i].td, NULL);
		pthread_cond_destroy(&JobsAry[i].cond);
		/* A worker may exit with an assigned bulk it never ran */
		if (JobsAry[i].active) {
			freebulk(JobsAry[i].active);
			JobsAry[i].active = NULL;
			pthread_mutex_lock(&BulkMutex);
			--BulkCurJobs;
			pthread_mutex_unlock(&BulkMutex);
		}
		JobsAry[i].terminate = 0;
	}
	ddassert(BulkCurJobs == 0);

	/* Discard completed responses nobody retrieved */
	while ((bulk = BulkResponse) != NULL) {
		BulkResponse = bulk->next;
		freebulk(bulk);
	}
	BulkResponseTail = &BulkResponse;

	BulkFunc = NULL;

	bzero(JobsAry, sizeof(JobsAry));

	/* Undo the build environment set up in initbulk() */
	if (UseCCache) {
		delbuildenv("WITH_CCACHE_BUILD");
		delbuildenv("CCACHE_DIR");
	}
	delbuildenv("__MAKE_CONF");
}
157 void
158 queuebulk(const char *s1, const char *s2, const char *s3, const char *s4)
160 bulk_t *bulk;
162 bulk = calloc(1, sizeof(*bulk));
163 if (s1)
164 bulk->s1 = strdup(s1);
165 if (s2)
166 bulk->s2 = strdup(s2);
167 if (s3)
168 bulk->s3 = strdup(s3);
169 if (s4)
170 bulk->s4 = strdup(s4);
171 bulk->state = ONSUBMIT;
173 pthread_mutex_lock(&BulkMutex);
174 *BulkSubmitTail = bulk;
175 BulkSubmitTail = &bulk->next;
176 if (BulkCurJobs < BulkMaxJobs) {
177 pthread_mutex_unlock(&BulkMutex);
178 bulkstart();
179 } else {
180 pthread_mutex_unlock(&BulkMutex);
/*
 * Fill any idle job slots with new jobs as available.
 *
 * Uses BulkScanJob as a rotor so successive bulks spread across slots.
 * The inner scan loop terminates only when it finds an idle slot; this is
 * guaranteed by the outer while condition (BulkCurJobs < BulkMaxJobs).
 */
static
void
bulkstart(void)
{
	bulk_t *bulk;
	int i;

	pthread_mutex_lock(&BulkMutex);
	while ((bulk = BulkSubmit) != NULL && BulkCurJobs < BulkMaxJobs) {
		/* Find the next idle slot starting after the last one used */
		i = BulkScanJob + 1;
		for (;;) {
			i = i % BulkMaxJobs;
			if (JobsAry[i].active == NULL)
				break;
			++i;
		}
		BulkScanJob = i;

		/* Dequeue from the submit FIFO, fixing up the tail if emptied */
		BulkSubmit = bulk->next;
		if (BulkSubmit == NULL)
			BulkSubmitTail = &BulkSubmit;

		/* Hand the bulk to the worker and wake it */
		bulk->state = ONRUN;
		JobsAry[i].active = bulk;
		pthread_cond_signal(&JobsAry[i].cond);
		++BulkCurJobs;
	}
	pthread_mutex_unlock(&BulkMutex);
}
/*
 * Retrieve completed job or job with activity
 *
 * Blocks while jobs are in flight and no response is queued.  Returns the
 * next completed bulk (state set to UNLISTED; caller owns it and must
 * eventually freebulk() it), or NULL when no jobs remain.  Also kicks the
 * dispatcher so the freed slot is refilled.
 */
bulk_t *
getbulk(void)
{
	bulk_t *bulk;

	pthread_mutex_lock(&BulkMutex);
	while (BulkCurJobs && BulkResponse == NULL) {
		pthread_cond_wait(&BulkResponseCond, &BulkMutex);
	}
	if (BulkResponse) {
		/* Dequeue from the response FIFO, fixing the tail if emptied */
		bulk = BulkResponse;
		ddassert(bulk->state == ONRESPONSE);
		BulkResponse = bulk->next;
		if (BulkResponse == NULL)
			BulkResponseTail = &BulkResponse;
		bulk->state = UNLISTED;
	} else {
		bulk = NULL;
	}
	pthread_mutex_unlock(&BulkMutex);
	/* Refill worker slots from the submit queue (re-locks internally) */
	bulkstart();

	return bulk;
}
244 void
245 freebulk(bulk_t *bulk)
247 ddassert(bulk->state == UNLISTED);
249 if (bulk->s1) {
250 free(bulk->s1);
251 bulk->s1 = NULL;
253 if (bulk->s2) {
254 free(bulk->s2);
255 bulk->s2 = NULL;
257 if (bulk->s3) {
258 free(bulk->s3);
259 bulk->s3 = NULL;
261 if (bulk->s4) {
262 free(bulk->s4);
263 bulk->s4 = NULL;
265 if (bulk->r1) {
266 free(bulk->r1);
267 bulk->r1 = NULL;
269 if (bulk->r2) {
270 free(bulk->r2);
271 bulk->r2 = NULL;
273 if (bulk->r3) {
274 free(bulk->r3);
275 bulk->r3 = NULL;
277 if (bulk->r4) {
278 free(bulk->r4);
279 bulk->r4 = NULL;
281 free(bulk);
#if 0
/*
 * Returns non-zero if unable to read specified number of bytes
 * (EOF or hard error); 0 on success.  Retries short reads and EINTR.
 */
static
int
readall(int fd, void *buf, size_t bytes)
{
	ssize_t r;

	for (;;) {
		r = read(fd, buf, bytes);
		if (r == (ssize_t)bytes)
			break;
		if (r > 0) {
			/* Short read: advance and retry for the remainder */
			buf = (char *)buf + r;
			bytes -= r;
			continue;
		}
		if (r < 0 && errno == EINTR)
			continue;
		return 1;	/* r == 0 (EOF) or unrecoverable error */
	}
	return 0;
}
/*
 * Returns non-zero if unable to write the specified number of bytes;
 * 0 on success.  Retries short writes and EINTR (mirror of readall()).
 */
static
int
writeall(int fd, const void *buf, size_t bytes)
{
	ssize_t r;

	for (;;) {
		r = write(fd, buf, bytes);
		if (r == (ssize_t)bytes)
			break;
		if (r > 0) {
			/* Short write: advance and retry for the remainder */
			buf = (const char *)buf + r;
			bytes -= r;
			continue;
		}
		if (r < 0 && errno == EINTR)
			continue;
		return 1;	/* unrecoverable error */
	}
	return 0;
}
#endif
/*
 * Worker thread.  Waits for a bulk to be assigned to its slot, runs
 * BulkFunc on it with BulkMutex released, queues the result on the
 * response FIFO, and opportunistically grabs the next submitted bulk
 * without waiting for the dispatcher.  Exits when job->terminate is set.
 *
 * BulkMutex is held everywhere in the loop except around the BulkFunc()
 * callback itself.
 */
static void *
bulkthread(void *arg)
{
	job_t *job = arg;
	bulk_t *bulk;

	pthread_mutex_lock(&BulkMutex);
	for (;;) {
		if (job->terminate)
			break;
		if (job->active == NULL)
			pthread_cond_wait(&job->cond, &BulkMutex);
		bulk = job->active;	/* may still be NULL on spurious wakeup */
		if (bulk) {
			bulk->state = ISRUNNING;

			/* Run the callback without holding the global lock */
			pthread_mutex_unlock(&BulkMutex);
			BulkFunc(bulk);
			pthread_mutex_lock(&BulkMutex);

			/* Append the finished bulk to the response FIFO */
			bulk->state = ONRESPONSE;
			bulk->next = NULL;
			*BulkResponseTail = bulk;
			BulkResponseTail = &bulk->next;
			--BulkCurJobs;
			pthread_cond_signal(&BulkResponseCond);
		}

		/*
		 * Optimization - automatically fetch the next job
		 */
		if ((bulk = BulkSubmit) != NULL && job->terminate == 0) {
			BulkSubmit = bulk->next;
			if (BulkSubmit == NULL)
				BulkSubmitTail = &BulkSubmit;
			bulk->state = ONRUN;
			job->active = bulk;
			++BulkCurJobs;
		} else {
			job->active = NULL;
		}
	}
	pthread_mutex_unlock(&BulkMutex);

	return NULL;
}