/*-------------------------------------------------------------------------
 *
 * nbtxlog.c
 *	  WAL replay logic for btrees.
 *
 *
 * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/nbtree/nbtxlog.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/bufmask.h"
#include "access/nbtree.h"
#include "access/nbtxlog.h"
#include "access/transam.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "miscadmin.h"
#include "storage/procarray.h"
#include "utils/memutils.h"
static MemoryContext opCtx;     /* working memory for operations */

/*
 * _bt_restore_page -- re-enter all the index tuples on a page
 *
 * The page is freshly init'd, and *from (length len) is a copy of what
 * had been its upper part (pd_upper to pd_special).  We assume that the
 * tuples had been added to the page in item-number order, and therefore
 * the one with highest item number appears first (lowest on the page).
 */
static void
_bt_restore_page(Page page, char *from, int len)
{
    IndexTupleData itupdata;
    Size        itemsz;
    char       *end = from + len;
    Item        items[MaxIndexTuplesPerPage];
    uint16      itemsizes[MaxIndexTuplesPerPage];
    int         i;
    int         nitems;

    /*
     * To get the items back in the original order, we add them to the page in
     * reverse.  To figure out where one tuple ends and another begins, we
     * have to scan them in forward order first.
     */
    i = 0;
    while (from < end)
    {
        /*
         * As we step through the items, 'from' won't always be properly
         * aligned, so we need to use memcpy().  Further, we use Item (which
         * is just a char*) here for our items array for the same reason;
         * wouldn't want the compiler or anyone thinking that an item is
         * aligned when it isn't.
         */
        memcpy(&itupdata, from, sizeof(IndexTupleData));
        itemsz = IndexTupleSize(&itupdata);
        itemsz = MAXALIGN(itemsz);

        items[i] = (Item) from;
        itemsizes[i] = itemsz;
        i++;

        from += itemsz;
    }
    nitems = i;

    for (i = nitems - 1; i >= 0; i--)
    {
        if (PageAddItem(page, items[i], itemsizes[i], nitems - i,
                        false, false) == InvalidOffsetNumber)
            elog(PANIC, "_bt_restore_page: cannot add item to page");
    }
}
static void
_bt_restore_meta(XLogReaderState *record, uint8 block_id)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    Buffer      metabuf;
    Page        metapg;
    BTMetaPageData *md;
    BTPageOpaque pageop;
    xl_btree_metadata *xlrec;
    char       *ptr;
    Size        len;

    metabuf = XLogInitBufferForRedo(record, block_id);
    ptr = XLogRecGetBlockData(record, block_id, &len);

    Assert(len == sizeof(xl_btree_metadata));
    Assert(BufferGetBlockNumber(metabuf) == BTREE_METAPAGE);
    xlrec = (xl_btree_metadata *) ptr;
    metapg = BufferGetPage(metabuf);

    _bt_pageinit(metapg, BufferGetPageSize(metabuf));

    md = BTPageGetMeta(metapg);
    md->btm_magic = BTREE_MAGIC;
    md->btm_version = xlrec->version;
    md->btm_root = xlrec->root;
    md->btm_level = xlrec->level;
    md->btm_fastroot = xlrec->fastroot;
    md->btm_fastlevel = xlrec->fastlevel;
    /* Cannot log BTREE_MIN_VERSION index metapage without upgrade */
    Assert(md->btm_version >= BTREE_NOVAC_VERSION);
    md->btm_last_cleanup_num_delpages = xlrec->last_cleanup_num_delpages;
    md->btm_last_cleanup_num_heap_tuples = -1.0;
    md->btm_allequalimage = xlrec->allequalimage;

    pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
    pageop->btpo_flags = BTP_META;

    /*
     * Set pd_lower just past the end of the metadata.  This is essential,
     * because without doing so, metadata will be lost if xlog.c compresses
     * the page.
     */
    ((PageHeader) metapg)->pd_lower =
        ((char *) md + sizeof(BTMetaPageData)) - (char *) metapg;

    PageSetLSN(metapg, lsn);
    MarkBufferDirty(metabuf);
    UnlockReleaseBuffer(metabuf);
}

/*
 * _bt_clear_incomplete_split -- clear INCOMPLETE_SPLIT flag on a page
 *
 * This is a common subroutine of the redo functions of all the WAL record
 * types that can insert a downlink: insert, split, and newroot.
 */
static void
_bt_clear_incomplete_split(XLogReaderState *record, uint8 block_id)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    Buffer      buf;

    if (XLogReadBufferForRedo(record, block_id, &buf) == BLK_NEEDS_REDO)
    {
        Page        page = (Page) BufferGetPage(buf);
        BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);

        Assert(P_INCOMPLETE_SPLIT(pageop));
        pageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;

        PageSetLSN(page, lsn);
        MarkBufferDirty(buf);
    }
    if (BufferIsValid(buf))
        UnlockReleaseBuffer(buf);
}
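
/*
 * btree_xlog_insert -- replay insertion of a single index tuple, including
 * the posting list split performed in passing by XLOG_BTREE_INSERT_POST
 */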
static void
btree_xlog_insert(bool isleaf, bool ismeta, bool posting,
                  XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;

    /*
     * Insertion to an internal page finishes an incomplete split at the child
     * level.  Clear the incomplete-split flag in the child.  Note: during
     * normal operation, the child and parent pages are locked at the same
     * time (the locks are coupled), so that clearing the flag and inserting
     * the downlink appear atomic to other backends.  We don't bother with
     * that during replay, because readers don't care about the
     * incomplete-split flag and there cannot be updates happening.
     */
    if (!isleaf)
        _bt_clear_incomplete_split(record, 1);
    if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
    {
        Size        datalen;
        char       *datapos = XLogRecGetBlockData(record, 0, &datalen);

        page = BufferGetPage(buffer);

        if (!posting)
        {
            /* Simple retail insertion */
            if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
                            false, false) == InvalidOffsetNumber)
                elog(PANIC, "failed to add new item");
        }
        else
        {
            ItemId      itemid;
            IndexTuple  oposting,
                        newitem,
                        nposting;
            uint16      postingoff;

            /*
             * A posting list split occurred during leaf page insertion.  WAL
             * record data will start with an offset number representing the
             * point in an existing posting list that a split occurs at.
             *
             * Use _bt_swap_posting() to repeat posting list split steps from
             * primary.  Note that newitem from WAL record is 'orignewitem',
             * not the final version of newitem that is actually inserted on
             * page.
             */
            postingoff = *((uint16 *) datapos);
            datapos += sizeof(uint16);
            datalen -= sizeof(uint16);

            itemid = PageGetItemId(page, OffsetNumberPrev(xlrec->offnum));
            oposting = (IndexTuple) PageGetItem(page, itemid);

            /* Use mutable, aligned newitem copy in _bt_swap_posting() */
            Assert(isleaf && postingoff > 0);
            newitem = CopyIndexTuple((IndexTuple) datapos);
            nposting = _bt_swap_posting(newitem, oposting, postingoff);

            /* Replace existing posting list with post-split version */
            memcpy(oposting, nposting, MAXALIGN(IndexTupleSize(nposting)));

            /* Insert "final" new item (not orignewitem from WAL stream) */
            Assert(IndexTupleSize(newitem) == datalen);
            if (PageAddItem(page, (Item) newitem, datalen, xlrec->offnum,
                            false, false) == InvalidOffsetNumber)
                elog(PANIC, "failed to add posting split new item");
        }

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);

    /*
     * Note: in normal operation, we'd update the metapage while still holding
     * lock on the page we inserted into.  But during replay it's not
     * necessary to hold that lock, since no other index updates can be
     * happening concurrently, and readers will cope fine with following an
     * obsolete link from the metapage.
     */
    if (ismeta)
        _bt_restore_meta(record, 2);
}
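
/*
 * btree_xlog_split -- replay a page split: the new right sibling is rebuilt
 * from the WAL record, and the left half of the original page in place
 */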
static void
btree_xlog_split(bool newitemonleft, XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
    bool        isleaf = (xlrec->level == 0);
    Buffer      buf;
    Buffer      rbuf;
    Page        rpage;
    BTPageOpaque ropaque;
    char       *datapos;
    Size        datalen;
    BlockNumber origpagenumber;
    BlockNumber rightpagenumber;
    BlockNumber spagenumber;

    XLogRecGetBlockTag(record, 0, NULL, NULL, &origpagenumber);
    XLogRecGetBlockTag(record, 1, NULL, NULL, &rightpagenumber);
    if (!XLogRecGetBlockTag(record, 2, NULL, NULL, &spagenumber))
        spagenumber = P_NONE;

    /*
     * Clear the incomplete split flag on the appropriate child page one level
     * down when origpage/buf is an internal page (there must have been
     * cascading page splits during original execution in the event of an
     * internal page split).  This is like the corresponding btree_xlog_insert
     * call for internal pages.  We're not clearing the incomplete split flag
     * for the current page split here (you can think of this as part of the
     * insert of newitem that the page split action needs to perform in
     * passing).
     *
     * Like in btree_xlog_insert, this can be done before locking other pages.
     * We never need to couple cross-level locks in REDO routines.
     */
    if (!isleaf)
        _bt_clear_incomplete_split(record, 3);

    /* Reconstruct right (new) sibling page from scratch */
    rbuf = XLogInitBufferForRedo(record, 1);
    datapos = XLogRecGetBlockData(record, 1, &datalen);
    rpage = (Page) BufferGetPage(rbuf);

    _bt_pageinit(rpage, BufferGetPageSize(rbuf));
    ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage);

    ropaque->btpo_prev = origpagenumber;
    ropaque->btpo_next = spagenumber;
    ropaque->btpo_level = xlrec->level;
    ropaque->btpo_flags = isleaf ? BTP_LEAF : 0;
    ropaque->btpo_cycleid = 0;

    _bt_restore_page(rpage, datapos, datalen);

    PageSetLSN(rpage, lsn);
    MarkBufferDirty(rbuf);

    /* Now reconstruct original page (left half of split) */
    if (XLogReadBufferForRedo(record, 0, &buf) == BLK_NEEDS_REDO)
    {
        /*
         * To retain the same physical order of the tuples that they had, we
         * initialize a temporary empty page for the left page and add all the
         * items to that in item number order.  This mirrors how _bt_split()
         * works.  Retaining the same physical order makes WAL consistency
         * checking possible.  See also _bt_restore_page(), which does the
         * same for the right page.
         */
        Page        origpage = (Page) BufferGetPage(buf);
        BTPageOpaque oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
        OffsetNumber off;
        IndexTuple  newitem = NULL,
                    left_hikey = NULL,
                    nposting = NULL;
        Size        newitemsz = 0,
                    left_hikeysz = 0;
        Page        leftpage;
        OffsetNumber leftoff,
                    replacepostingoff = InvalidOffsetNumber;

        datapos = XLogRecGetBlockData(record, 0, &datalen);

        if (newitemonleft || xlrec->postingoff != 0)
        {
            newitem = (IndexTuple) datapos;
            newitemsz = MAXALIGN(IndexTupleSize(newitem));
            datapos += newitemsz;
            datalen -= newitemsz;

            if (xlrec->postingoff != 0)
            {
                ItemId      itemid;
                IndexTuple  oposting;

                /* Posting list must be at offset number before new item's */
                replacepostingoff = OffsetNumberPrev(xlrec->newitemoff);

                /* Use mutable, aligned newitem copy in _bt_swap_posting() */
                newitem = CopyIndexTuple(newitem);
                itemid = PageGetItemId(origpage, replacepostingoff);
                oposting = (IndexTuple) PageGetItem(origpage, itemid);
                nposting = _bt_swap_posting(newitem, oposting,
                                            xlrec->postingoff);
            }
        }

        /*
         * Extract left hikey and its size.  We assume that 16-bit alignment
         * is enough to apply IndexTupleSize (since it's fetching from a
         * uint16 field).
         */
        left_hikey = (IndexTuple) datapos;
        left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
        datapos += left_hikeysz;
        datalen -= left_hikeysz;

        Assert(datalen == 0);

        leftpage = PageGetTempPageCopySpecial(origpage);

        /* Add high key tuple from WAL record to temp page */
        leftoff = P_HIKEY;
        if (PageAddItem(leftpage, (Item) left_hikey, left_hikeysz, P_HIKEY,
                        false, false) == InvalidOffsetNumber)
            elog(ERROR, "failed to add high key to left page after split");
        leftoff = OffsetNumberNext(leftoff);

        for (off = P_FIRSTDATAKEY(oopaque); off < xlrec->firstrightoff; off++)
        {
            ItemId      itemid;
            Size        itemsz;
            IndexTuple  item;

            /* Add replacement posting list when required */
            if (off == replacepostingoff)
            {
                Assert(newitemonleft ||
                       xlrec->firstrightoff == xlrec->newitemoff);
                if (PageAddItem(leftpage, (Item) nposting,
                                MAXALIGN(IndexTupleSize(nposting)), leftoff,
                                false, false) == InvalidOffsetNumber)
                    elog(ERROR, "failed to add new posting list item to left page after split");
                leftoff = OffsetNumberNext(leftoff);
                continue;       /* don't insert oposting */
            }

            /* add the new item if it was inserted on left page */
            else if (newitemonleft && off == xlrec->newitemoff)
            {
                if (PageAddItem(leftpage, (Item) newitem, newitemsz, leftoff,
                                false, false) == InvalidOffsetNumber)
                    elog(ERROR, "failed to add new item to left page after split");
                leftoff = OffsetNumberNext(leftoff);
            }

            itemid = PageGetItemId(origpage, off);
            itemsz = ItemIdGetLength(itemid);
            item = (IndexTuple) PageGetItem(origpage, itemid);
            if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
                            false, false) == InvalidOffsetNumber)
                elog(ERROR, "failed to add old item to left page after split");
            leftoff = OffsetNumberNext(leftoff);
        }

        /* cope with possibility that newitem goes at the end */
        if (newitemonleft && off == xlrec->newitemoff)
        {
            if (PageAddItem(leftpage, (Item) newitem, newitemsz, leftoff,
                            false, false) == InvalidOffsetNumber)
                elog(ERROR, "failed to add new item to left page after split");
            leftoff = OffsetNumberNext(leftoff);
        }

        PageRestoreTempPage(leftpage, origpage);

        /* Fix opaque fields */
        oopaque->btpo_flags = BTP_INCOMPLETE_SPLIT;
        if (isleaf)
            oopaque->btpo_flags |= BTP_LEAF;
        oopaque->btpo_next = rightpagenumber;
        oopaque->btpo_cycleid = 0;

        PageSetLSN(origpage, lsn);
        MarkBufferDirty(buf);
    }

    /* Fix left-link of the page to the right of the new right sibling */
    if (spagenumber != P_NONE)
    {
        Buffer      sbuf;

        if (XLogReadBufferForRedo(record, 2, &sbuf) == BLK_NEEDS_REDO)
        {
            Page        spage = (Page) BufferGetPage(sbuf);
            BTPageOpaque spageop = (BTPageOpaque) PageGetSpecialPointer(spage);

            spageop->btpo_prev = rightpagenumber;

            PageSetLSN(spage, lsn);
            MarkBufferDirty(sbuf);
        }
        if (BufferIsValid(sbuf))
            UnlockReleaseBuffer(sbuf);
    }

    /*
     * Finally, release the remaining buffers.  sbuf, rbuf, and buf must be
     * released together, so that readers cannot observe inconsistencies.
     */
    UnlockReleaseBuffer(rbuf);
    if (BufferIsValid(buf))
        UnlockReleaseBuffer(buf);
}
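
/*
 * btree_xlog_dedup -- replay a deduplication pass over a leaf page by
 * re-running the dedup logic against the intervals stored in the record
 */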
static void
btree_xlog_dedup(XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_btree_dedup *xlrec = (xl_btree_dedup *) XLogRecGetData(record);
    Buffer      buf;

    if (XLogReadBufferForRedo(record, 0, &buf) == BLK_NEEDS_REDO)
    {
        char       *ptr = XLogRecGetBlockData(record, 0, NULL);
        Page        page = (Page) BufferGetPage(buf);
        BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
        OffsetNumber offnum,
                    minoff,
                    maxoff;
        BTDedupState state;
        BTDedupInterval *intervals;
        Page        newpage;

        state = (BTDedupState) palloc(sizeof(BTDedupStateData));
        state->deduplicate = true;  /* unused */
        state->nmaxitems = 0;   /* unused */
        /* Conservatively use larger maxpostingsize than primary */
        state->maxpostingsize = BTMaxItemSize(page);
        state->base = NULL;
        state->baseoff = InvalidOffsetNumber;
        state->basetupsize = 0;
        state->htids = palloc(state->maxpostingsize);
        state->nhtids = 0;
        state->nitems = 0;
        state->phystupsize = 0;
        state->nintervals = 0;

        minoff = P_FIRSTDATAKEY(opaque);
        maxoff = PageGetMaxOffsetNumber(page);
        newpage = PageGetTempPageCopySpecial(page);

        if (!P_RIGHTMOST(opaque))
        {
            ItemId      itemid = PageGetItemId(page, P_HIKEY);
            Size        itemsz = ItemIdGetLength(itemid);
            IndexTuple  item = (IndexTuple) PageGetItem(page, itemid);

            if (PageAddItem(newpage, (Item) item, itemsz, P_HIKEY,
                            false, false) == InvalidOffsetNumber)
                elog(ERROR, "deduplication failed to add highkey");
        }

        intervals = (BTDedupInterval *) ptr;
        for (offnum = minoff;
             offnum <= maxoff;
             offnum = OffsetNumberNext(offnum))
        {
            ItemId      itemid = PageGetItemId(page, offnum);
            IndexTuple  itup = (IndexTuple) PageGetItem(page, itemid);

            if (offnum == minoff)
                _bt_dedup_start_pending(state, itup, offnum);
            else if (state->nintervals < xlrec->nintervals &&
                     state->baseoff == intervals[state->nintervals].baseoff &&
                     state->nitems < intervals[state->nintervals].nitems)
            {
                if (!_bt_dedup_save_htid(state, itup))
                    elog(ERROR, "deduplication failed to add heap tid to pending posting list");
            }
            else
            {
                _bt_dedup_finish_pending(newpage, state);
                _bt_dedup_start_pending(state, itup, offnum);
            }
        }

        _bt_dedup_finish_pending(newpage, state);
        Assert(state->nintervals == xlrec->nintervals);
        Assert(memcmp(state->intervals, intervals,
                      state->nintervals * sizeof(BTDedupInterval)) == 0);

        if (P_HAS_GARBAGE(opaque))
        {
            BTPageOpaque nopaque = (BTPageOpaque) PageGetSpecialPointer(newpage);

            nopaque->btpo_flags &= ~BTP_HAS_GARBAGE;
        }

        PageRestoreTempPage(newpage, page);
        PageSetLSN(page, lsn);
        MarkBufferDirty(buf);
    }

    if (BufferIsValid(buf))
        UnlockReleaseBuffer(buf);
}
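
/*
 * btree_xlog_updates -- common subroutine of btree_xlog_vacuum and
 * btree_xlog_delete that replays updates of posting list tuples (removal
 * of some of their heap TIDs)
 */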
static void
btree_xlog_updates(Page page, OffsetNumber *updatedoffsets,
                   xl_btree_update *updates, int nupdated)
{
    BTVacuumPosting vacposting;
    IndexTuple  origtuple;
    ItemId      itemid;
    Size        itemsz;

    for (int i = 0; i < nupdated; i++)
    {
        itemid = PageGetItemId(page, updatedoffsets[i]);
        origtuple = (IndexTuple) PageGetItem(page, itemid);

        vacposting = palloc(offsetof(BTVacuumPostingData, deletetids) +
                            updates->ndeletedtids * sizeof(uint16));
        vacposting->updatedoffset = updatedoffsets[i];
        vacposting->itup = origtuple;
        vacposting->ndeletedtids = updates->ndeletedtids;
        memcpy(vacposting->deletetids,
               (char *) updates + SizeOfBtreeUpdate,
               updates->ndeletedtids * sizeof(uint16));

        _bt_update_posting(vacposting);

        /* Overwrite updated version of tuple */
        itemsz = MAXALIGN(IndexTupleSize(vacposting->itup));
        if (!PageIndexTupleOverwrite(page, updatedoffsets[i],
                                     (Item) vacposting->itup, itemsz))
            elog(PANIC, "failed to update partially dead item");

        pfree(vacposting->itup);
        pfree(vacposting);

        /* advance to next xl_btree_update from array */
        updates = (xl_btree_update *)
            ((char *) updates + SizeOfBtreeUpdate +
             updates->ndeletedtids * sizeof(uint16));
    }
}
static void
btree_xlog_vacuum(XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_btree_vacuum *xlrec = (xl_btree_vacuum *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;
    BTPageOpaque opaque;

    /*
     * We need to take a cleanup lock here, just like btvacuumpage(). However,
     * it isn't necessary to exhaustively get a cleanup lock on every block in
     * the index during recovery (just getting a cleanup lock on pages with
     * items to kill suffices).  See nbtree/README for details.
     */
    if (XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer)
        == BLK_NEEDS_REDO)
    {
        char       *ptr = XLogRecGetBlockData(record, 0, NULL);

        page = (Page) BufferGetPage(buffer);

        if (xlrec->nupdated > 0)
        {
            OffsetNumber *updatedoffsets;
            xl_btree_update *updates;

            updatedoffsets = (OffsetNumber *)
                (ptr + xlrec->ndeleted * sizeof(OffsetNumber));
            updates = (xl_btree_update *) ((char *) updatedoffsets +
                                           xlrec->nupdated *
                                           sizeof(OffsetNumber));

            btree_xlog_updates(page, updatedoffsets, updates, xlrec->nupdated);
        }

        if (xlrec->ndeleted > 0)
            PageIndexMultiDelete(page, (OffsetNumber *) ptr, xlrec->ndeleted);

        /*
         * Mark the page as not containing any LP_DEAD items --- see comments
         * in _bt_delitems_vacuum().
         */
        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
        opaque->btpo_flags &= ~BTP_HAS_GARBAGE;

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);
}
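
/*
 * btree_xlog_delete -- replay simple deletion of leaf page items, resolving
 * recovery conflicts with standby queries first.  The block data layout
 * matches btree_xlog_vacuum's.
 */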
static void
btree_xlog_delete(XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;
    BTPageOpaque opaque;

    /*
     * If we have any conflict processing to do, it must happen before we
     * update the page.
     */
    if (InHotStandby)
    {
        RelFileNode rnode;

        XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);

        ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
    }

    /*
     * We don't need to take a cleanup lock to apply these changes.  See
     * nbtree/README for details.
     */
    if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
    {
        char       *ptr = XLogRecGetBlockData(record, 0, NULL);

        page = (Page) BufferGetPage(buffer);

        if (xlrec->nupdated > 0)
        {
            OffsetNumber *updatedoffsets;
            xl_btree_update *updates;

            updatedoffsets = (OffsetNumber *)
                (ptr + xlrec->ndeleted * sizeof(OffsetNumber));
            updates = (xl_btree_update *) ((char *) updatedoffsets +
                                           xlrec->nupdated *
                                           sizeof(OffsetNumber));

            btree_xlog_updates(page, updatedoffsets, updates, xlrec->nupdated);
        }

        if (xlrec->ndeleted > 0)
            PageIndexMultiDelete(page, (OffsetNumber *) ptr, xlrec->ndeleted);

        /* Mark the page as not containing any LP_DEAD items */
        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
        opaque->btpo_flags &= ~BTP_HAS_GARBAGE;

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);
}
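
/*
 * btree_xlog_mark_page_halfdead -- replay the first phase of page deletion:
 * remove the subtree's downlink from the parent and rewrite the leaf page
 * as half-dead
 */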
static void
btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_btree_mark_page_halfdead *xlrec = (xl_btree_mark_page_halfdead *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;
    BTPageOpaque pageop;
    IndexTupleData trunctuple;

    /*
     * In normal operation, we would lock all the pages this WAL record
     * touches before changing any of them.  In WAL replay, it should be okay
     * to lock just one page at a time, since no concurrent index updates can
     * be happening, and readers should not care whether they arrive at the
     * target page or not (since it's surely empty).
     */

    /* to-be-deleted subtree's parent page */
    if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
    {
        OffsetNumber poffset;
        ItemId      itemid;
        IndexTuple  itup;
        OffsetNumber nextoffset;
        BlockNumber rightsib;

        page = (Page) BufferGetPage(buffer);
        pageop = (BTPageOpaque) PageGetSpecialPointer(page);

        poffset = xlrec->poffset;

        nextoffset = OffsetNumberNext(poffset);
        itemid = PageGetItemId(page, nextoffset);
        itup = (IndexTuple) PageGetItem(page, itemid);
        rightsib = BTreeTupleGetDownLink(itup);

        itemid = PageGetItemId(page, poffset);
        itup = (IndexTuple) PageGetItem(page, itemid);
        BTreeTupleSetDownLink(itup, rightsib);
        nextoffset = OffsetNumberNext(poffset);
        PageIndexTupleDelete(page, nextoffset);

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }

    /*
     * Don't need to couple cross-level locks in REDO routines, so release
     * lock on internal page immediately
     */
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);

    /* Rewrite the leaf page as a halfdead page */
    buffer = XLogInitBufferForRedo(record, 0);
    page = (Page) BufferGetPage(buffer);

    _bt_pageinit(page, BufferGetPageSize(buffer));
    pageop = (BTPageOpaque) PageGetSpecialPointer(page);

    pageop->btpo_prev = xlrec->leftblk;
    pageop->btpo_next = xlrec->rightblk;
    pageop->btpo_level = 0;
    pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
    pageop->btpo_cycleid = 0;

    /*
     * Construct a dummy high key item that points to top parent page (value
     * is InvalidBlockNumber when the top parent page is the leaf page itself)
     */
    MemSet(&trunctuple, 0, sizeof(IndexTupleData));
    trunctuple.t_info = sizeof(IndexTupleData);
    BTreeTupleSetTopParent(&trunctuple, xlrec->topparent);

    if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
                    false, false) == InvalidOffsetNumber)
        elog(ERROR, "could not add dummy high key to half-dead page");

    PageSetLSN(page, lsn);
    MarkBufferDirty(buffer);
    UnlockReleaseBuffer(buffer);
}
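
/*
 * btree_xlog_unlink_page -- replay the second phase of page deletion:
 * unlink the target page from its siblings and rewrite it as deleted
 */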
static void
btree_xlog_unlink_page(uint8 info, XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_btree_unlink_page *xlrec = (xl_btree_unlink_page *) XLogRecGetData(record);
    BlockNumber leftsib;
    BlockNumber rightsib;
    uint32      level;
    bool        isleaf;
    FullTransactionId safexid;
    Buffer      leftbuf;
    Buffer      target;
    Buffer      rightbuf;
    Page        page;
    BTPageOpaque pageop;

    leftsib = xlrec->leftsib;
    rightsib = xlrec->rightsib;
    level = xlrec->level;
    isleaf = (level == 0);
    safexid = xlrec->safexid;

    /* No leaftopparent for level 0 (leaf page) or level 1 target */
    Assert(!BlockNumberIsValid(xlrec->leaftopparent) || level > 1);

    /*
     * In normal operation, we would lock all the pages this WAL record
     * touches before changing any of them.  In WAL replay, we at least lock
     * the pages in the same standard left-to-right order (leftsib, target,
     * rightsib), and don't release the sibling locks until the target is
     * marked deleted.
     */

    /* Fix right-link of left sibling, if any */
    if (leftsib != P_NONE)
    {
        if (XLogReadBufferForRedo(record, 1, &leftbuf) == BLK_NEEDS_REDO)
        {
            page = (Page) BufferGetPage(leftbuf);
            pageop = (BTPageOpaque) PageGetSpecialPointer(page);
            pageop->btpo_next = rightsib;

            PageSetLSN(page, lsn);
            MarkBufferDirty(leftbuf);
        }
    }
    else
        leftbuf = InvalidBuffer;

    /* Rewrite target page as empty deleted page */
    target = XLogInitBufferForRedo(record, 0);
    page = (Page) BufferGetPage(target);

    _bt_pageinit(page, BufferGetPageSize(target));
    pageop = (BTPageOpaque) PageGetSpecialPointer(page);

    pageop->btpo_prev = leftsib;
    pageop->btpo_next = rightsib;
    pageop->btpo_level = level;
    BTPageSetDeleted(page, safexid);
    if (isleaf)
        pageop->btpo_flags |= BTP_LEAF;
    pageop->btpo_cycleid = 0;

    PageSetLSN(page, lsn);
    MarkBufferDirty(target);

    /* Fix left-link of right sibling */
    if (XLogReadBufferForRedo(record, 2, &rightbuf) == BLK_NEEDS_REDO)
    {
        page = (Page) BufferGetPage(rightbuf);
        pageop = (BTPageOpaque) PageGetSpecialPointer(page);
        pageop->btpo_prev = leftsib;

        PageSetLSN(page, lsn);
        MarkBufferDirty(rightbuf);
    }

    /* Release siblings */
    if (BufferIsValid(leftbuf))
        UnlockReleaseBuffer(leftbuf);
    if (BufferIsValid(rightbuf))
        UnlockReleaseBuffer(rightbuf);

    /* Release target */
    UnlockReleaseBuffer(target);

    /*
     * If we deleted a parent of the targeted leaf page, instead of the leaf
     * itself, update the leaf to point to the next remaining child in the
     * to-be-deleted subtree
     */
    if (XLogRecHasBlockRef(record, 3))
    {
        /*
         * There is no real data on the page, so we just re-create it from
         * scratch using the information from the WAL record.
         *
         * Note that we don't end up here when the target page is also the
         * leafbuf page.  There is no need to add a dummy hikey item with a
         * top parent link when deleting leafbuf because it's the last page
         * we'll delete in the subtree undergoing deletion.
         */
        Buffer      leafbuf;
        IndexTupleData trunctuple;

        Assert(!isleaf);

        leafbuf = XLogInitBufferForRedo(record, 3);
        page = (Page) BufferGetPage(leafbuf);

        _bt_pageinit(page, BufferGetPageSize(leafbuf));
        pageop = (BTPageOpaque) PageGetSpecialPointer(page);

        pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
        pageop->btpo_prev = xlrec->leafleftsib;
        pageop->btpo_next = xlrec->leafrightsib;
        pageop->btpo_level = 0;
        pageop->btpo_cycleid = 0;

        /* Add a dummy hikey item */
        MemSet(&trunctuple, 0, sizeof(IndexTupleData));
        trunctuple.t_info = sizeof(IndexTupleData);
        BTreeTupleSetTopParent(&trunctuple, xlrec->leaftopparent);

        if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
                        false, false) == InvalidOffsetNumber)
            elog(ERROR, "could not add dummy high key to half-dead page");

        PageSetLSN(page, lsn);
        MarkBufferDirty(leafbuf);
        UnlockReleaseBuffer(leafbuf);
    }

    /* Update metapage if needed */
    if (info == XLOG_BTREE_UNLINK_PAGE_META)
        _bt_restore_meta(record, 4);
}
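
/*
 * btree_xlog_newroot -- replay creation of a new (possibly leaf) root page
 */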
static void
btree_xlog_newroot(XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;
    BTPageOpaque pageop;
    char       *ptr;
    Size        len;

    buffer = XLogInitBufferForRedo(record, 0);
    page = (Page) BufferGetPage(buffer);

    _bt_pageinit(page, BufferGetPageSize(buffer));
    pageop = (BTPageOpaque) PageGetSpecialPointer(page);

    pageop->btpo_flags = BTP_ROOT;
    pageop->btpo_prev = pageop->btpo_next = P_NONE;
    pageop->btpo_level = xlrec->level;
    if (xlrec->level == 0)
        pageop->btpo_flags |= BTP_LEAF;
    pageop->btpo_cycleid = 0;

    if (xlrec->level > 0)
    {
        ptr = XLogRecGetBlockData(record, 0, &len);
        _bt_restore_page(page, ptr, len);

        /* Clear the incomplete-split flag in left child */
        _bt_clear_incomplete_split(record, 1);
    }

    PageSetLSN(page, lsn);
    MarkBufferDirty(buffer);
    UnlockReleaseBuffer(buffer);

    _bt_restore_meta(record, 2);
}

/*
 * In general VACUUM must defer recycling as a way of avoiding certain race
 * conditions.  Deleted pages contain a safexid value that is used by VACUUM
 * to determine whether or not it's safe to place a page that was deleted by
 * VACUUM earlier into the FSM now.  See nbtree/README.
 *
 * As far as any backend operating during original execution is concerned, the
 * FSM is a cache of recycle-safe pages; the mere presence of the page in the
 * FSM indicates that the page must already be safe to recycle (actually,
 * _bt_getbuf() verifies it's safe using BTPageIsRecyclable(), but that's just
 * because it would be unwise to completely trust the FSM, given its current
 * limitations).
 *
 * This isn't sufficient to prevent similar concurrent recycling race
 * conditions during Hot Standby, though.  For that we need to log a
 * xl_btree_reuse_page record at the point that a page is actually recycled
 * and reused for an entirely unrelated page inside _bt_split().  These
 * records include the same safexid value from the original deleted page,
 * stored in the record's latestRemovedFullXid field.
 *
 * The GlobalVisCheckRemovableFullXid() test in BTPageIsRecyclable() is used
 * to determine if it's safe to recycle a page.  This mirrors our own test:
 * the PGPROC->xmin > limitXmin test inside GetConflictingVirtualXIDs().
 * Consequently, one XID value achieves the same exclusion effect on primary
 * and standby.
 */
static void
btree_xlog_reuse_page(XLogReaderState *record)
{
    xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record);

    if (InHotStandby)
        ResolveRecoveryConflictWithSnapshotFullXid(xlrec->latestRemovedFullXid,
                                                   xlrec->node);
}
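
/*
 * btree_redo -- dispatch a btree WAL record to its replay routine, running
 * it in the dedicated recovery memory context
 */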
void
btree_redo(XLogReaderState *record)
{
    uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
    MemoryContext oldCtx;

    oldCtx = MemoryContextSwitchTo(opCtx);
    switch (info)
    {
        case XLOG_BTREE_INSERT_LEAF:
            btree_xlog_insert(true, false, false, record);
            break;
        case XLOG_BTREE_INSERT_UPPER:
            btree_xlog_insert(false, false, false, record);
            break;
        case XLOG_BTREE_INSERT_META:
            btree_xlog_insert(false, true, false, record);
            break;
        case XLOG_BTREE_SPLIT_L:
            btree_xlog_split(true, record);
            break;
        case XLOG_BTREE_SPLIT_R:
            btree_xlog_split(false, record);
            break;
        case XLOG_BTREE_INSERT_POST:
            btree_xlog_insert(true, false, true, record);
            break;
        case XLOG_BTREE_DEDUP:
            btree_xlog_dedup(record);
            break;
        case XLOG_BTREE_VACUUM:
            btree_xlog_vacuum(record);
            break;
        case XLOG_BTREE_DELETE:
            btree_xlog_delete(record);
            break;
        case XLOG_BTREE_MARK_PAGE_HALFDEAD:
            btree_xlog_mark_page_halfdead(info, record);
            break;
        case XLOG_BTREE_UNLINK_PAGE:
        case XLOG_BTREE_UNLINK_PAGE_META:
            btree_xlog_unlink_page(info, record);
            break;
        case XLOG_BTREE_NEWROOT:
            btree_xlog_newroot(record);
            break;
        case XLOG_BTREE_REUSE_PAGE:
            btree_xlog_reuse_page(record);
            break;
        case XLOG_BTREE_META_CLEANUP:
            _bt_restore_meta(record, 0);
            break;
        default:
            elog(PANIC, "btree_redo: unknown op code %u", info);
    }
    MemoryContextSwitchTo(oldCtx);
    MemoryContextReset(opCtx);
}

void
btree_xlog_startup(void)
{
    opCtx = AllocSetContextCreate(CurrentMemoryContext,
                                  "Btree recovery temporary context",
                                  ALLOCSET_DEFAULT_SIZES);
}

void
btree_xlog_cleanup(void)
{
    MemoryContextDelete(opCtx);
    opCtx = NULL;
}

/*
 * Mask a btree page before performing consistency checks on it.
 */
void
btree_mask(char *pagedata, BlockNumber blkno)
{
    Page        page = (Page) pagedata;
    BTPageOpaque maskopaq;

    mask_page_lsn_and_checksum(page);

    mask_page_hint_bits(page);
    mask_unused_space(page);

    maskopaq = (BTPageOpaque) PageGetSpecialPointer(page);

    if (P_ISLEAF(maskopaq))
    {
        /*
         * In btree leaf pages, it is possible to modify the LP_FLAGS without
         * emitting any WAL record.  Hence, mask the line pointer flags.  See
         * _bt_killitems(), _bt_check_unique() for details.
         */
        mask_lp_flags(page);
    }

    /*
     * BTP_HAS_GARBAGE is just an un-logged hint bit.  So, mask it.  See
     * _bt_delete_or_dedup_one_page(), _bt_killitems(), and _bt_check_unique()
     * for details.
     */
    maskopaq->btpo_flags &= ~BTP_HAS_GARBAGE;

    /*
     * During replay of a btree page split, we don't set the BTP_SPLIT_END
     * flag of the right sibling and initialize the cycle_id to 0 for the same
     * page.  See btree_xlog_split() for details.
     */
    maskopaq->btpo_flags &= ~BTP_SPLIT_END;
    maskopaq->btpo_cycleid = 0;
}