libc: Fix up MLINKS for the recent upgrade of vis.3 and unvis.3.
[dragonfly.git] / sbin / jscan / jstream.c
blobb1f94e5c51927549a92debd76b77bd2d9baaf8d0
1 /*
2 * Copyright (c) 2004,2005 The DragonFly Project. All rights reserved.
3 *
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 * SUCH DAMAGE.
34 * $DragonFly: src/sbin/jscan/jstream.c,v 1.8 2005/09/07 07:20:23 dillon Exp $
37 #include "jscan.h"
39 static struct jhash *JHashAry[JHASH_SIZE];
41 static void jnormalize(struct jstream *js);
42 static int jaddrecord_backtrack(struct jsession *ss, struct jdata *jd);
45 * Integrate a raw record. Deal with the transaction begin and end flags
46 * to create a forward-referenced collection of jstream records. If we are
47 * able to complete a transaction, the first js associated with that
48 * transaction is returned.
50 * XXX we need to store the data for very large multi-record transactions
51 * separately since it might not fit into memory.
53 * Note that a transaction might represent a huge I/O operation, resulting
54 * in an overall node structure that spans gigabytes, but individual
55 * subrecord leaf nodes are limited in size and we depend on this to simplify
56 * the handling of leaf records.
58 * A transaction may cover several raw records. The jstream collection for
59 * a transaction is only returned when the entire transaction has been
60 * successfully scanned. Due to the interleaving of transactions the ordering
61 * of returned JS's may be different (not exactly reversed) when scanning a
62 * journal backwards verses forwards. Since parallel operations are
63 * theoretically non-conflicting, this should not present a problem.
65 struct jstream *
66 jaddrecord(struct jsession *ss, struct jdata *jd)
68 struct journal_rawrecbeg *head;
69 struct jstream *js;
70 struct jhash *jh;
71 struct jhash **jhp;
73 js = malloc(sizeof(struct jstream));
74 bzero(js, sizeof(struct jstream));
75 js->js_jdata = jref(jd);
76 js->js_head = (void *)jd->jd_data;
77 js->js_session = ss;
78 head = js->js_head;
81 * Check for a completely self-contained transaction, just return the
82 * js if possible.
84 if ((head->streamid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
85 (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
86 ) {
87 jnormalize(js);
88 return (js);
91 retry:
93 * Check for an open transaction in the hash table.
95 jhp = &JHashAry[head->streamid & JHASH_MASK];
96 while ((jh = *jhp) != NULL) {
97 if (jh->jh_session == ss &&
98 ((jh->jh_transid ^ head->streamid) & JREC_STREAMID_MASK) == 0
99 ) {
100 break;
102 jhp = &jh->jh_hash;
106 * We might have picked up a transaction in the middle, which we
107 * detect by not finding a hash record coupled with a raw record
108 * whos JREC_STREAMCTL_BEGIN bit is not set (or JREC_STREAMCTL_END
109 * bit if we are scanning backwards).
111 * When this case occurs we have to backtrack to locate the
112 * BEGIN (END if scanning backwards) and collect those records
113 * in order to obtain a complete transaction.
115 * This case most often occurs when a batch operation runs in
116 * prefix set whos starting raw-record transaction id is in
117 * the middle of one or more meta-transactions. It's a bit of
118 * a tricky situation, but easily resolvable by scanning the
119 * prefix set backwards (forwards if originally scanning backwards)
120 * to locate the raw record representing the start (end) of the
121 * transaction.
123 if (jh == NULL) {
124 if (ss->ss_direction == JD_FORWARDS &&
125 (head->streamid & JREC_STREAMCTL_BEGIN) == 0
127 if (verbose_opt > 1)
128 fprintf(stderr, "mid-transaction detected transid %016jx "
129 "streamid %04x\n",
130 (uintmax_t)jd->jd_transid,
131 head->streamid & JREC_STREAMID_MASK);
132 if (jaddrecord_backtrack(ss, jd) == 0) {
133 if (verbose_opt)
134 fprintf(stderr, "mid-transaction streamid %04x collection "
135 "succeeded\n",
136 head->streamid & JREC_STREAMID_MASK);
137 goto retry;
139 fprintf(stderr, "mid-transaction streamid %04x collection failed\n",
140 head->streamid & JREC_STREAMID_MASK);
141 jscan_dispose(js);
142 return(NULL);
143 } else if (ss->ss_direction == JD_BACKWARDS &&
144 (head->streamid & JREC_STREAMCTL_END) == 0
146 if (verbose_opt > 1)
147 fprintf(stderr, "mid-transaction detected transid %016jx "
148 "streamid %04x\n",
149 (uintmax_t)jd->jd_transid,
150 head->streamid & JREC_STREAMID_MASK);
151 if (jaddrecord_backtrack(ss, jd) == 0) {
152 if (verbose_opt)
153 fprintf(stderr, "mid-transaction streamid %04x "
154 "collection succeeded\n",
155 head->streamid & JREC_STREAMID_MASK);
156 goto retry;
158 fprintf(stderr, "mid-transaction streamid %04x collection failed\n",
159 head->streamid & JREC_STREAMID_MASK);
160 jscan_dispose(js);
161 return(NULL);
166 * If we've made it to here and we still don't have a hash record
167 * to track the transaction, create one.
169 if (jh == NULL) {
170 jh = malloc(sizeof(*jh));
171 bzero(jh, sizeof(*jh));
172 *jhp = jh;
173 jh->jh_first = js;
174 jh->jh_last = js;
175 jh->jh_transid = head->streamid;
176 jh->jh_session = ss;
177 return (NULL);
181 * Emplace the stream segment
183 jh->jh_transid |= head->streamid & JREC_STREAMCTL_MASK;
184 if (ss->ss_direction == JD_FORWARDS) {
185 jh->jh_last->js_next = js;
186 jh->jh_last = js;
187 } else {
188 js->js_next = jh->jh_first;
189 jh->jh_first = js;
193 * If the transaction is complete, remove the hash entry and return the
194 * js representing the beginning of the transaction. Otherwise leave
195 * the hash entry intact and return NULL.
197 if ((jh->jh_transid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
198 (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
200 *jhp = jh->jh_hash;
201 js = jh->jh_first;
202 free(jh);
204 jnormalize(js);
205 } else {
206 js = NULL;
208 return (js);
212 * Renormalize the jscan list to remove all the meta record headers
213 * and trailers except for the very first one.
215 static
216 void
217 jnormalize(struct jstream *js)
219 struct jstream *jscan;
220 off_t off;
222 js->js_normalized_off = 0;
223 js->js_normalized_base = (void *)js->js_head;
224 js->js_normalized_size = js->js_head->recsize - sizeof(struct journal_rawrecend);
225 js->js_normalized_total = js->js_normalized_size;
226 off = js->js_normalized_size;
227 for (jscan = js->js_next; jscan; jscan = jscan->js_next) {
228 jscan->js_normalized_off = off;
229 jscan->js_normalized_base = (char *)jscan->js_head +
230 sizeof(struct journal_rawrecbeg);
231 jscan->js_normalized_size = jscan->js_head->recsize -
232 sizeof(struct journal_rawrecbeg) -
233 sizeof(struct journal_rawrecend);
234 off += jscan->js_normalized_size;
235 js->js_normalized_total += jscan->js_normalized_size;
240 * For sanity's sake I will describe the normal backtracking that must occur,
241 * but this routine must also operate on reverse-scanned (undo) records
242 * by forward tracking.
244 * A record has been found that represents the middle or end of a transaction
245 * when we were expecting the beginning of a transaction. We must backtrack
246 * to locate the start of the transaction, then process raw records relating
247 * to the transaction until we reach our current point (jd) again. If
248 * we find a matching streamid representing the end of a transaction instead
249 * of the expected start-of-transaction that record belongs to a wholely
250 * different meta-transaction and the record we seek is known to not be
251 * available.
253 * jd is the current record, directon is the normal scan direction (we have
254 * to scan in the reverse direction).
256 static
258 jaddrecord_backtrack(struct jsession *ss, struct jdata *jd)
260 struct jfile *jf = ss->ss_jfin;
261 struct jdata *scan;
262 struct jstream *js;
263 u_int16_t streamid;
264 u_int16_t scanid;
266 assert(ss->ss_direction == JD_FORWARDS || ss->ss_direction == JD_BACKWARDS);
267 if (jmodes & JMODEF_INPUT_PIPE)
268 return(-1);
270 streamid = ((struct journal_rawrecbeg *)jd->jd_data)->streamid & JREC_STREAMID_MASK;
272 if (ss->ss_direction == JD_FORWARDS) {
274 * Backtrack in the reverse direction looking for the transaction
275 * start bit. If we find an end bit instead it belongs to an
276 * unrelated transaction using the same streamid and there is no
277 * point continuing.
279 scan = jref(jd);
280 while ((scan = jread(jf, scan, JD_BACKWARDS)) != NULL) {
281 scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
282 if ((scanid & JREC_STREAMID_MASK) != streamid)
283 continue;
284 if (scanid & JREC_STREAMCTL_END) {
285 jfree(jf, scan);
286 return(-1);
288 if (scanid & JREC_STREAMCTL_BEGIN)
289 break;
293 * Now jaddrecord the related records.
295 while (scan != NULL && scan->jd_transid < jd->jd_transid) {
296 scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
297 if ((scanid & JREC_STREAMID_MASK) == streamid) {
298 js = jaddrecord(ss, scan);
299 assert(js == NULL);
301 scan = jread(jf, scan, JD_FORWARDS);
303 if (scan == NULL)
304 return(-1);
305 jfree(jf, scan);
306 } else {
308 * Backtrack in the forwards direction looking for the transaction
309 * end bit. If we find a start bit instead if belongs to an
310 * unrelated transaction using the same streamid and there is no
311 * point continuing.
313 scan = jref(jd);
314 while ((scan = jread(jf, scan, JD_FORWARDS)) != NULL) {
315 scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
316 if ((scanid & JREC_STREAMID_MASK) != streamid)
317 continue;
318 if (scanid & JREC_STREAMCTL_BEGIN) {
319 jfree(jf, scan);
320 return(-1);
322 if (scanid & JREC_STREAMCTL_END)
323 break;
327 * Now jaddrecord the related records.
329 while (scan != NULL && scan->jd_transid > jd->jd_transid) {
330 scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
331 if ((scanid & JREC_STREAMID_MASK) == streamid) {
332 js = jaddrecord(ss, scan);
333 assert(js == NULL);
335 scan = jread(jf, scan, JD_BACKWARDS);
337 if (scan == NULL)
338 return(-1);
339 jfree(jf, scan);
341 return(0);
344 void
345 jscan_dispose(struct jstream *js)
347 struct jstream *jnext;
349 if (js->js_alloc_buf) {
350 free(js->js_alloc_buf);
351 js->js_alloc_buf = NULL;
352 js->js_alloc_size = 0;
355 while (js) {
356 jnext = js->js_next;
357 jfree(js->js_session->ss_jfin, js->js_jdata);
358 js->js_jdata = NULL;
359 free(js);
360 js = jnext;
/*
 * Read the specified block of data out of a linked set of jstream
 * structures.  Returns 0 on success or an error code on error.
 *
 * The data is gathered piecewise with jsreadany() and copied into buf.
 * ENOENT is returned if the stream runs out before 'bytes' bytes have
 * been copied.  A zero or negative byte count copies nothing and
 * returns 0.
 */
