2 * Copyright (c) 2008 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
34 * $DragonFly: src/sys/vfs/hammer/hammer_mirror.c,v 1.15 2008/07/13 01:12:41 dillon Exp $
37 * HAMMER mirroring ioctls - serialize and deserialize modifications made
43 static int hammer_mirror_check(hammer_cursor_t cursor
,
44 struct hammer_ioc_mrecord_rec
*mrec
);
45 static int hammer_mirror_update(hammer_cursor_t cursor
,
46 struct hammer_ioc_mrecord_rec
*mrec
);
47 static int hammer_mirror_write(hammer_cursor_t cursor
,
48 struct hammer_ioc_mrecord_rec
*mrec
,
50 static int hammer_ioc_mirror_write_rec(hammer_cursor_t cursor
,
51 struct hammer_ioc_mrecord_rec
*mrec
,
52 struct hammer_ioc_mirror_rw
*mirror
,
53 u_int32_t localization
,
55 static int hammer_ioc_mirror_write_pass(hammer_cursor_t cursor
,
56 struct hammer_ioc_mrecord_rec
*mrec
,
57 struct hammer_ioc_mirror_rw
*mirror
,
58 u_int32_t localization
);
59 static int hammer_ioc_mirror_write_skip(hammer_cursor_t cursor
,
60 struct hammer_ioc_mrecord_skip
*mrec
,
61 struct hammer_ioc_mirror_rw
*mirror
,
62 u_int32_t localization
);
63 static int hammer_mirror_delete_to(hammer_cursor_t cursor
,
64 struct hammer_ioc_mirror_rw
*mirror
);
65 static int hammer_mirror_localize_data(hammer_data_ondisk_t data
,
66 hammer_btree_leaf_elm_t leaf
);
69 * All B-Tree records within the specified key range which also conform
70 * to the transaction id range are returned. Mirroring code keeps track
71 * of the last transaction id fully scanned and can efficiently pick up
72 * where it left off if interrupted.
74 * The PFS is identified in the mirror structure. The passed ip is just
75 * some directory in the overall HAMMER filesystem and has nothing to
79 hammer_ioc_mirror_read(hammer_transaction_t trans
, hammer_inode_t ip
,
80 struct hammer_ioc_mirror_rw
*mirror
)
82 struct hammer_cmirror cmirror
;
83 struct hammer_cursor cursor
;
84 union hammer_ioc_mrecord_any mrec
;
85 hammer_btree_leaf_elm_t elm
;
86 const int crc_start
= HAMMER_MREC_CRCOFF
;
92 u_int32_t localization
;
95 localization
= (u_int32_t
)mirror
->pfs_id
<< 16;
97 if ((mirror
->key_beg
.localization
| mirror
->key_end
.localization
) &
98 HAMMER_LOCALIZE_PSEUDOFS_MASK
) {
101 if (hammer_btree_cmp(&mirror
->key_beg
, &mirror
->key_end
) > 0)
104 mirror
->key_cur
= mirror
->key_beg
;
105 mirror
->key_cur
.localization
&= HAMMER_LOCALIZE_MASK
;
106 mirror
->key_cur
.localization
+= localization
;
107 bzero(&mrec
, sizeof(mrec
));
108 bzero(&cmirror
, sizeof(cmirror
));
111 error
= hammer_init_cursor(trans
, &cursor
, NULL
, NULL
);
113 hammer_done_cursor(&cursor
);
116 cursor
.key_beg
= mirror
->key_cur
;
117 cursor
.key_end
= mirror
->key_end
;
118 cursor
.key_end
.localization
&= HAMMER_LOCALIZE_MASK
;
119 cursor
.key_end
.localization
+= localization
;
121 cursor
.flags
|= HAMMER_CURSOR_END_INCLUSIVE
;
122 cursor
.flags
|= HAMMER_CURSOR_BACKEND
;
125 * This flag filters the search to only return elements whose create
126 * or delete TID is >= mirror_tid. The B-Tree uses the mirror_tid
127 * field stored with internal and leaf nodes to shortcut the scan.
129 cursor
.flags
|= HAMMER_CURSOR_MIRROR_FILTERED
;
130 cursor
.cmirror
= &cmirror
;
131 cmirror
.mirror_tid
= mirror
->tid_beg
;
133 error
= hammer_btree_first(&cursor
);
136 * Yield to more important tasks
139 error
= hammer_signal_check(trans
->hmp
);
145 * An internal node can be returned in mirror-filtered
146 * mode and indicates that the scan is returning a skip
147 * range in the cursor->cmirror structure.
149 uptr
= (char *)mirror
->ubuf
+ mirror
->count
;
150 if (cursor
.node
->ondisk
->type
== HAMMER_BTREE_TYPE_INTERNAL
) {
154 mirror
->key_cur
= cmirror
.skip_beg
;
155 bytes
= sizeof(mrec
.skip
);
156 if (mirror
->count
+ HAMMER_HEAD_DOALIGN(bytes
) >
164 mrec
.head
.signature
= HAMMER_IOC_MIRROR_SIGNATURE
;
165 mrec
.head
.type
= HAMMER_MREC_TYPE_SKIP
;
166 mrec
.head
.rec_size
= bytes
;
167 mrec
.skip
.skip_beg
= cmirror
.skip_beg
;
168 mrec
.skip
.skip_end
= cmirror
.skip_end
;
169 mrec
.head
.rec_crc
= crc32(&mrec
.head
.rec_size
,
171 error
= copyout(&mrec
, uptr
, bytes
);
177 * Leaf node. In full-history mode we could filter out
178 * elements modified outside the user-requested TID range.
180 * However, such elements must be returned so the writer
181 * can compare them against the target to determine what
182 * needs to be deleted on the target, particularly for
183 * no-history mirrors.
185 KKASSERT(cursor
.node
->ondisk
->type
== HAMMER_BTREE_TYPE_LEAF
);
186 elm
= &cursor
.node
->ondisk
->elms
[cursor
.index
].leaf
;
187 mirror
->key_cur
= elm
->base
;
189 if ((elm
->base
.create_tid
< mirror
->tid_beg
||
190 elm
->base
.create_tid
> mirror
->tid_end
) &&
191 (elm
->base
.delete_tid
< mirror
->tid_beg
||
192 elm
->base
.delete_tid
> mirror
->tid_end
)) {
193 bytes
= sizeof(mrec
.rec
);
194 if (mirror
->count
+ HAMMER_HEAD_DOALIGN(bytes
) >
200 * Fill mrec. PASS records are records which are
201 * outside the TID range needed for the mirror
202 * update. They are sent without any data payload
203 * because the mirroring target must still compare
204 * records that fall outside the SKIP ranges to
205 * determine what might need to be deleted. Such
206 * deletions are needed if the master or files on
207 * the master are no-history, or if the slave is
208 * so far behind the master has already been pruned.
210 mrec
.head
.signature
= HAMMER_IOC_MIRROR_SIGNATURE
;
211 mrec
.head
.type
= HAMMER_MREC_TYPE_PASS
;
212 mrec
.head
.rec_size
= bytes
;
213 mrec
.rec
.leaf
= *elm
;
214 mrec
.head
.rec_crc
= crc32(&mrec
.head
.rec_size
,
216 error
= copyout(&mrec
, uptr
, bytes
);
223 * The core code exports the data to userland.
225 data_len
= (elm
->data_offset
) ? elm
->data_len
: 0;
227 error
= hammer_btree_extract(&cursor
,
228 HAMMER_CURSOR_GET_DATA
);
233 bytes
= sizeof(mrec
.rec
) + data_len
;
234 if (mirror
->count
+ HAMMER_HEAD_DOALIGN(bytes
) > mirror
->size
)
238 * Construct the record for userland and copyout.
240 * The user is asking for a snapshot, if the record was
241 * deleted beyond the user-requested ending tid, the record
242 * is not considered deleted from the point of view of
243 * userland and delete_tid is cleared.
245 mrec
.head
.signature
= HAMMER_IOC_MIRROR_SIGNATURE
;
246 mrec
.head
.type
= HAMMER_MREC_TYPE_REC
;
247 mrec
.head
.rec_size
= bytes
;
248 mrec
.rec
.leaf
= *elm
;
249 if (elm
->base
.delete_tid
>= mirror
->tid_end
)
250 mrec
.rec
.leaf
.base
.delete_tid
= 0;
251 rec_crc
= crc32(&mrec
.head
.rec_size
,
252 sizeof(mrec
.rec
) - crc_start
);
254 rec_crc
= crc32_ext(cursor
.data
, data_len
, rec_crc
);
255 mrec
.head
.rec_crc
= rec_crc
;
256 error
= copyout(&mrec
, uptr
, sizeof(mrec
.rec
));
257 if (data_len
&& error
== 0) {
258 error
= copyout(cursor
.data
, uptr
+ sizeof(mrec
.rec
),
264 * eatdisk controls whether we skip the current cursor
265 * position on the next scan or not. If doing a SKIP
266 * the cursor is already positioned properly for the next
267 * scan and eatdisk will be 0.
271 mirror
->count
+= HAMMER_HEAD_DOALIGN(bytes
);
273 cursor
.flags
|= HAMMER_CURSOR_ATEDISK
;
275 cursor
.flags
&= ~HAMMER_CURSOR_ATEDISK
;
276 error
= hammer_btree_iterate(&cursor
);
279 if (error
== ENOENT
) {
280 mirror
->key_cur
= mirror
->key_end
;
283 hammer_done_cursor(&cursor
);
284 if (error
== EDEADLK
)
286 if (error
== EINTR
) {
287 mirror
->head
.flags
|= HAMMER_IOC_HEAD_INTR
;
291 mirror
->key_cur
.localization
&= HAMMER_LOCALIZE_MASK
;
296 * Copy records from userland to the target mirror.
298 * The PFS is identified in the mirror structure. The passed ip is just
299 * some directory in the overall HAMMER filesystem and has nothing to
300 * do with the PFS. In fact, there might not even be a root directory for
304 hammer_ioc_mirror_write(hammer_transaction_t trans
, hammer_inode_t ip
,
305 struct hammer_ioc_mirror_rw
*mirror
)
307 union hammer_ioc_mrecord_any mrec
;
308 struct hammer_cursor cursor
;
309 u_int32_t localization
;
310 int checkspace_count
= 0;
316 localization
= (u_int32_t
)mirror
->pfs_id
<< 16;
317 seq
= trans
->hmp
->flusher
.act
;
320 * Validate the mirror structure and relocalize the tracking keys.
322 if (mirror
->size
< 0 || mirror
->size
> 0x70000000)
324 mirror
->key_beg
.localization
&= HAMMER_LOCALIZE_MASK
;
325 mirror
->key_beg
.localization
+= localization
;
326 mirror
->key_end
.localization
&= HAMMER_LOCALIZE_MASK
;
327 mirror
->key_end
.localization
+= localization
;
328 mirror
->key_cur
.localization
&= HAMMER_LOCALIZE_MASK
;
329 mirror
->key_cur
.localization
+= localization
;
332 * Set up our tracking cursor for the loop. The tracking cursor
333 * is used to delete records that are no longer present on the
334 * master. The last handled record at key_cur must be skipped.
336 error
= hammer_init_cursor(trans
, &cursor
, NULL
, NULL
);
338 cursor
.key_beg
= mirror
->key_cur
;
339 cursor
.key_end
= mirror
->key_end
;
340 cursor
.flags
|= HAMMER_CURSOR_BACKEND
;
341 error
= hammer_btree_first(&cursor
);
343 cursor
.flags
|= HAMMER_CURSOR_ATEDISK
;
348 * Loop until our input buffer has been exhausted.
351 mirror
->count
+ sizeof(mrec
.head
) <= mirror
->size
) {
354 * Don't blow out the buffer cache. Leave room for frontend
357 while (hammer_flusher_meta_halflimit(trans
->hmp
) ||
358 hammer_flusher_undo_exhausted(trans
, 2)) {
359 hammer_unlock_cursor(&cursor
, 0);
360 hammer_flusher_wait(trans
->hmp
, seq
);
361 hammer_lock_cursor(&cursor
, 0);
362 seq
= hammer_flusher_async_one(trans
->hmp
);
366 * If there is insufficient free space it may be due to
367 * reserved bigblocks, which flushing might fix.
369 if (hammer_checkspace(trans
->hmp
, HAMMER_CHKSPC_MIRROR
)) {
370 if (++checkspace_count
== 10) {
374 hammer_unlock_cursor(&cursor
, 0);
375 hammer_flusher_wait(trans
->hmp
, seq
);
376 hammer_lock_cursor(&cursor
, 0);
377 seq
= hammer_flusher_async(trans
->hmp
, NULL
);
382 * Acquire and validate header
384 if ((bytes
= mirror
->size
- mirror
->count
) > sizeof(mrec
))
385 bytes
= sizeof(mrec
);
386 uptr
= (char *)mirror
->ubuf
+ mirror
->count
;
387 error
= copyin(uptr
, &mrec
, bytes
);
390 if (mrec
.head
.signature
!= HAMMER_IOC_MIRROR_SIGNATURE
) {
394 if (mrec
.head
.rec_size
< sizeof(mrec
.head
) ||
395 mrec
.head
.rec_size
> sizeof(mrec
) + HAMMER_XBUFSIZE
||
396 mirror
->count
+ mrec
.head
.rec_size
> mirror
->size
) {
401 switch(mrec
.head
.type
) {
402 case HAMMER_MREC_TYPE_SKIP
:
403 if (mrec
.head
.rec_size
!= sizeof(mrec
.skip
))
406 error
= hammer_ioc_mirror_write_skip(&cursor
, &mrec
.skip
, mirror
, localization
);
408 case HAMMER_MREC_TYPE_REC
:
409 if (mrec
.head
.rec_size
< sizeof(mrec
.rec
))
412 error
= hammer_ioc_mirror_write_rec(&cursor
, &mrec
.rec
, mirror
, localization
, uptr
+ sizeof(mrec
.rec
));
414 case HAMMER_MREC_TYPE_PASS
:
415 if (mrec
.head
.rec_size
!= sizeof(mrec
.rec
))
418 error
= hammer_ioc_mirror_write_pass(&cursor
, &mrec
.rec
, mirror
, localization
);
426 * Retry the current record on deadlock, otherwise setup
429 if (error
== EDEADLK
) {
430 while (error
== EDEADLK
) {
431 hammer_recover_cursor(&cursor
);
432 error
= hammer_cursor_upgrade(&cursor
);
435 if (error
== EALREADY
)
439 HAMMER_HEAD_DOALIGN(mrec
.head
.rec_size
);
443 hammer_done_cursor(&cursor
);
449 mirror
->head
.flags
|= HAMMER_IOC_HEAD_ERROR
;
450 mirror
->head
.error
= error
;
454 * ioctls don't update the RW data structure if an error is returned,
461 * Handle skip records.
463 * We must iterate from the last resolved record position at mirror->key_cur
464 * to skip_beg and delete any records encountered.
466 * mirror->key_cur must be carefully set when we succeed in processing
470 hammer_ioc_mirror_write_skip(hammer_cursor_t cursor
,
471 struct hammer_ioc_mrecord_skip
*mrec
,
472 struct hammer_ioc_mirror_rw
*mirror
,
473 u_int32_t localization
)
478 * Relocalize the skip range
480 mrec
->skip_beg
.localization
&= HAMMER_LOCALIZE_MASK
;
481 mrec
->skip_beg
.localization
+= localization
;
482 mrec
->skip_end
.localization
&= HAMMER_LOCALIZE_MASK
;
483 mrec
->skip_end
.localization
+= localization
;
486 * Iterate from current position to skip_beg, deleting any records
489 cursor
->key_end
= mrec
->skip_beg
;
490 cursor
->flags
|= HAMMER_CURSOR_BACKEND
;
491 error
= hammer_mirror_delete_to(cursor
, mirror
);
494 * Now skip past the skip (which is the whole point of
495 * having a skip record). The sender has not sent us any records
496 * for the skip area so we wouldn't know what to keep and what
499 * Clear ATEDISK because skip_end is non-inclusive, so we can't
500 * count an exact match if we happened to get one.
503 mirror
->key_cur
= mrec
->skip_end
;
504 cursor
->key_beg
= mrec
->skip_end
;
505 error
= hammer_btree_lookup(cursor
);
506 cursor
->flags
&= ~HAMMER_CURSOR_ATEDISK
;
514 * Handle B-Tree records.
516 * We must iterate to mrec->base.key (non-inclusively), and then process
517 * the record. We are allowed to write a new record or delete an existing
518 * record, but cannot replace an existing record.
520 * mirror->key_cur must be carefully set when we succeed in processing
524 hammer_ioc_mirror_write_rec(hammer_cursor_t cursor
,
525 struct hammer_ioc_mrecord_rec
*mrec
,
526 struct hammer_ioc_mirror_rw
*mirror
,
527 u_int32_t localization
,
530 hammer_transaction_t trans
;
534 trans
= cursor
->trans
;
535 rec_crc
= crc32(mrec
, sizeof(*mrec
));
537 if (mrec
->leaf
.data_len
< 0 ||
538 mrec
->leaf
.data_len
> HAMMER_XBUFSIZE
||
539 mrec
->leaf
.data_len
+ sizeof(*mrec
) > mrec
->head
.rec_size
) {
544 * Re-localize for target. relocalization of data is handled
545 * by hammer_mirror_write().
547 mrec
->leaf
.base
.localization
&= HAMMER_LOCALIZE_MASK
;
548 mrec
->leaf
.base
.localization
+= localization
;
551 * Delete records through until we reach (non-inclusively) the
554 cursor
->key_end
= mrec
->leaf
.base
;
555 cursor
->flags
&= ~HAMMER_CURSOR_END_INCLUSIVE
;
556 cursor
->flags
|= HAMMER_CURSOR_BACKEND
;
557 error
= hammer_mirror_delete_to(cursor
, mirror
);
562 * If the record exists only the delete_tid may be updated.
564 * If the record does not exist we create it. For now we
565 * ignore records with a non-zero delete_tid. Note that
566 * mirror operations are effectively an as-of operation and
567 * delete_tid can be 0 for mirroring purposes even if it is
568 * not actually 0 at the originator.
570 * These functions can return EDEADLK
572 cursor
->key_beg
= mrec
->leaf
.base
;
573 cursor
->flags
|= HAMMER_CURSOR_BACKEND
;
574 cursor
->flags
&= ~HAMMER_CURSOR_INSERT
;
575 error
= hammer_btree_lookup(cursor
);
577 if (error
== 0 && hammer_mirror_check(cursor
, mrec
)) {
578 error
= hammer_mirror_update(cursor
, mrec
);
579 } else if (error
== ENOENT
&& mrec
->leaf
.base
.delete_tid
== 0) {
580 error
= hammer_mirror_write(cursor
, mrec
, uptr
);
581 } else if (error
== ENOENT
) {
584 if (error
== 0 || error
== EALREADY
)
585 mirror
->key_cur
= mrec
->leaf
.base
;
590 * This works like write_rec but no write or update is necessary,
591 * and no data payload is included so we couldn't do a write even
594 * We must still iterate for deletions, and we can validate the
595 * record header which is a good way to test for corrupted mirror
598 * mirror->key_cur must be carefully set when we succeed in processing
603 hammer_ioc_mirror_write_pass(hammer_cursor_t cursor
,
604 struct hammer_ioc_mrecord_rec
*mrec
,
605 struct hammer_ioc_mirror_rw
*mirror
,
606 u_int32_t localization
)
608 hammer_transaction_t trans
;
612 trans
= cursor
->trans
;
613 rec_crc
= crc32(mrec
, sizeof(*mrec
));
616 * Re-localize for target. Relocalization of data is handled
617 * by hammer_mirror_write().
619 mrec
->leaf
.base
.localization
&= HAMMER_LOCALIZE_MASK
;
620 mrec
->leaf
.base
.localization
+= localization
;
623 * Delete records through until we reach (non-inclusively) the
626 cursor
->key_end
= mrec
->leaf
.base
;
627 cursor
->flags
&= ~HAMMER_CURSOR_END_INCLUSIVE
;
628 cursor
->flags
|= HAMMER_CURSOR_BACKEND
;
630 error
= hammer_mirror_delete_to(cursor
, mirror
);
633 * Locate the record and get past it by setting ATEDISK.
636 mirror
->key_cur
= mrec
->leaf
.base
;
637 cursor
->key_beg
= mrec
->leaf
.base
;
638 cursor
->flags
|= HAMMER_CURSOR_BACKEND
;
639 cursor
->flags
&= ~HAMMER_CURSOR_INSERT
;
640 error
= hammer_btree_lookup(cursor
);
642 cursor
->flags
|= HAMMER_CURSOR_ATEDISK
;
644 cursor
->flags
&= ~HAMMER_CURSOR_ATEDISK
;
652 * As part of the mirror write we iterate across swaths of records
653 * on the target which no longer exist on the source, and mark them
656 * The caller has indexed the cursor and set up key_end. We iterate
657 * through to key_end.
661 hammer_mirror_delete_to(hammer_cursor_t cursor
,
662 struct hammer_ioc_mirror_rw
*mirror
)
664 hammer_btree_leaf_elm_t elm
;
667 error
= hammer_btree_iterate(cursor
);
669 elm
= &cursor
->node
->ondisk
->elms
[cursor
->index
].leaf
;
670 KKASSERT(elm
->base
.btype
== HAMMER_BTREE_TYPE_RECORD
);
671 if (elm
->base
.delete_tid
== 0) {
672 error
= hammer_delete_at_cursor(cursor
,
673 HAMMER_DELETE_ADJUST
,
678 cursor
->flags
|= HAMMER_CURSOR_ATEDISK
;
681 error
= hammer_btree_iterate(cursor
);
689 * Check whether an update is needed in the case where a match already
690 * exists on the target. The only type of update allowed in this case
691 * is an update of the delete_tid.
693 * Return non-zero if the update should proceed.
697 hammer_mirror_check(hammer_cursor_t cursor
, struct hammer_ioc_mrecord_rec
*mrec
)
699 hammer_btree_leaf_elm_t leaf
= cursor
->leaf
;
701 if (leaf
->base
.delete_tid
!= mrec
->leaf
.base
.delete_tid
) {
702 if (mrec
->leaf
.base
.delete_tid
!= 0)
709 * Update a record in-place. Only the delete_tid can change, and
710 * only from zero to non-zero.
714 hammer_mirror_update(hammer_cursor_t cursor
,
715 struct hammer_ioc_mrecord_rec
*mrec
)
720 * This case shouldn't occur.
722 if (mrec
->leaf
.base
.delete_tid
== 0)
726 * Mark the record deleted on the mirror target.
728 error
= hammer_delete_at_cursor(cursor
, HAMMER_DELETE_ADJUST
,
729 mrec
->leaf
.base
.delete_tid
,
730 mrec
->leaf
.delete_ts
,
732 cursor
->flags
|= HAMMER_CURSOR_ATEDISK
;
737 * Write out a new record.
741 hammer_mirror_write(hammer_cursor_t cursor
,
742 struct hammer_ioc_mrecord_rec
*mrec
,
745 hammer_transaction_t trans
;
746 hammer_buffer_t data_buffer
;
747 hammer_off_t ndata_offset
;
748 hammer_tid_t high_tid
;
753 trans
= cursor
->trans
;
757 * Get the sync lock so the whole mess is atomic
759 hammer_sync_lock_sh(trans
);
762 * Allocate and adjust data
764 if (mrec
->leaf
.data_len
&& mrec
->leaf
.data_offset
) {
765 ndata
= hammer_alloc_data(trans
, mrec
->leaf
.data_len
,
766 mrec
->leaf
.base
.rec_type
,
767 &ndata_offset
, &data_buffer
, &error
);
770 mrec
->leaf
.data_offset
= ndata_offset
;
771 hammer_modify_buffer(trans
, data_buffer
, NULL
, 0);
772 error
= copyin(udata
, ndata
, mrec
->leaf
.data_len
);
774 if (hammer_crc_test_leaf(ndata
, &mrec
->leaf
) == 0) {
775 kprintf("data crc mismatch on pipe\n");
778 error
= hammer_mirror_localize_data(
782 hammer_modify_buffer_done(data_buffer
);
784 mrec
->leaf
.data_offset
= 0;
792 * Do the insertion. This can fail with a EDEADLK or EALREADY
794 cursor
->flags
|= HAMMER_CURSOR_INSERT
;
795 error
= hammer_btree_lookup(cursor
);
796 if (error
!= ENOENT
) {
802 error
= hammer_btree_insert(cursor
, &mrec
->leaf
, &doprop
);
805 * Cursor is left on the current element, we want to skip it now.
807 cursor
->flags
|= HAMMER_CURSOR_ATEDISK
;
808 cursor
->flags
&= ~HAMMER_CURSOR_INSERT
;
811 * Track a count of active inodes.
814 mrec
->leaf
.base
.rec_type
== HAMMER_RECTYPE_INODE
&&
815 mrec
->leaf
.base
.delete_tid
== 0) {
816 hammer_modify_volume_field(trans
,
819 ++trans
->hmp
->rootvol
->ondisk
->vol0_stat_inodes
;
820 hammer_modify_volume_done(trans
->rootvol
);
824 * vol0_next_tid must track the highest TID stored in the filesystem.
825 * We do not need to generate undo for this update.
827 high_tid
= mrec
->leaf
.base
.create_tid
;
828 if (high_tid
< mrec
->leaf
.base
.delete_tid
)
829 high_tid
= mrec
->leaf
.base
.delete_tid
;
830 if (trans
->rootvol
->ondisk
->vol0_next_tid
< high_tid
) {
831 hammer_modify_volume(trans
, trans
->rootvol
, NULL
, 0);
832 trans
->rootvol
->ondisk
->vol0_next_tid
= high_tid
;
833 hammer_modify_volume_done(trans
->rootvol
);
836 if (error
== 0 && doprop
)
837 hammer_btree_do_propagation(cursor
, NULL
, &mrec
->leaf
);
843 if (error
&& mrec
->leaf
.data_offset
) {
844 hammer_blockmap_free(cursor
->trans
,
845 mrec
->leaf
.data_offset
,
846 mrec
->leaf
.data_len
);
848 hammer_sync_unlock(trans
);
850 hammer_rel_buffer(data_buffer
, 0);
855 * Localize the data payload. Directory entries may need their
856 * localization adjusted.
858 * PFS directory entries must be skipped entirely (return EALREADY).
862 hammer_mirror_localize_data(hammer_data_ondisk_t data
,
863 hammer_btree_leaf_elm_t leaf
)
865 u_int32_t localization
;
867 if (leaf
->base
.rec_type
== HAMMER_RECTYPE_DIRENTRY
) {
868 if (data
->entry
.obj_id
== HAMMER_OBJID_ROOT
)
870 localization
= leaf
->base
.localization
&
871 HAMMER_LOCALIZE_PSEUDOFS_MASK
;
872 if (data
->entry
.localization
!= localization
) {
873 data
->entry
.localization
= localization
;
874 hammer_crc_set_leaf(data
, leaf
);