/*-------------------------------------------------------------------------
 *
 * nbtxlog.c
 *    WAL replay logic for btrees.
 *
 * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/access/nbtree/nbtxlog.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/bufmask.h"
#include "access/nbtree.h"
#include "access/nbtxlog.h"
#include "access/transam.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "miscadmin.h"
#include "storage/procarray.h"
#include "utils/memutils.h"
static MemoryContext opCtx;     /* working memory for operations */
/*
 * _bt_restore_page -- re-enter all the index tuples on a page
 *
 * The page is freshly init'd, and *from (length len) is a copy of what
 * had been its upper part (pd_upper to pd_special).  We assume that the
 * tuples had been added to the page in item-number order, and therefore
 * the one with highest item number appears first (lowest on the page).
 */
static void
_bt_restore_page(Page page, char *from, int len)
{
    IndexTupleData itupdata;
    Size itemsz;
    char *end = from + len;
    Item items[MaxIndexTuplesPerPage];
    uint16 itemsizes[MaxIndexTuplesPerPage];
    int i;
    int nitems;

    /*
     * To get the items back in the original order, we add them to the page
     * in reverse.  To figure out where one tuple ends and another begins, we
     * have to scan them in forward order first.
     */
    i = 0;
    while (from < end)
    {
        /*
         * As we step through the items, 'from' won't always be properly
         * aligned, so we need to use memcpy().  Further, we use Item (which
         * is just a char*) here for our items array for the same reason;
         * wouldn't want the compiler or anyone thinking that an item is
         * aligned when it isn't.
         */
        memcpy(&itupdata, from, sizeof(IndexTupleData));
        itemsz = IndexTupleSize(&itupdata);
        itemsz = MAXALIGN(itemsz);

        items[i] = (Item) from;
        itemsizes[i] = itemsz;
        i++;

        from += itemsz;
    }
    nitems = i;

    for (i = nitems - 1; i >= 0; i--)
    {
        if (PageAddItem(page, items[i], itemsizes[i], nitems - i,
                        false, false) == InvalidOffsetNumber)
            elog(PANIC, "_bt_restore_page: cannot add item to page");
    }
}
static void
_bt_restore_meta(XLogReaderState *record, uint8 block_id)
{
    XLogRecPtr lsn = record->EndRecPtr;
    Buffer metabuf;
    Page metapg;
    BTMetaPageData *md;
    BTPageOpaque pageop;
    xl_btree_metadata *xlrec;
    char *ptr;
    Size len;

    metabuf = XLogInitBufferForRedo(record, block_id);
    ptr = XLogRecGetBlockData(record, block_id, &len);

    Assert(len == sizeof(xl_btree_metadata));
    Assert(BufferGetBlockNumber(metabuf) == BTREE_METAPAGE);
    xlrec = (xl_btree_metadata *) ptr;
    metapg = BufferGetPage(metabuf);

    _bt_pageinit(metapg, BufferGetPageSize(metabuf));

    md = BTPageGetMeta(metapg);
    md->btm_magic = BTREE_MAGIC;
    md->btm_version = xlrec->version;
    md->btm_root = xlrec->root;
    md->btm_level = xlrec->level;
    md->btm_fastroot = xlrec->fastroot;
    md->btm_fastlevel = xlrec->fastlevel;
    /* Cannot log BTREE_MIN_VERSION index metapage without upgrade */
    Assert(md->btm_version >= BTREE_NOVAC_VERSION);
    md->btm_last_cleanup_num_delpages = xlrec->last_cleanup_num_delpages;
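    /* num_heap_tuples is not tracked during replay; store -1 as placeholder */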
    md->btm_last_cleanup_num_heap_tuples = -1.0;
    md->btm_allequalimage = xlrec->allequalimage;

    pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
    pageop->btpo_flags = BTP_META;

    /*
     * Set pd_lower just past the end of the metadata.  This is essential,
     * because without doing so, metadata will be lost if xlog.c compresses
     * the page.
     */
    ((PageHeader) metapg)->pd_lower =
        ((char *) md + sizeof(BTMetaPageData)) - (char *) metapg;

    PageSetLSN(metapg, lsn);
    MarkBufferDirty(metabuf);
    UnlockReleaseBuffer(metabuf);
}
/*
 * _bt_clear_incomplete_split -- clear INCOMPLETE_SPLIT flag on a page
 *
 * This is a common subroutine of the redo functions of all the WAL record
 * types that can insert a downlink: insert, split, and newroot.
 */
static void
_bt_clear_incomplete_split(XLogReaderState *record, uint8 block_id)
{
    XLogRecPtr lsn = record->EndRecPtr;
    Buffer buf;

    if (XLogReadBufferForRedo(record, block_id, &buf) == BLK_NEEDS_REDO)
    {
        Page page = (Page) BufferGetPage(buf);
        BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);

        Assert(P_INCOMPLETE_SPLIT(pageop));
        pageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;

        PageSetLSN(page, lsn);
        MarkBufferDirty(buf);
    }
    if (BufferIsValid(buf))
        UnlockReleaseBuffer(buf);
}
static void
btree_xlog_insert(bool isleaf, bool ismeta, bool posting,
                  XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
    Buffer buffer;
    Page page;

    /*
     * Insertion to an internal page finishes an incomplete split at the
     * child level.  Clear the incomplete-split flag in the child.  Note:
     * during normal operation, the child and parent pages are locked at the
     * same time (the locks are coupled), so that clearing the flag and
     * inserting the downlink appear atomic to other backends.  We don't
     * bother with that during replay, because readers don't care about the
     * incomplete-split flag and there cannot be updates happening.
     */
    if (!isleaf)
        _bt_clear_incomplete_split(record, 1);
    if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
    {
        Size datalen;
        char *datapos = XLogRecGetBlockData(record, 0, &datalen);

        page = BufferGetPage(buffer);

        if (!posting)
        {
            /* Simple retail insertion */
            if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
                            false, false) == InvalidOffsetNumber)
                elog(PANIC, "failed to add new item");
        }
        else
        {
            ItemId itemid;
            IndexTuple oposting,
                       newitem,
                       nposting;
            uint16 postingoff;

            /*
             * A posting list split occurred during leaf page insertion.  WAL
             * record data will start with an offset number representing the
             * point in an existing posting list that a split occurs at.
             *
             * Use _bt_swap_posting() to repeat posting list split steps from
             * primary.  Note that newitem from WAL record is 'orignewitem',
             * not the final version of newitem that is actually inserted on
             * page.
             */
            postingoff = *((uint16 *) datapos);
            datapos += sizeof(uint16);
            datalen -= sizeof(uint16);

            itemid = PageGetItemId(page, OffsetNumberPrev(xlrec->offnum));
            oposting = (IndexTuple) PageGetItem(page, itemid);

            /* Use mutable, aligned newitem copy in _bt_swap_posting() */
            Assert(isleaf && postingoff > 0);
            newitem = CopyIndexTuple((IndexTuple) datapos);
            nposting = _bt_swap_posting(newitem, oposting, postingoff);

            /* Replace existing posting list with post-split version */
            memcpy(oposting, nposting, MAXALIGN(IndexTupleSize(nposting)));

            /* Insert "final" new item (not orignewitem from WAL stream) */
            Assert(IndexTupleSize(newitem) == datalen);
            if (PageAddItem(page, (Item) newitem, datalen, xlrec->offnum,
                            false, false) == InvalidOffsetNumber)
                elog(PANIC, "failed to add posting split new item");
        }

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);

    /*
     * Note: in normal operation, we'd update the metapage while still
     * holding lock on the page we inserted into.  But during replay it's not
     * necessary to hold that lock, since no other index updates can be
     * happening concurrently, and readers will cope fine with following an
     * obsolete link from the metapage.
     */
    if (ismeta)
        _bt_restore_meta(record, 2);
}
static void
btree_xlog_split(bool newitemonleft, XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
    bool isleaf = (xlrec->level == 0);
    Buffer buf;
    Buffer rbuf;
    Page rpage;
    BTPageOpaque ropaque;
    char *datapos;
    Size datalen;
    BlockNumber origpagenumber;
    BlockNumber rightpagenumber;
    BlockNumber spagenumber;

    XLogRecGetBlockTag(record, 0, NULL, NULL, &origpagenumber);
    XLogRecGetBlockTag(record, 1, NULL, NULL, &rightpagenumber);
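    /*
     * Block reference 2 is only present when the page being split had an
     * existing right sibling whose left-link must be updated; when it is
     * absent, the split page was the rightmost page on its level.
     */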
    if (!XLogRecGetBlockTag(record, 2, NULL, NULL, &spagenumber))
        spagenumber = P_NONE;

    /*
     * Clear the incomplete split flag on the appropriate child page one
     * level down when origpage/buf is an internal page (there must have been
     * cascading page splits during original execution in the event of an
     * internal page split).  This is like the corresponding
     * btree_xlog_insert call for internal pages.  We're not clearing the
     * incomplete split flag for the current page split here (you can think
     * of this as part of the insert of newitem that the page split action
     * needs to perform in passing).
     *
     * Like in btree_xlog_insert, this can be done before locking other
     * pages.  We never need to couple cross-level locks in REDO routines.
     */
    if (!isleaf)
        _bt_clear_incomplete_split(record, 3);

    /* Reconstruct right (new) sibling page from scratch */
    rbuf = XLogInitBufferForRedo(record, 1);
    datapos = XLogRecGetBlockData(record, 1, &datalen);
    rpage = (Page) BufferGetPage(rbuf);

    _bt_pageinit(rpage, BufferGetPageSize(rbuf));
    ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage);

    ropaque->btpo_prev = origpagenumber;
    ropaque->btpo_next = spagenumber;
    ropaque->btpo_level = xlrec->level;
    ropaque->btpo_flags = isleaf ? BTP_LEAF : 0;
    ropaque->btpo_cycleid = 0;

    _bt_restore_page(rpage, datapos, datalen);

    PageSetLSN(rpage, lsn);
    MarkBufferDirty(rbuf);
    /* Now reconstruct original page (left half of split) */
    if (XLogReadBufferForRedo(record, 0, &buf) == BLK_NEEDS_REDO)
    {
        /*
         * To retain the same physical order of the tuples that they had, we
         * initialize a temporary empty page for the left page and add all
         * the items to that in item number order.  This mirrors how
         * _bt_split() works.  Retaining the same physical order makes WAL
         * consistency checking possible.  See also _bt_restore_page(), which
         * does the same for the right page.
         */
        Page origpage = (Page) BufferGetPage(buf);
        BTPageOpaque oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
        OffsetNumber off;
        IndexTuple newitem = NULL,
                   left_hikey = NULL,
                   nposting = NULL;
        Size newitemsz = 0,
             left_hikeysz = 0;
        Page leftpage;
        OffsetNumber leftoff,
                     replacepostingoff = InvalidOffsetNumber;

        datapos = XLogRecGetBlockData(record, 0, &datalen);

        if (newitemonleft || xlrec->postingoff != 0)
        {
            newitem = (IndexTuple) datapos;
            newitemsz = MAXALIGN(IndexTupleSize(newitem));
            datapos += newitemsz;
            datalen -= newitemsz;

            if (xlrec->postingoff != 0)
            {
                ItemId itemid;
                IndexTuple oposting;

                /* Posting list must be at offset number before new item's */
                replacepostingoff = OffsetNumberPrev(xlrec->newitemoff);

                /* Use mutable, aligned newitem copy in _bt_swap_posting() */
                newitem = CopyIndexTuple(newitem);
                itemid = PageGetItemId(origpage, replacepostingoff);
                oposting = (IndexTuple) PageGetItem(origpage, itemid);
                nposting = _bt_swap_posting(newitem, oposting,
                                            xlrec->postingoff);
            }
        }

        /*
         * Extract left hikey and its size.  We assume that 16-bit alignment
         * is enough to apply IndexTupleSize (since it's fetching from a
         * uint16 field).
         */
        left_hikey = (IndexTuple) datapos;
        left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
        datapos += left_hikeysz;
        datalen -= left_hikeysz;

        Assert(datalen == 0);

        leftpage = PageGetTempPageCopySpecial(origpage);

        /* Add high key tuple from WAL record to temp page */
        leftoff = P_HIKEY;
        if (PageAddItem(leftpage, (Item) left_hikey, left_hikeysz, P_HIKEY,
                        false, false) == InvalidOffsetNumber)
            elog(ERROR, "failed to add high key to left page after split");
        leftoff = OffsetNumberNext(leftoff);

        for (off = P_FIRSTDATAKEY(oopaque); off < xlrec->firstrightoff; off++)
        {
            ItemId itemid;
            Size itemsz;
            IndexTuple item;

            /* Add replacement posting list when required */
            if (off == replacepostingoff)
            {
                Assert(newitemonleft ||
                       xlrec->firstrightoff == xlrec->newitemoff);
                if (PageAddItem(leftpage, (Item) nposting,
                                MAXALIGN(IndexTupleSize(nposting)), leftoff,
                                false, false) == InvalidOffsetNumber)
                    elog(ERROR, "failed to add new posting list item to left page after split");
                leftoff = OffsetNumberNext(leftoff);
                continue;       /* don't insert oposting */
            }

            /* add the new item if it was inserted on left page */
            else if (newitemonleft && off == xlrec->newitemoff)
            {
                if (PageAddItem(leftpage, (Item) newitem, newitemsz, leftoff,
                                false, false) == InvalidOffsetNumber)
                    elog(ERROR, "failed to add new item to left page after split");
                leftoff = OffsetNumberNext(leftoff);
            }

            itemid = PageGetItemId(origpage, off);
            itemsz = ItemIdGetLength(itemid);
            item = (IndexTuple) PageGetItem(origpage, itemid);
            if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
                            false, false) == InvalidOffsetNumber)
                elog(ERROR, "failed to add old item to left page after split");
            leftoff = OffsetNumberNext(leftoff);
        }

        /* cope with possibility that newitem goes at the end */
        if (newitemonleft && off == xlrec->newitemoff)
        {
            if (PageAddItem(leftpage, (Item) newitem, newitemsz, leftoff,
                            false, false) == InvalidOffsetNumber)
                elog(ERROR, "failed to add new item to left page after split");
            leftoff = OffsetNumberNext(leftoff);
        }

        PageRestoreTempPage(leftpage, origpage);

        /* Fix opaque fields */
        oopaque->btpo_flags = BTP_INCOMPLETE_SPLIT;
        if (isleaf)
            oopaque->btpo_flags |= BTP_LEAF;
        oopaque->btpo_next = rightpagenumber;
        oopaque->btpo_cycleid = 0;

        PageSetLSN(origpage, lsn);
        MarkBufferDirty(buf);
    }

    /* Fix left-link of the page to the right of the new right sibling */
    if (spagenumber != P_NONE)
    {
        Buffer sbuf;

        if (XLogReadBufferForRedo(record, 2, &sbuf) == BLK_NEEDS_REDO)
        {
            Page spage = (Page) BufferGetPage(sbuf);
            BTPageOpaque spageop = (BTPageOpaque) PageGetSpecialPointer(spage);

            spageop->btpo_prev = rightpagenumber;

            PageSetLSN(spage, lsn);
            MarkBufferDirty(sbuf);
        }
        if (BufferIsValid(sbuf))
            UnlockReleaseBuffer(sbuf);
    }

    /*
     * Finally, release the remaining buffers.  sbuf, rbuf, and buf must be
     * released together, so that readers cannot observe inconsistencies.
     */
    UnlockReleaseBuffer(rbuf);
    if (BufferIsValid(buf))
        UnlockReleaseBuffer(buf);
}
static void
btree_xlog_dedup(XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    xl_btree_dedup *xlrec = (xl_btree_dedup *) XLogRecGetData(record);
    Buffer buf;

    if (XLogReadBufferForRedo(record, 0, &buf) == BLK_NEEDS_REDO)
    {
        char *ptr = XLogRecGetBlockData(record, 0, NULL);
        Page page = (Page) BufferGetPage(buf);
        BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
        OffsetNumber offnum,
                     minoff,
                     maxoff;
        BTDedupState state;
        BTDedupInterval *intervals;
        Page newpage;

        state = (BTDedupState) palloc(sizeof(BTDedupStateData));
        state->deduplicate = true;  /* unused */
        state->nmaxitems = 0;       /* unused */
        /* Conservatively use larger maxpostingsize than primary */
        state->maxpostingsize = BTMaxItemSize(page);
        state->base = NULL;
        state->baseoff = InvalidOffsetNumber;
        state->basetupsize = 0;
        state->htids = palloc(state->maxpostingsize);
        state->nhtids = 0;
        state->nitems = 0;
        state->phystupsize = 0;
        state->nintervals = 0;

        minoff = P_FIRSTDATAKEY(opaque);
        maxoff = PageGetMaxOffsetNumber(page);
        newpage = PageGetTempPageCopySpecial(page);

        if (!P_RIGHTMOST(opaque))
        {
            ItemId itemid = PageGetItemId(page, P_HIKEY);
            Size itemsz = ItemIdGetLength(itemid);
            IndexTuple item = (IndexTuple) PageGetItem(page, itemid);

            if (PageAddItem(newpage, (Item) item, itemsz, P_HIKEY,
                            false, false) == InvalidOffsetNumber)
                elog(ERROR, "deduplication failed to add highkey");
        }

        intervals = (BTDedupInterval *) ptr;
        for (offnum = minoff;
             offnum <= maxoff;
             offnum = OffsetNumberNext(offnum))
        {
            ItemId itemid = PageGetItemId(page, offnum);
            IndexTuple itup = (IndexTuple) PageGetItem(page, itemid);

            if (offnum == minoff)
                _bt_dedup_start_pending(state, itup, offnum);
            else if (state->nintervals < xlrec->nintervals &&
                     state->baseoff == intervals[state->nintervals].baseoff &&
                     state->nitems < intervals[state->nintervals].nitems)
            {
                if (!_bt_dedup_save_htid(state, itup))
                    elog(ERROR, "deduplication failed to add heap tid to pending posting list");
            }
            else
            {
                _bt_dedup_finish_pending(newpage, state);
                _bt_dedup_start_pending(state, itup, offnum);
            }
        }

        _bt_dedup_finish_pending(newpage, state);
        Assert(state->nintervals == xlrec->nintervals);
        Assert(memcmp(state->intervals, intervals,
                      state->nintervals * sizeof(BTDedupInterval)) == 0);

        if (P_HAS_GARBAGE(opaque))
        {
            BTPageOpaque nopaque = (BTPageOpaque) PageGetSpecialPointer(newpage);

            nopaque->btpo_flags &= ~BTP_HAS_GARBAGE;
        }

        PageRestoreTempPage(newpage, page);
        PageSetLSN(page, lsn);
        MarkBufferDirty(buf);
    }

    if (BufferIsValid(buf))
        UnlockReleaseBuffer(buf);
}
static void
btree_xlog_updates(Page page, OffsetNumber *updatedoffsets,
                   xl_btree_update *updates, int nupdated)
{
    BTVacuumPosting vacposting;
    IndexTuple origtuple;
    ItemId itemid;
    Size itemsz;

    for (int i = 0; i < nupdated; i++)
    {
        itemid = PageGetItemId(page, updatedoffsets[i]);
        origtuple = (IndexTuple) PageGetItem(page, itemid);
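        /*
         * Rebuild the BTVacuumPosting work item used on the primary: the
         * original posting list tuple plus the array of posting-list offsets
         * (TIDs to delete) carried in this xl_btree_update entry.
         */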
        vacposting = palloc(offsetof(BTVacuumPostingData, deletetids) +
                            updates->ndeletedtids * sizeof(uint16));
        vacposting->updatedoffset = updatedoffsets[i];
        vacposting->itup = origtuple;
        vacposting->ndeletedtids = updates->ndeletedtids;
        memcpy(vacposting->deletetids,
               (char *) updates + SizeOfBtreeUpdate,
               updates->ndeletedtids * sizeof(uint16));

        _bt_update_posting(vacposting);

        /* Overwrite updated version of tuple */
        itemsz = MAXALIGN(IndexTupleSize(vacposting->itup));
        if (!PageIndexTupleOverwrite(page, updatedoffsets[i],
                                     (Item) vacposting->itup, itemsz))
            elog(PANIC, "failed to update partially dead item");

        pfree(vacposting->itup);
        pfree(vacposting);

        /* advance to next xl_btree_update from array */
        updates = (xl_btree_update *)
            ((char *) updates + SizeOfBtreeUpdate +
             updates->ndeletedtids * sizeof(uint16));
    }
}
static void
btree_xlog_vacuum(XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    xl_btree_vacuum *xlrec = (xl_btree_vacuum *) XLogRecGetData(record);
    Buffer buffer;
    Page page;
    BTPageOpaque opaque;

    /*
     * We need to take a cleanup lock here, just like btvacuumpage().
     * However, it isn't necessary to exhaustively get a cleanup lock on
     * every block in the index during recovery (just getting a cleanup lock
     * on pages with items to kill suffices).  See nbtree/README for details.
     */
    if (XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer)
        == BLK_NEEDS_REDO)
    {
        char *ptr = XLogRecGetBlockData(record, 0, NULL);

        page = (Page) BufferGetPage(buffer);

        if (xlrec->nupdated > 0)
        {
            OffsetNumber *updatedoffsets;
            xl_btree_update *updates;

            updatedoffsets = (OffsetNumber *)
                (ptr + xlrec->ndeleted * sizeof(OffsetNumber));
            updates = (xl_btree_update *) ((char *) updatedoffsets +
                                           xlrec->nupdated *
                                           sizeof(OffsetNumber));

            btree_xlog_updates(page, updatedoffsets, updates, xlrec->nupdated);
        }

        if (xlrec->ndeleted > 0)
            PageIndexMultiDelete(page, (OffsetNumber *) ptr, xlrec->ndeleted);

        /*
         * Mark the page as not containing any LP_DEAD items --- see comments
         * in _bt_delitems_vacuum().
         */
        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
        opaque->btpo_flags &= ~BTP_HAS_GARBAGE;

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);
}
static void
btree_xlog_delete(XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
    Buffer buffer;
    Page page;
    BTPageOpaque opaque;

    /*
     * If we have any conflict processing to do, it must happen before we
     * update the page.
     */
    if (InHotStandby)
    {
        RelFileNode rnode;

        XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);

        ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
    }

    /*
     * We don't need to take a cleanup lock to apply these changes.  See
     * nbtree/README for details.
     */
    if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
    {
        char *ptr = XLogRecGetBlockData(record, 0, NULL);

        page = (Page) BufferGetPage(buffer);

        if (xlrec->nupdated > 0)
        {
            OffsetNumber *updatedoffsets;
            xl_btree_update *updates;

            updatedoffsets = (OffsetNumber *)
                (ptr + xlrec->ndeleted * sizeof(OffsetNumber));
            updates = (xl_btree_update *) ((char *) updatedoffsets +
                                           xlrec->nupdated *
                                           sizeof(OffsetNumber));

            btree_xlog_updates(page, updatedoffsets, updates, xlrec->nupdated);
        }

        if (xlrec->ndeleted > 0)
            PageIndexMultiDelete(page, (OffsetNumber *) ptr, xlrec->ndeleted);

        /* Mark the page as not containing any LP_DEAD items */
        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
        opaque->btpo_flags &= ~BTP_HAS_GARBAGE;

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);
}
static void
btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    xl_btree_mark_page_halfdead *xlrec = (xl_btree_mark_page_halfdead *) XLogRecGetData(record);
    Buffer buffer;
    Page page;
    BTPageOpaque pageop;
    IndexTupleData trunctuple;

    /*
     * In normal operation, we would lock all the pages this WAL record
     * touches before changing any of them.  In WAL replay, it should be okay
     * to lock just one page at a time, since no concurrent index updates can
     * be happening, and readers should not care whether they arrive at the
     * target page or not (since it's surely empty).
     */

    /* to-be-deleted subtree's parent page */
    if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
    {
        OffsetNumber poffset;
        ItemId itemid;
        IndexTuple itup;
        OffsetNumber nextoffset;
        BlockNumber rightsib;

        page = (Page) BufferGetPage(buffer);
        pageop = (BTPageOpaque) PageGetSpecialPointer(page);

        poffset = xlrec->poffset;
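        /*
         * Remove the downlink to the to-be-deleted subtree from the parent:
         * fetch the downlink stored in the pivot tuple at the next offset,
         * copy it into the pivot tuple at poffset, and then delete the
         * now-redundant next pivot tuple.
         */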
        nextoffset = OffsetNumberNext(poffset);
        itemid = PageGetItemId(page, nextoffset);
        itup = (IndexTuple) PageGetItem(page, itemid);
        rightsib = BTreeTupleGetDownLink(itup);

        itemid = PageGetItemId(page, poffset);
        itup = (IndexTuple) PageGetItem(page, itemid);
        BTreeTupleSetDownLink(itup, rightsib);
        nextoffset = OffsetNumberNext(poffset);
        PageIndexTupleDelete(page, nextoffset);

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }

    /*
     * Don't need to couple cross-level locks in REDO routines, so release
     * lock on internal page immediately.
     */
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);

    /* Rewrite the leaf page as a halfdead page */
    buffer = XLogInitBufferForRedo(record, 0);
    page = (Page) BufferGetPage(buffer);

    _bt_pageinit(page, BufferGetPageSize(buffer));
    pageop = (BTPageOpaque) PageGetSpecialPointer(page);

    pageop->btpo_prev = xlrec->leftblk;
    pageop->btpo_next = xlrec->rightblk;
    pageop->btpo_level = 0;
    pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
    pageop->btpo_cycleid = 0;

    /*
     * Construct a dummy high key item that points to top parent page (value
     * is InvalidBlockNumber when the top parent page is the leaf page itself)
     */
    MemSet(&trunctuple, 0, sizeof(IndexTupleData));
    trunctuple.t_info = sizeof(IndexTupleData);
    BTreeTupleSetTopParent(&trunctuple, xlrec->topparent);

    if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
                    false, false) == InvalidOffsetNumber)
        elog(ERROR, "could not add dummy high key to half-dead page");

    PageSetLSN(page, lsn);
    MarkBufferDirty(buffer);
    UnlockReleaseBuffer(buffer);
}
static void
btree_xlog_unlink_page(uint8 info, XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    xl_btree_unlink_page *xlrec = (xl_btree_unlink_page *) XLogRecGetData(record);
    BlockNumber leftsib;
    BlockNumber rightsib;
    uint32 level;
    bool isleaf;
    FullTransactionId safexid;
    Buffer leftbuf;
    Buffer target;
    Buffer rightbuf;
    Page page;
    BTPageOpaque pageop;

    leftsib = xlrec->leftsib;
    rightsib = xlrec->rightsib;
    level = xlrec->level;
    isleaf = (level == 0);
    safexid = xlrec->safexid;

    /* No leaftopparent for level 0 (leaf page) or level 1 target */
    Assert(!BlockNumberIsValid(xlrec->leaftopparent) || level > 1);

    /*
     * In normal operation, we would lock all the pages this WAL record
     * touches before changing any of them.  In WAL replay, we at least lock
     * the pages in the same standard left-to-right order (leftsib, target,
     * rightsib), and don't release the sibling locks until the target is
     * marked deleted.
     */

    /* Fix right-link of left sibling, if any */
    if (leftsib != P_NONE)
    {
        if (XLogReadBufferForRedo(record, 1, &leftbuf) == BLK_NEEDS_REDO)
        {
            page = (Page) BufferGetPage(leftbuf);
            pageop = (BTPageOpaque) PageGetSpecialPointer(page);
            pageop->btpo_next = rightsib;

            PageSetLSN(page, lsn);
            MarkBufferDirty(leftbuf);
        }
    }
    else
        leftbuf = InvalidBuffer;

    /* Rewrite target page as empty deleted page */
    target = XLogInitBufferForRedo(record, 0);
    page = (Page) BufferGetPage(target);

    _bt_pageinit(page, BufferGetPageSize(target));
    pageop = (BTPageOpaque) PageGetSpecialPointer(page);

    pageop->btpo_prev = leftsib;
    pageop->btpo_next = rightsib;
    pageop->btpo_level = level;
    BTPageSetDeleted(page, safexid);
    if (isleaf)
        pageop->btpo_flags |= BTP_LEAF;
    pageop->btpo_cycleid = 0;

    PageSetLSN(page, lsn);
    MarkBufferDirty(target);

    /* Fix left-link of right sibling */
    if (XLogReadBufferForRedo(record, 2, &rightbuf) == BLK_NEEDS_REDO)
    {
        page = (Page) BufferGetPage(rightbuf);
        pageop = (BTPageOpaque) PageGetSpecialPointer(page);
        pageop->btpo_prev = leftsib;

        PageSetLSN(page, lsn);
        MarkBufferDirty(rightbuf);
    }

    /* Release siblings */
    if (BufferIsValid(leftbuf))
        UnlockReleaseBuffer(leftbuf);
    if (BufferIsValid(rightbuf))
        UnlockReleaseBuffer(rightbuf);

    /* Release target */
    UnlockReleaseBuffer(target);
    /*
     * If we deleted a parent of the targeted leaf page, instead of the leaf
     * itself, update the leaf to point to the next remaining child in the
     * to-be-deleted subtree.
     */
    if (XLogRecHasBlockRef(record, 3))
    {
        /*
         * There is no real data on the page, so we just re-create it from
         * scratch using the information from the WAL record.
         *
         * Note that we don't end up here when the target page is also the
         * leafbuf page.  There is no need to add a dummy hikey item with a
         * top parent link when deleting leafbuf because it's the last page
         * we'll delete in the subtree undergoing deletion.
         */
        Buffer leafbuf;
        IndexTupleData trunctuple;

        Assert(!isleaf);

        leafbuf = XLogInitBufferForRedo(record, 3);
        page = (Page) BufferGetPage(leafbuf);

        _bt_pageinit(page, BufferGetPageSize(leafbuf));
        pageop = (BTPageOpaque) PageGetSpecialPointer(page);

        pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
        pageop->btpo_prev = xlrec->leafleftsib;
        pageop->btpo_next = xlrec->leafrightsib;
        pageop->btpo_level = 0;
        pageop->btpo_cycleid = 0;

        /* Add a dummy hikey item */
        MemSet(&trunctuple, 0, sizeof(IndexTupleData));
        trunctuple.t_info = sizeof(IndexTupleData);
        BTreeTupleSetTopParent(&trunctuple, xlrec->leaftopparent);

        if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
                        false, false) == InvalidOffsetNumber)
            elog(ERROR, "could not add dummy high key to half-dead page");

        PageSetLSN(page, lsn);
        MarkBufferDirty(leafbuf);
        UnlockReleaseBuffer(leafbuf);
    }

    /* Update metapage if needed */
    if (info == XLOG_BTREE_UNLINK_PAGE_META)
        _bt_restore_meta(record, 4);
}
static void
btree_xlog_newroot(XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
    Buffer buffer;
    Page page;
    BTPageOpaque pageop;
    char *ptr;
    Size len;

    buffer = XLogInitBufferForRedo(record, 0);
    page = (Page) BufferGetPage(buffer);

    _bt_pageinit(page, BufferGetPageSize(buffer));
    pageop = (BTPageOpaque) PageGetSpecialPointer(page);

    pageop->btpo_flags = BTP_ROOT;
    pageop->btpo_prev = pageop->btpo_next = P_NONE;
    pageop->btpo_level = xlrec->level;
    if (xlrec->level == 0)
        pageop->btpo_flags |= BTP_LEAF;
    pageop->btpo_cycleid = 0;

    if (xlrec->level > 0)
    {
        ptr = XLogRecGetBlockData(record, 0, &len);
        _bt_restore_page(page, ptr, len);

        /* Clear the incomplete-split flag in left child */
        _bt_clear_incomplete_split(record, 1);
    }

    PageSetLSN(page, lsn);
    MarkBufferDirty(buffer);
    UnlockReleaseBuffer(buffer);

    _bt_restore_meta(record, 2);
}
/*
 * In general VACUUM must defer recycling as a way of avoiding certain race
 * conditions.  Deleted pages contain a safexid value that is used by VACUUM
 * to determine whether or not it's safe to place a page that was deleted by
 * VACUUM earlier into the FSM now.  See nbtree/README.
 *
 * As far as any backend operating during original execution is concerned, the
 * FSM is a cache of recycle-safe pages; the mere presence of the page in the
 * FSM indicates that the page must already be safe to recycle (actually,
 * _bt_getbuf() verifies it's safe using BTPageIsRecyclable(), but that's just
 * because it would be unwise to completely trust the FSM, given its current
 * limitations).
 *
 * This isn't sufficient to prevent similar concurrent recycling race
 * conditions during Hot Standby, though.  For that we need to log a
 * xl_btree_reuse_page record at the point that a page is actually recycled
 * and reused for an entirely unrelated page inside _bt_split().  These
 * records include the same safexid value from the original deleted page,
 * stored in the record's latestRemovedFullXid field.
 *
 * The GlobalVisCheckRemovableFullXid() test in BTPageIsRecyclable() is used
 * to determine if it's safe to recycle a page.  This mirrors our own test:
 * the PGPROC->xmin > limitXmin test inside GetConflictingVirtualXIDs().
 * Consequently, one XID value achieves the same exclusion effect on primary
 * and standby.
 */
static void
btree_xlog_reuse_page(XLogReaderState *record)
{
    xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record);

    if (InHotStandby)
        ResolveRecoveryConflictWithSnapshotFullXid(xlrec->latestRemovedFullXid,
                                                   xlrec->node);
}
void
btree_redo(XLogReaderState *record)
{
    uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
    MemoryContext oldCtx;

    oldCtx = MemoryContextSwitchTo(opCtx);
    switch (info)
    {
        case XLOG_BTREE_INSERT_LEAF:
            btree_xlog_insert(true, false, false, record);
            break;
        case XLOG_BTREE_INSERT_UPPER:
            btree_xlog_insert(false, false, false, record);
            break;
        case XLOG_BTREE_INSERT_META:
            btree_xlog_insert(false, true, false, record);
            break;
        case XLOG_BTREE_SPLIT_L:
            btree_xlog_split(true, record);
            break;
        case XLOG_BTREE_SPLIT_R:
            btree_xlog_split(false, record);
            break;
        case XLOG_BTREE_INSERT_POST:
            btree_xlog_insert(true, false, true, record);
            break;
        case XLOG_BTREE_DEDUP:
            btree_xlog_dedup(record);
            break;
        case XLOG_BTREE_VACUUM:
            btree_xlog_vacuum(record);
            break;
        case XLOG_BTREE_DELETE:
            btree_xlog_delete(record);
            break;
        case XLOG_BTREE_MARK_PAGE_HALFDEAD:
            btree_xlog_mark_page_halfdead(info, record);
            break;
        case XLOG_BTREE_UNLINK_PAGE:
        case XLOG_BTREE_UNLINK_PAGE_META:
            btree_xlog_unlink_page(info, record);
            break;
        case XLOG_BTREE_NEWROOT:
            btree_xlog_newroot(record);
            break;
        case XLOG_BTREE_REUSE_PAGE:
            btree_xlog_reuse_page(record);
            break;
        case XLOG_BTREE_META_CLEANUP:
            _bt_restore_meta(record, 0);
            break;
        default:
            elog(PANIC, "btree_redo: unknown op code %u", info);
    }
    MemoryContextSwitchTo(oldCtx);
    MemoryContextReset(opCtx);
}
void
btree_xlog_startup(void)
{
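    /*
     * Working memory for redo routines; btree_redo() switches into this
     * context for each record and resets it afterwards.
     */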
    opCtx = AllocSetContextCreate(CurrentMemoryContext,
                                  "Btree recovery temporary context",
                                  ALLOCSET_DEFAULT_SIZES);
}

void
btree_xlog_cleanup(void)
{
    MemoryContextDelete(opCtx);
    opCtx = NULL;
}
/*
 * Mask a btree page before performing consistency checks on it.
 */
void
btree_mask(char *pagedata, BlockNumber blkno)
{
    Page page = (Page) pagedata;
    BTPageOpaque maskopaq;
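    /*
     * Mask fields that can legitimately differ between primary and standby:
     * page LSN and checksum, page header hint bits, and the unused space
     * between pd_lower and pd_upper.
     */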
    mask_page_lsn_and_checksum(page);

    mask_page_hint_bits(page);
    mask_unused_space(page);

    maskopaq = (BTPageOpaque) PageGetSpecialPointer(page);

    if (P_ISLEAF(maskopaq))
    {
        /*
         * In btree leaf pages, it is possible to modify the LP_FLAGS without
         * emitting any WAL record.  Hence, mask the line pointer flags.  See
         * _bt_killitems(), _bt_check_unique() for details.
         */
        mask_lp_flags(page);
    }

    /*
     * BTP_HAS_GARBAGE is just an un-logged hint bit.  So, mask it.  See
     * _bt_delete_or_dedup_one_page(), _bt_killitems(), and _bt_check_unique()
     * for details.
     */
    maskopaq->btpo_flags &= ~BTP_HAS_GARBAGE;

    /*
     * During replay of a btree page split, we don't set the BTP_SPLIT_END
     * flag of the right sibling and initialize the cycle_id to 0 for the same
     * page.  See btree_xlog_split() for details.
     */
    maskopaq->btpo_flags &= ~BTP_SPLIT_END;
    maskopaq->btpo_cycleid = 0;
}