2 * Copyright (c) 2004,2005 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
34 * $DragonFly: src/sbin/jscan/jstream.c,v 1.8 2005/09/07 07:20:23 dillon Exp $
/*
 * File-scope state and forward declarations.
 *
 * JHashAry holds JHASH_SIZE pointers to struct jhash.  jaddrecord()
 * selects a slot with (streamid & JHASH_MASK) and starts its search for
 * an open transaction from that slot.
 */
39 static struct jhash
*JHashAry
[JHASH_SIZE
];
/* Helpers defined further down in this file. */
41 static void jnormalize(struct jstream
*js
);
42 static int jaddrecord_backtrack(struct jsession
*ss
, struct jdata
*jd
);
45 * Integrate a raw record. Deal with the transaction begin and end flags
46 * to create a forward-referenced collection of jstream records. If we are
47 * able to complete a transaction, the first js associated with that
48 * transaction is returned.
50 * XXX we need to store the data for very large multi-record transactions
51 * separately since it might not fit into memory.
53 * Note that a transaction might represent a huge I/O operation, resulting
54 * in an overall node structure that spans gigabytes, but individual
55 * subrecord leaf nodes are limited in size and we depend on this to simplify
56 * the handling of leaf records.
58 * A transaction may cover several raw records. The jstream collection for
59 * a transaction is only returned when the entire transaction has been
60 * successfully scanned. Due to the interleaving of transactions the ordering
61 * of returned JS's may be different (not exactly reversed) when scanning a
62 * journal backwards versus forwards. Since parallel operations are
63 * theoretically non-conflicting, this should not present a problem.
/*
 * jaddrecord(ss, jd) - fold one raw journal record into the transaction
 * it belongs to.
 *
 * ss   session the record is being scanned for.  ss->ss_direction
 *      (JD_FORWARDS or JD_BACKWARDS) decides which control bit signals a
 *      mid-transaction pickup and how the new jstream is linked.
 * jd   raw record.  A reference is taken with jref() and jd->jd_data is
 *      stored as the jstream's js_head.
 *
 * NOTE(review): this listing has lost lines (the embedded original line
 * numbers skip), so the return statements, the hash-chain stepping and
 * several assignments are not visible.  The comments below describe only
 * what is shown.
 */
66 jaddrecord(struct jsession
*ss
, struct jdata
*jd
)
68 struct journal_rawrecbeg
*head
;
/*
 * Allocate a zeroed jstream for this record and take a reference on jd.
 * NOTE(review): no check of the malloc() result is visible before the
 * bzero() - confirm whether an out-of-memory guard is needed here.
 */
