2 * Copyright (c) 2004,2005 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
34 * $DragonFly: src/sbin/jscan/jstream.c,v 1.8 2005/09/07 07:20:23 dillon Exp $
39 static struct jhash
*JHashAry
[JHASH_SIZE
];
41 static void jnormalize(struct jstream
*js
);
42 static int jaddrecord_backtrack(struct jsession
*ss
, struct jdata
*jd
);
45 * Integrate a raw record. Deal with the transaction begin and end flags
46 * to create a forward-referenced collection of jstream records. If we are
47 * able to complete a transaction, the first js associated with that
48 * transaction is returned.
50 * XXX we need to store the data for very large multi-record transactions
51 * separately since it might not fit into memory.
53 * Note that a transaction might represent a huge I/O operation, resulting
54 * in an overall node structure that spans gigabytes, but individual
55 * subrecord leaf nodes are limited in size and we depend on this to simplify
56 * the handling of leaf records.
58 * A transaction may cover several raw records. The jstream collection for
59 * a transaction is only returned when the entire transaction has been
60 * successfully scanned. Due to the interleaving of transactions the ordering
61 * of returned JS's may be different (not exactly reversed) when scanning a
62 * journal backwards verses forwards. Since parallel operations are
63 * theoretically non-conflicting, this should not present a problem.
66 jaddrecord(struct jsession
*ss
, struct jdata
*jd
)
68 struct journal_rawrecbeg
*head
;
73 js
= malloc(sizeof(struct jstream
));
74 bzero(js
, sizeof(struct jstream
));
75 js
->js_jdata
= jref(jd
);
76 js
->js_head
= (void *)jd
->jd_data
;
81 * Check for a completely self-contained transaction, just return the
84 if ((head
->streamid
& (JREC_STREAMCTL_BEGIN
|JREC_STREAMCTL_END
)) ==
85 (JREC_STREAMCTL_BEGIN
|JREC_STREAMCTL_END
)
93 * Check for an open transaction in the hash table.
95 jhp
= &JHashAry
[head
->streamid
& JHASH_MASK
];
96 while ((jh
= *jhp
) != NULL
) {
97 if (jh
->jh_session
== ss
&&
98 ((jh
->jh_transid
^ head
->streamid
) & JREC_STREAMID_MASK
) == 0
106 * We might have picked up a transaction in the middle, which we
107 * detect by not finding a hash record coupled with a raw record
108 * whos JREC_STREAMCTL_BEGIN bit is not set (or JREC_STREAMCTL_END
109 * bit if we are scanning backwards).
111 * When this case occurs we have to backtrack to locate the
112 * BEGIN (END if scanning backwards) and collect those records
113 * in order to obtain a complete transaction.
115 * This case most often occurs when a batch operation runs in
116 * prefix set whos starting raw-record transaction id is in
117 * the middle of one or more meta-transactions. It's a bit of
118 * a tricky situation, but easily resolvable by scanning the
119 * prefix set backwards (forwards if originally scanning backwards)
120 * to locate the raw record representing the start (end) of the
124 if (ss
->ss_direction
== JD_FORWARDS
&&
125 (head
->streamid
& JREC_STREAMCTL_BEGIN
) == 0
128 fprintf(stderr
, "mid-transaction detected transid %016llx streamid %04x\n", jd
->jd_transid
, head
->streamid
& JREC_STREAMID_MASK
);
129 if (jaddrecord_backtrack(ss
, jd
) == 0) {
131 fprintf(stderr
, "mid-transaction streamid %04x collection succeeded\n", head
->streamid
& JREC_STREAMID_MASK
);
134 fprintf(stderr
, "mid-transaction streamid %04x collection failed\n", head
->streamid
& JREC_STREAMID_MASK
);
137 } else if (ss
->ss_direction
== JD_BACKWARDS
&&
138 (head
->streamid
& JREC_STREAMCTL_END
) == 0
141 fprintf(stderr
, "mid-transaction detected transid %016llx streamid %04x\n", jd
->jd_transid
, head
->streamid
& JREC_STREAMID_MASK
);
142 if (jaddrecord_backtrack(ss
, jd
) == 0) {
144 fprintf(stderr
, "mid-transaction streamid %04x collection succeeded\n", head
->streamid
& JREC_STREAMID_MASK
);
147 fprintf(stderr
, "mid-transaction streamid %04x collection failed\n", head
->streamid
& JREC_STREAMID_MASK
);
154 * If we've made it to here and we still don't have a hash record
155 * to track the transaction, create one.
158 jh
= malloc(sizeof(*jh
));
159 bzero(jh
, sizeof(*jh
));
163 jh
->jh_transid
= head
->streamid
;
169 * Emplace the stream segment
171 jh
->jh_transid
|= head
->streamid
& JREC_STREAMCTL_MASK
;
172 if (ss
->ss_direction
== JD_FORWARDS
) {
173 jh
->jh_last
->js_next
= js
;
176 js
->js_next
= jh
->jh_first
;
181 * If the transaction is complete, remove the hash entry and return the
182 * js representing the beginning of the transaction. Otherwise leave
183 * the hash entry intact and return NULL.
185 if ((jh
->jh_transid
& (JREC_STREAMCTL_BEGIN
|JREC_STREAMCTL_END
)) ==
186 (JREC_STREAMCTL_BEGIN
|JREC_STREAMCTL_END
)
200 * Renormalize the jscan list to remove all the meta record headers
201 * and trailers except for the very first one.
205 jnormalize(struct jstream
*js
)
207 struct jstream
*jscan
;
210 js
->js_normalized_off
= 0;
211 js
->js_normalized_base
= (void *)js
->js_head
;
212 js
->js_normalized_size
= js
->js_head
->recsize
- sizeof(struct journal_rawrecend
);
213 js
->js_normalized_total
= js
->js_normalized_size
;
214 off
= js
->js_normalized_size
;
215 for (jscan
= js
->js_next
; jscan
; jscan
= jscan
->js_next
) {
216 jscan
->js_normalized_off
= off
;
217 jscan
->js_normalized_base
= (char *)jscan
->js_head
+
218 sizeof(struct journal_rawrecbeg
);
219 jscan
->js_normalized_size
= jscan
->js_head
->recsize
-
220 sizeof(struct journal_rawrecbeg
) -
221 sizeof(struct journal_rawrecend
);
222 off
+= jscan
->js_normalized_size
;
223 js
->js_normalized_total
+= jscan
->js_normalized_size
;
228 * For sanity's sake I will describe the normal backtracking that must occur,
229 * but this routine must also operate on reverse-scanned (undo) records
230 * by forward tracking.
232 * A record has been found that represents the middle or end of a transaction
233 * when we were expecting the beginning of a transaction. We must backtrack
234 * to locate the start of the transaction, then process raw records relating
235 * to the transaction until we reach our current point (jd) again. If
236 * we find a matching streamid representing the end of a transaction instead
237 * of the expected start-of-transaction that record belongs to a wholely
238 * different meta-transaction and the record we seek is known to not be
241 * jd is the current record, directon is the normal scan direction (we have
242 * to scan in the reverse direction).
246 jaddrecord_backtrack(struct jsession
*ss
, struct jdata
*jd
)
248 struct jfile
*jf
= ss
->ss_jfin
;
254 assert(ss
->ss_direction
== JD_FORWARDS
|| ss
->ss_direction
== JD_BACKWARDS
);
255 if (jmodes
& JMODEF_INPUT_PIPE
)
258 streamid
= ((struct journal_rawrecbeg
*)jd
->jd_data
)->streamid
& JREC_STREAMID_MASK
;
260 if (ss
->ss_direction
== JD_FORWARDS
) {
262 * Backtrack in the reverse direction looking for the transaction
263 * start bit. If we find an end bit instead it belongs to an
264 * unrelated transaction using the same streamid and there is no
268 while ((scan
= jread(jf
, scan
, JD_BACKWARDS
)) != NULL
) {
269 scanid
= ((struct journal_rawrecbeg
*)scan
->jd_data
)->streamid
;
270 if ((scanid
& JREC_STREAMID_MASK
) != streamid
)
272 if (scanid
& JREC_STREAMCTL_END
) {
276 if (scanid
& JREC_STREAMCTL_BEGIN
)
281 * Now jaddrecord the related records.
283 while (scan
!= NULL
&& scan
->jd_transid
< jd
->jd_transid
) {
284 scanid
= ((struct journal_rawrecbeg
*)scan
->jd_data
)->streamid
;
285 if ((scanid
& JREC_STREAMID_MASK
) == streamid
) {
286 js
= jaddrecord(ss
, scan
);
289 scan
= jread(jf
, scan
, JD_FORWARDS
);
296 * Backtrack in the forwards direction looking for the transaction
297 * end bit. If we find a start bit instead if belongs to an
298 * unrelated transaction using the same streamid and there is no
302 while ((scan
= jread(jf
, scan
, JD_FORWARDS
)) != NULL
) {
303 scanid
= ((struct journal_rawrecbeg
*)scan
->jd_data
)->streamid
;
304 if ((scanid
& JREC_STREAMID_MASK
) != streamid
)
306 if (scanid
& JREC_STREAMCTL_BEGIN
) {
310 if (scanid
& JREC_STREAMCTL_END
)
315 * Now jaddrecord the related records.
317 while (scan
!= NULL
&& scan
->jd_transid
> jd
->jd_transid
) {
318 scanid
= ((struct journal_rawrecbeg
*)scan
->jd_data
)->streamid
;
319 if ((scanid
& JREC_STREAMID_MASK
) == streamid
) {
320 js
= jaddrecord(ss
, scan
);
323 scan
= jread(jf
, scan
, JD_BACKWARDS
);
333 jscan_dispose(struct jstream
*js
)
335 struct jstream
*jnext
;
337 if (js
->js_alloc_buf
) {
338 free(js
->js_alloc_buf
);
339 js
->js_alloc_buf
= NULL
;
340 js
->js_alloc_size
= 0;
345 jfree(js
->js_session
->ss_jfin
, js
->js_jdata
);
353 * Read the specified block of data out of a linked set of jstream
354 * structures. Returns 0 on success or an error code on error.
357 jsread(struct jstream
*js
, off_t off
, void *buf
, int bytes
)
363 n
= jsreadany(js
, off
, &ptr
);
369 buf
= (char *)buf
+ n
;
377 * Read the specified block of data out of a linked set of jstream
378 * structures. Attempt to return a pointer into the data set but
379 * allocate and copy if that is not possible. Returns 0 on success
380 * or an error code on error.
383 jsreadp(struct jstream
*js
, off_t off
, const void **bufp
,
389 n
= jsreadany(js
, off
, bufp
);
391 if (js
->js_alloc_size
< bytes
) {
392 if (js
->js_alloc_buf
)
393 free(js
->js_alloc_buf
);
394 js
->js_alloc_buf
= malloc(bytes
);
395 js
->js_alloc_size
= bytes
;
396 if (js
->js_alloc_buf
== NULL
)
397 fprintf(stderr
, "attempt to allocate %d bytes failed\n", bytes
);
398 assert(js
->js_alloc_buf
!= NULL
);
400 error
= jsread(js
, off
, js
->js_alloc_buf
, bytes
);
404 *bufp
= js
->js_alloc_buf
;
411 jsreadcallback(struct jstream
*js
, ssize_t (*func
)(int, const void *, size_t),
412 int fd
, off_t off
, int bytes
)
420 while (bytes
&& (n
= jsreadany(js
, off
, &bufp
)) > 0) {
423 r
= func(fd
, bufp
, n
);
436 * Return the largest contiguous buffer starting at the specified offset,
440 jsreadany(struct jstream
*js
, off_t off
, const void **bufp
)
442 struct jstream
*scan
;
445 if ((scan
= js
->js_cache
) == NULL
|| scan
->js_normalized_off
> off
)
447 while (scan
&& scan
->js_normalized_off
<= off
) {
449 if (scan
->js_normalized_off
+ scan
->js_normalized_size
> off
) {
450 n
= (int)(off
- scan
->js_normalized_off
);
451 *bufp
= scan
->js_normalized_base
+ n
;
452 return(scan
->js_normalized_size
- n
);
454 scan
= scan
->js_next
;