int
jsread(struct jstream *js, off_t off, void *buf, int bytes)
{
    const void *ptr;
    int n;

    /*
     * Loop on bytes > 0 rather than bytes != 0 so a negative count can
     * never be clamped into n and handed to bcopy() as a length.
     */
    while (bytes > 0) {
        n = jsreadany(js, off, &ptr);
        if (n <= 0)
            return (ENOENT);
        if (n > bytes)
            n = bytes;
        bcopy(ptr, buf, n);
        buf = (char *)buf + n;
        off += n;
        bytes -= n;
    }
    return(0);
}
389 * Read the specified block of data out of a linked set of jstream
390 * structures. Attempt to return a pointer into the data set but
391 * allocate and copy if that is not possible. Returns 0 on success
392 * or an error code on error.
395 jsreadp(struct jstream *js, off_t off, const void **bufp,
396 int bytes)
398 int error = 0;
399 int n;
401 n = jsreadany(js, off, bufp);
402 if (n < bytes) {
403 if (js->js_alloc_size < bytes) {
404 if (js->js_alloc_buf)
405 free(js->js_alloc_buf);
406 js->js_alloc_buf = malloc(bytes);
407 js->js_alloc_size = bytes;
408 if (js->js_alloc_buf == NULL)
409 fprintf(stderr, "attempt to allocate %d bytes failed\n", bytes);
410 assert(js->js_alloc_buf != NULL);
412 error = jsread(js, off, js->js_alloc_buf, bytes);
413 if (error) {
414 *bufp = NULL;
415 } else {
416 *bufp = js->js_alloc_buf;
419 return(error);
/*
 * Feed 'bytes' bytes of the stream starting at 'off' to func(fd, ...),
 * one contiguous piece at a time (func has the shape of write(2)).
 *
 * Returns the number of bytes successfully handed to func.  Processing
 * stops at the first call that does not consume the whole piece; if that
 * happens before anything was delivered, -1 is returned.  The return may
 * also be short of 'bytes' if the stream runs out of data.
 */
int
jsreadcallback(struct jstream *js, ssize_t (*func)(int, const void *, size_t),
               int fd, off_t off, int bytes)
{
    const void *bufp;
    int res;
    int n;
    int r;

    res = 0;
    while (bytes > 0 && (n = jsreadany(js, off, &bufp)) > 0) {
        if (n > bytes)
            n = bytes;
        r = func(fd, bufp, n);
        if (r != n) {
            if (res == 0)
                res = -1;
            /*
             * Stop here: counting this piece or carrying on would
             * turn the -1 into a bogus byte count.
             */
            break;
        }
        res += n;
        bytes -= n;
        off += n;
    }
    return(res);
}
448 * Return the largest contiguous buffer starting at the specified offset,
449 * or 0.
452 jsreadany(struct jstream *js, off_t off, const void **bufp)
454 struct jstream *scan;
455 int n;
457 if ((scan = js->js_cache) == NULL || scan->js_normalized_off > off)
458 scan = js;
459 while (scan && scan->js_normalized_off <= off) {
460 js->js_cache = scan;
461 if (scan->js_normalized_off + scan->js_normalized_size > off) {
462 n = (int)(off - scan->js_normalized_off);
463 *bufp = scan->js_normalized_base + n;
464 return(scan->js_normalized_size - n);
466 scan = scan->js_next;
468 return(0);