73 js
= malloc(sizeof(struct jstream
));
74 bzero(js
, sizeof(struct jstream
));
75 js
->js_jdata
= jref(jd
);
76 js
->js_head
= (void *)jd
->jd_data
;
81 * Check for a completely self-contained transaction, just return the
/* True only when BEGIN and END are both set in this record's streamid. */
84 if ((head
->streamid
& (JREC_STREAMCTL_BEGIN
|JREC_STREAMCTL_END
)) ==
85 (JREC_STREAMCTL_BEGIN
|JREC_STREAMCTL_END
)
93 * Check for an open transaction in the hash table.
/*
 * Start at the slot selected by the streamid.  An entry matches when it
 * belongs to this session and its jh_transid agrees with the streamid in
 * every JREC_STREAMID_MASK bit (the XOR of the two is zero under the mask).
 */
95 jhp
= &JHashAry
[head
->streamid
& JHASH_MASK
];
96 while ((jh
= *jhp
) != NULL
) {
97 if (jh
->jh_session
== ss
&&
98 ((jh
->jh_transid
^ head
->streamid
) & JREC_STREAMID_MASK
) == 0
106 * We might have picked up a transaction in the middle, which we
107 * detect by not finding a hash record coupled with a raw record
108 * whos JREC_STREAMCTL_BEGIN bit is not set (or JREC_STREAMCTL_END
109 * bit if we are scanning backwards).
111 * When this case occurs we have to backtrack to locate the
112 * BEGIN (END if scanning backwards) and collect those records
113 * in order to obtain a complete transaction.
115 * This case most often occurs when a batch operation runs in
116 * prefix set whos starting raw-record transaction id is in
117 * the middle of one or more meta-transactions. It's a bit of
118 * a tricky situation, but easily resolvable by scanning the
119 * prefix set backwards (forwards if originally scanning backwards)
120 * to locate the raw record representing the start (end) of the
/*
 * Forward scan and the record lacks BEGIN: report it on stderr and ask
 * jaddrecord_backtrack() to collect the missing records.  A return of 0
 * from the backtrack is treated as success.
 */
124 if (ss
->ss_direction
== JD_FORWARDS
&&
125 (head
->streamid
& JREC_STREAMCTL_BEGIN
) == 0
128 fprintf(stderr
, "mid-transaction detected transid %016jx "
130 (uintmax_t)jd
->jd_transid
,
131 head
->streamid
& JREC_STREAMID_MASK
);
132 if (jaddrecord_backtrack(ss
, jd
) == 0) {
134 fprintf(stderr
, "mid-transaction streamid %04x collection "
136 head
->streamid
& JREC_STREAMID_MASK
);
139 fprintf(stderr
, "mid-transaction streamid %04x collection failed\n",
140 head
->streamid
& JREC_STREAMID_MASK
);
/* Mirror case: backward scan and the record lacks END. */
143 } else if (ss
->ss_direction
== JD_BACKWARDS
&&
144 (head
->streamid
& JREC_STREAMCTL_END
) == 0
147 fprintf(stderr
, "mid-transaction detected transid %016jx "
149 (uintmax_t)jd
->jd_transid
,
150 head
->streamid
& JREC_STREAMID_MASK
);
151 if (jaddrecord_backtrack(ss
, jd
) == 0) {
153 fprintf(stderr
, "mid-transaction streamid %04x "
154 "collection succeeded\n",
155 head
->streamid
& JREC_STREAMID_MASK
);
158 fprintf(stderr
, "mid-transaction streamid %04x collection failed\n",
159 head
->streamid
& JREC_STREAMID_MASK
);
166 * If we've made it to here and we still don't have a hash record
167 * to track the transaction, create one.
/*
 * New zeroed hash entry seeded with this record's streamid.
 * NOTE(review): as above, no malloc() failure check is visible.
 */
170 jh
= malloc(sizeof(*jh
));
171 bzero(jh
, sizeof(*jh
));
175 jh
->jh_transid
= head
->streamid
;
181 * Emplace the stream segment
/* Accumulate this record's control bits into the entry's jh_transid. */
183 jh
->jh_transid
|= head
->streamid
& JREC_STREAMCTL_MASK
;
/*
 * Forward scan: link the new jstream after the entry's current last one.
 * The second visible link (js->js_next = jh->jh_first) is presumably the
 * non-forward branch; the branch keyword itself is not visible here.
 */
184 if (ss
->ss_direction
== JD_FORWARDS
) {
185 jh
->jh_last
->js_next
= js
;
188 js
->js_next
= jh
->jh_first
;
193 * If the transaction is complete, remove the hash entry and return the
194 * js representing the beginning of the transaction. Otherwise leave
195 * the hash entry intact and return NULL.
/* Complete once both BEGIN and END have been accumulated in jh_transid. */
197 if ((jh
->jh_transid
& (JREC_STREAMCTL_BEGIN
|JREC_STREAMCTL_END
)) ==
198 (JREC_STREAMCTL_BEGIN
|JREC_STREAMCTL_END
)
212 * Renormalize the jscan list to remove all the meta record headers
213 * and trailers except for the very first one.
/*
 * jnormalize(js) - assign normalized offset, base and size to every
 * jstream on the js_next list starting at js.
 *
 * The first element keeps its record header: its base is js_head itself
 * and its size is recsize minus one struct journal_rawrecend.  Each later
 * element has its base moved past its struct journal_rawrecbeg and its
 * size reduced by both the header and the trailer.  The running offset is
 * stored in each element's js_normalized_off, and the sum of all sizes is
 * accumulated in the first element's js_normalized_total.
 *
 * NOTE(review): no guard is visible against a recsize smaller than the
 * header plus trailer - confirm that records are validated before this
 * point.
 */
217 jnormalize(struct jstream
*js
)
219 struct jstream
*jscan
;
/* First element: offset 0, header retained, trailer excluded. */
222 js
->js_normalized_off
= 0;
223 js
->js_normalized_base
= (void *)js
->js_head
;
224 js
->js_normalized_size
= js
->js_head
->recsize
- sizeof(struct journal_rawrecend
);
225 js
->js_normalized_total
= js
->js_normalized_size
;
226 off
= js
->js_normalized_size
;
/* Remaining elements: payload only, laid out back to back after the first. */
227 for (jscan
= js
->js_next
; jscan
; jscan
= jscan
->js_next
) {
228 jscan
->js_normalized_off
= off
;
229 jscan
->js_normalized_base
= (char *)jscan
->js_head
+
230 sizeof(struct journal_rawrecbeg
);
231 jscan
->js_normalized_size
= jscan
->js_head
->recsize
-
232 sizeof(struct journal_rawrecbeg
) -
233 sizeof(struct journal_rawrecend
);
234 off
+= jscan
->js_normalized_size
;
235 js
->js_normalized_total
+= jscan
->js_normalized_size
;
240 * For sanity's sake I will describe the normal backtracking that must occur,
241 * but this routine must also operate on reverse-scanned (undo) records
242 * by forward tracking.
244 * A record has been found that represents the middle or end of a transaction
245 * when we were expecting the beginning of a transaction. We must backtrack
246 * to locate the start of the transaction, then process raw records relating
247 * to the transaction until we reach our current point (jd) again. If
248 * we find a matching streamid representing the end of a transaction instead
249 * of the expected start-of-transaction that record belongs to a wholly
250 * different meta-transaction and the record we seek is known to not be
253 * jd is the current record, direction is the normal scan direction (we have
254 * to scan in the reverse direction).
/*
 * jaddrecord_backtrack(ss, jd) - collect the records of jd's transaction
 * that lie on the far side of jd relative to the scan direction.
 *
 * ss   session; records are read from ss->ss_jfin and ss->ss_direction
 *      must be JD_FORWARDS or JD_BACKWARDS (asserted).
 * jd   the record at which the mid-transaction condition was detected.
 *
 * The caller, jaddrecord(), treats a return value of 0 as success.
 *
 * NOTE(review): lines are missing from this listing.  The initial value
 * of scan, the statements that leave the search loops and the return
 * statements are not visible.
 */
258 jaddrecord_backtrack(struct jsession
*ss
, struct jdata
*jd
)
260 struct jfile
*jf
= ss
->ss_jfin
;
266 assert(ss
->ss_direction
== JD_FORWARDS
|| ss
->ss_direction
== JD_BACKWARDS
);
/*
 * NOTE(review): the statement guarded by this test is not visible;
 * presumably backtracking is refused when input is a pipe - confirm.
 */
267 if (jmodes
& JMODEF_INPUT_PIPE
)
/* Stream id of the record being resolved, control bits masked off. */
270 streamid
= ((struct journal_rawrecbeg
*)jd
->jd_data
)->streamid
& JREC_STREAMID_MASK
;
272 if (ss
->ss_direction
== JD_FORWARDS
) {
274 * Backtrack in the reverse direction looking for the transaction
275 * start bit. If we find an end bit instead it belongs to an
276 * unrelated transaction using the same streamid and there is no
/*
 * Read records backwards.  Records whose masked streamid differs are
 * tested first, then the END bit, then the BEGIN bit; the action taken
 * for each test is not visible in this listing.
 */
280 while ((scan
= jread(jf
, scan
, JD_BACKWARDS
)) != NULL
) {
281 scanid
= ((struct journal_rawrecbeg
*)scan
->jd_data
)->streamid
;
282 if ((scanid
& JREC_STREAMID_MASK
) != streamid
)
284 if (scanid
& JREC_STREAMCTL_END
) {
288 if (scanid
& JREC_STREAMCTL_BEGIN
)
293 * Now jaddrecord the related records.
/*
 * Read forwards again, stopping before jd's transid, and feed every
 * record with the same masked streamid to jaddrecord().
 */
295 while (scan
!= NULL
&& scan
->jd_transid
< jd
->jd_transid
) {
296 scanid
= ((struct journal_rawrecbeg
*)scan
->jd_data
)->streamid
;
297 if ((scanid
& JREC_STREAMID_MASK
) == streamid
) {
298 js
= jaddrecord(ss
, scan
);
301 scan
= jread(jf
, scan
, JD_FORWARDS
);
308 * Backtrack in the forwards direction looking for the transaction
309 * end bit. If we find a start bit instead if belongs to an
310 * unrelated transaction using the same streamid and there is no
/*
 * Mirror image of the branch above: search forwards, testing BEGIN
 * before END, then replay backwards while the transid is above jd's.
 */
314 while ((scan
= jread(jf
, scan
, JD_FORWARDS
)) != NULL
) {
315 scanid
= ((struct journal_rawrecbeg
*)scan
->jd_data
)->streamid
;
316 if ((scanid
& JREC_STREAMID_MASK
) != streamid
)
318 if (scanid
& JREC_STREAMCTL_BEGIN
) {
322 if (scanid
& JREC_STREAMCTL_END
)
327 * Now jaddrecord the related records.
329 while (scan
!= NULL
&& scan
->jd_transid
> jd
->jd_transid
) {
330 scanid
= ((struct journal_rawrecbeg
*)scan
->jd_data
)->streamid
;
331 if ((scanid
& JREC_STREAMID_MASK
) == streamid
) {
332 js
= jaddrecord(ss
, scan
);
335 scan
= jread(jf
, scan
, JD_BACKWARDS
);
/*
 * jscan_dispose(js) - release resources held by a jstream.
 *
 * Visible work: free the bounce buffer (js_alloc_buf) if one was
 * allocated and reset its bookkeeping, then pass the record's jdata,
 * together with the session's input jfile, to jfree().
 *
 * NOTE(review): a jnext local is declared, so this presumably iterates
 * over the js_next list; the loop and the release of the jstream itself
 * are not visible in this listing.
 */
345 jscan_dispose(struct jstream
*js
)
347 struct jstream
*jnext
;
/* Drop the buffer and clear pointer and size together so both stay in sync. */
349 if (js
->js_alloc_buf
) {
350 free(js
->js_alloc_buf
);
351 js
->js_alloc_buf
= NULL
;
352 js
->js_alloc_size
= 0;
357 jfree(js
->js_session
->ss_jfin
, js
->js_jdata
);
365 * Read the specified block of data out of a linked set of jstream
366 * structures. Returns 0 on success or an error code on error.
/*
 * jsread(js, off, buf, bytes) - read a block of stream data into buf.
 *
 * js     first jstream of the linked set to read from
 * off    offset to read from
 * buf    destination buffer
 * bytes  number of bytes requested
 *
 * jsreadany() supplies a pointer to the contiguous span at off and its
 * length n; the destination pointer is then advanced by n.  The copy
 * itself, the loop control and the return value are not visible in this
 * listing.
 */
369 jsread(struct jstream
*js
, off_t off
, void *buf
, int bytes
)
375 n
= jsreadany(js
, off
, &ptr
);
/* Arithmetic is done on a char * because buf is a void *. */
381 buf
= (char *)buf
+ n
;
389 * Read the specified block of data out of a linked set of jstream
390 * structures. Attempt to return a pointer into the data set but
391 * allocate and copy if that is not possible. Returns 0 on success
392 * or an error code on error.
/*
 * jsreadp(js, off, bufp, ...) - obtain a pointer to a block of stream data.
 *
 * jsreadany() is tried first and may set *bufp to point straight into
 * the stream.  The visible fallback grows js->js_alloc_buf to at least
 * `bytes`, fills it through jsread() and points *bufp at it.  The
 * condition that chooses between the two paths is not visible in this
 * listing.
 *
 * NOTE(review): js_alloc_size is updated before the allocation is
 * checked, and the only guard is assert(); in a build with NDEBUG
 * defined a failed malloc() would leave a NULL buffer with a non-zero
 * recorded size.
 */
395 jsreadp(struct jstream
*js
, off_t off
, const void **bufp
,
401 n
= jsreadany(js
, off
, bufp
);
/* Replace the bounce buffer only when the current one is too small. */
403 if (js
->js_alloc_size
< bytes
) {
404 if (js
->js_alloc_buf
)
405 free(js
->js_alloc_buf
);
406 js
->js_alloc_buf
= malloc(bytes
);
407 js
->js_alloc_size
= bytes
;
408 if (js
->js_alloc_buf
== NULL
)
409 fprintf(stderr
, "attempt to allocate %d bytes failed\n", bytes
);
410 assert(js
->js_alloc_buf
!= NULL
);
/* Copy the requested range into the bounce buffer and hand that out. */
412 error
= jsread(js
, off
, js
->js_alloc_buf
, bytes
);
416 *bufp
= js
->js_alloc_buf
;
/*
 * jsreadcallback(js, func, fd, off, bytes) - pass stream data to a
 * callback one contiguous span at a time.
 *
 * js     first jstream of the linked set to read from
 * func   callback invoked as func(fd, span pointer, span length)
 * fd     passed through to func unchanged
 * off    offset at which to start
 * bytes  amount of data requested
 *
 * The loop runs while bytes is non-zero and jsreadany() returns a
 * positive span length.  How off and bytes are advanced, how func's
 * result r is handled and what is returned are not visible in this
 * listing.
 */
423 jsreadcallback(struct jstream
*js
, ssize_t (*func
)(int, const void *, size_t),
424 int fd
, off_t off
, int bytes
)
432 while (bytes
&& (n
= jsreadany(js
, off
, &bufp
)) > 0) {
435 r
= func(fd
, bufp
, n
);
448 * Return the largest contiguous buffer starting at the specified offset,
/*
 * jsreadany(js, off, bufp) - find the contiguous span that contains off.
 *
 * js    first jstream of the linked set
 * off   normalized offset to look up
 * bufp  on a hit, receives a pointer to the byte at off
 *
 * On a hit the return value is the number of bytes from off to the end
 * of the containing segment.  What is returned when no segment contains
 * off is not visible in this listing.
 */
452 jsreadany(struct jstream
*js
, off_t off
, const void **bufp
)
454 struct jstream
*scan
;
/*
 * js_cache is used as the starting point unless it is unset or already
 * lies beyond off; the fallback starting point is not visible here.
 */
457 if ((scan
= js
->js_cache
) == NULL
|| scan
->js_normalized_off
> off
)
/* Walk forward while segments start at or before off. */
459 while (scan
&& scan
->js_normalized_off
<= off
) {
/* Hit: off falls before the end of this segment. */
461 if (scan
->js_normalized_off
+ scan
->js_normalized_size
> off
) {
462 n
= (int)(off
- scan
->js_normalized_off
);
463 *bufp
= scan
->js_normalized_base
+ n
;
464 return(scan
->js_normalized_size
- n
);
466 scan
= scan
->js_next
;