/*-------------------------------------------------------------------------
 *
 * xlogprefetcher.c
 *		Prefetching support for recovery.
 *
 * Portions Copyright (c) 2022-2023, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *		src/backend/access/transam/xlogprefetcher.c
 *
 * This module provides a drop-in replacement for an XLogReader that tries to
 * minimize I/O stalls by looking ahead in the WAL.  If blocks that will be
 * accessed in the near future are not already in the buffer pool, it initiates
 * I/Os that might complete before the caller eventually needs the data.  When
 * referenced blocks are found in the buffer pool already, the buffer is
 * recorded in the decoded record so that XLogReadBufferForRedo() can try to
 * avoid a second buffer mapping table lookup.
 *
 * Currently, only the main fork is considered for prefetching, and prefetching
 * is only effective on systems where PrefetchBuffer() does something useful
 * (mainly Linux).
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/xlog.h"
#include "access/xlogprefetcher.h"
#include "access/xlogreader.h"
#include "access/xlogutils.h"
#include "catalog/pg_class.h"
#include "catalog/pg_control.h"
#include "catalog/storage_xlog.h"
#include "commands/dbcommands_xlog.h"
#include "utils/fmgrprotos.h"
#include "utils/timestamp.h"
#include "funcapi.h"
#include "pgstat.h"
#include "miscadmin.h"
#include "port/atomics.h"
#include "storage/bufmgr.h"
#include "storage/shmem.h"
#include "storage/smgr.h"
#include "utils/guc_hooks.h"
#include "utils/hsearch.h"

/*
 * Every time we process this much WAL, we'll update the values in
 * pg_stat_recovery_prefetch.
 */
#define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ

/*
 * To detect repeated access to the same block and skip useless extra system
 * calls, we remember a small window of recently prefetched blocks.
 */
#define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4

/*
 * When maintenance_io_concurrency is not saturated, we're prepared to look
 * ahead up to N times that number of block references.
 */
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4

/* Define to log internal debugging messages. */
/* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */

/* GUCs */
int recovery_prefetch = RECOVERY_PREFETCH_TRY;

#ifdef USE_PREFETCH
#define RecoveryPrefetchEnabled() \
    (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
     maintenance_io_concurrency > 0)
#else
#define RecoveryPrefetchEnabled() false
#endif

static int XLogPrefetchReconfigureCount = 0;

/*
 * Enum used to report whether an IO should be started.
 */
typedef enum
{
    LRQ_NEXT_NO_IO,
    LRQ_NEXT_IO,
    LRQ_NEXT_AGAIN
} LsnReadQueueNextStatus;

/*
 * Type of callback that can decide which block to prefetch next.  For now
 * there is only one.
 */
typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private,
                                                       XLogRecPtr *lsn);

/*
 * A simple circular queue of LSNs, used to control the number of
 * (potentially) inflight IOs.  This stands in for a later more general IO
 * control mechanism, which is why it has the apparently unnecessary
 * indirection through a function pointer.
 */
typedef struct LsnReadQueue
{
    LsnReadQueueNextFun next;
    uintptr_t lrq_private;
    uint32 max_inflight;
    uint32 inflight;
    uint32 completed;
    uint32 head;
    uint32 tail;
    uint32 size;
    struct
    {
        bool io;
        XLogRecPtr lsn;
    } queue[FLEXIBLE_ARRAY_MEMBER];
} LsnReadQueue;

/*
 * A prefetcher.  This is a mechanism that wraps an XLogReader, prefetching
 * blocks that will soon be referenced, to try to avoid IO stalls.
 */
struct XLogPrefetcher
{
    /* WAL reader and current reading state. */
    XLogReaderState *reader;
    DecodedXLogRecord *record;
    int next_block_id;

    /* When to publish stats. */
    XLogRecPtr next_stats_shm_lsn;

    /* Book-keeping to avoid accessing blocks that don't exist yet. */
    HTAB *filter_table;
    dlist_head filter_queue;

    /* Book-keeping to avoid repeat prefetches. */
    RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
    BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
    int recent_idx;

    /* Book-keeping to disable prefetching temporarily. */
    XLogRecPtr no_readahead_until;

    /* IO depth manager. */
    LsnReadQueue *streaming_read;

    XLogRecPtr begin_ptr;

    int reconfigure_count;
};

/*
 * A temporary filter used to track block ranges that haven't been created
 * yet, whole relations that haven't been created yet, and whole relations
 * that (we assume) have already been dropped, or will be created by bulk WAL
 * operators.
 */
typedef struct XLogPrefetcherFilter
{
    RelFileLocator rlocator;
    XLogRecPtr filter_until_replayed;
    BlockNumber filter_from_block;
    dlist_node link;
} XLogPrefetcherFilter;

/*
 * Counters exposed in shared memory for pg_stat_recovery_prefetch.
 */
typedef struct XLogPrefetchStats
{
    pg_atomic_uint64 reset_time;    /* Time of last reset. */
    pg_atomic_uint64 prefetch;      /* Prefetches initiated. */
    pg_atomic_uint64 hit;           /* Blocks already in cache. */
    pg_atomic_uint64 skip_init;     /* Zero-inited blocks skipped. */
    pg_atomic_uint64 skip_new;      /* New/missing blocks filtered. */
    pg_atomic_uint64 skip_fpw;      /* FPWs skipped. */
    pg_atomic_uint64 skip_rep;      /* Repeat accesses skipped. */

    /* Dynamic values */
    int wal_distance;               /* Number of WAL bytes ahead. */
    int block_distance;             /* Number of block references ahead. */
    int io_depth;                   /* Number of I/Os in progress. */
} XLogPrefetchStats;

static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
                                           RelFileLocator rlocator,
                                           BlockNumber blockno,
                                           XLogRecPtr lsn);
static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
                                            RelFileLocator rlocator,
                                            BlockNumber blockno);
static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
                                                 XLogRecPtr replaying_lsn);
static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private,
                                                      XLogRecPtr *lsn);

static XLogPrefetchStats *SharedStats;
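
/*
 * Allocate an LsnReadQueue with capacity for 'max_distance' block references,
 * allowing at most 'max_inflight' IOs to be in flight at once.
 */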
static inline LsnReadQueue *
lrq_alloc(uint32 max_distance,
          uint32 max_inflight,
          uintptr_t lrq_private,
          LsnReadQueueNextFun next)
{
    LsnReadQueue *lrq;
    uint32 size;

    Assert(max_distance >= max_inflight);

    size = max_distance + 1;    /* full ring buffer has a gap */
    lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
    lrq->lrq_private = lrq_private;
    lrq->max_inflight = max_inflight;
    lrq->size = size;
    lrq->next = next;
    lrq->head = 0;
    lrq->tail = 0;
    lrq->inflight = 0;
    lrq->completed = 0;

    return lrq;
}
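
/*
 * Free an LsnReadQueue allocated by lrq_alloc().
 */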
static inline void
lrq_free(LsnReadQueue *lrq)
{
    pfree(lrq);
}
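
/*
 * Return the number of IOs that are thought to be still in flight.
 */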
static inline uint32
lrq_inflight(LsnReadQueue *lrq)
{
    return lrq->inflight;
}
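
/*
 * Return the number of IOs that are thought to have completed.
 */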
static inline uint32
lrq_completed(LsnReadQueue *lrq)
{
    return lrq->completed;
}
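
/*
 * Look ahead, calling the queue's callback to examine block references and
 * start IOs, until we reach the configured limits or run out of data.
 */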
static inline void
lrq_prefetch(LsnReadQueue *lrq)
{
    /* Try to start as many IOs as we can within our limits. */
    while (lrq->inflight < lrq->max_inflight &&
           lrq->inflight + lrq->completed < lrq->size - 1)
    {
        Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
        switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
        {
            case LRQ_NEXT_AGAIN:
                return;
            case LRQ_NEXT_IO:
                lrq->queue[lrq->head].io = true;
                lrq->inflight++;
                break;
            case LRQ_NEXT_NO_IO:
                lrq->queue[lrq->head].io = false;
                lrq->completed++;
                break;
        }
        lrq->head++;
        if (lrq->head == lrq->size)
            lrq->head = 0;
    }
}
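
/*
 * Note that replay has reached 'lsn', retiring queue entries for earlier
 * LSNs, and then try to start more IOs.
 */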
static inline void
lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
{
    /*
     * We know that LSNs before 'lsn' have been replayed, so we can now assume
     * that any IOs that were started before then have finished.
     */
    while (lrq->tail != lrq->head &&
           lrq->queue[lrq->tail].lsn < lsn)
    {
        if (lrq->queue[lrq->tail].io)
            lrq->inflight--;
        else
            lrq->completed--;
        lrq->tail++;
        if (lrq->tail == lrq->size)
            lrq->tail = 0;
    }
    if (RecoveryPrefetchEnabled())
        lrq_prefetch(lrq);
}
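
/*
 * Report the amount of shared memory needed for recovery prefetch statistics.
 */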
size_t
XLogPrefetchShmemSize(void)
{
    return sizeof(XLogPrefetchStats);
}

/*
 * Reset all counters to zero.
 */
void
XLogPrefetchResetStats(void)
{
    pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
    pg_atomic_write_u64(&SharedStats->prefetch, 0);
    pg_atomic_write_u64(&SharedStats->hit, 0);
    pg_atomic_write_u64(&SharedStats->skip_init, 0);
    pg_atomic_write_u64(&SharedStats->skip_new, 0);
    pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
    pg_atomic_write_u64(&SharedStats->skip_rep, 0);
}
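
/*
 * Initialize the shared memory area used to publish recovery prefetch
 * statistics.
 */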
void
XLogPrefetchShmemInit(void)
{
    bool found;

    SharedStats = (XLogPrefetchStats *)
        ShmemInitStruct("XLogPrefetchStats",
                        sizeof(XLogPrefetchStats),
                        &found);

    if (!found)
    {
        pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
        pg_atomic_init_u64(&SharedStats->prefetch, 0);
        pg_atomic_init_u64(&SharedStats->hit, 0);
        pg_atomic_init_u64(&SharedStats->skip_init, 0);
        pg_atomic_init_u64(&SharedStats->skip_new, 0);
        pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
        pg_atomic_init_u64(&SharedStats->skip_rep, 0);
    }
}

/*
 * Called when any GUC is changed that affects prefetching.
 */
void
XLogPrefetchReconfigure(void)
{
    XLogPrefetchReconfigureCount++;
}

/*
 * Increment a counter in shared memory.  This is equivalent to *counter++ on a
 * plain uint64 without any memory barrier or locking, except on platforms
 * where readers can't read uint64 without possibly observing a torn value.
 */
static inline void
XLogPrefetchIncrement(pg_atomic_uint64 *counter)
{
    Assert(AmStartupProcess() || !IsUnderPostmaster);
    pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
}

/*
 * Create a prefetcher that is ready to begin prefetching blocks referenced by
 * WAL records.
 */
XLogPrefetcher *
XLogPrefetcherAllocate(XLogReaderState *reader)
{
    XLogPrefetcher *prefetcher;
    static HASHCTL hash_table_ctl = {
        .keysize = sizeof(RelFileLocator),
        .entrysize = sizeof(XLogPrefetcherFilter)
    };

    prefetcher = palloc0(sizeof(XLogPrefetcher));

    prefetcher->reader = reader;
    prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
                                           &hash_table_ctl,
                                           HASH_ELEM | HASH_BLOBS);
    dlist_init(&prefetcher->filter_queue);

    SharedStats->wal_distance = 0;
    SharedStats->block_distance = 0;
    SharedStats->io_depth = 0;

    /* First usage will cause streaming_read to be allocated. */
    prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1;

    return prefetcher;
}

/*
 * Destroy a prefetcher and release all resources.
 */
void
XLogPrefetcherFree(XLogPrefetcher *prefetcher)
{
    lrq_free(prefetcher->streaming_read);
    hash_destroy(prefetcher->filter_table);
    pfree(prefetcher);
}

/*
 * Provide access to the reader.
 */
XLogReaderState *
XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
{
    return prefetcher->reader;
}

/*
 * Update the statistics visible in the pg_stat_recovery_prefetch view.
 */
void
XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
{
    uint32 io_depth;
    uint32 completed;
    int64 wal_distance;

    /* How far ahead of replay are we now? */
    if (prefetcher->reader->decode_queue_tail)
    {
        wal_distance =
            prefetcher->reader->decode_queue_tail->lsn -
            prefetcher->reader->decode_queue_head->lsn;
    }
    else
    {
        wal_distance = 0;
    }

    /* How many IOs are currently in flight and completed? */
    io_depth = lrq_inflight(prefetcher->streaming_read);
    completed = lrq_completed(prefetcher->streaming_read);

    /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
    SharedStats->io_depth = io_depth;
    SharedStats->block_distance = io_depth + completed;
    SharedStats->wal_distance = wal_distance;

    prefetcher->next_stats_shm_lsn =
        prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;
}

/*
 * A callback that examines the next block reference in the WAL, and possibly
 * starts an IO so that a later read will be fast.
 *
 * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
 *
 * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
 * that isn't in the buffer pool, and the kernel has been asked to start
 * reading it to make a future read system call faster.  An LSN is written to
 * *lsn, and the I/O will be considered to have completed once that LSN is
 * replayed.
 *
 * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found
 * that it was already in the buffer pool, or we decided for various reasons
 * not to prefetch.
 */
static LsnReadQueueNextStatus
XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
{
    XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
    XLogReaderState *reader = prefetcher->reader;
    XLogRecPtr replaying_lsn = reader->ReadRecPtr;

    /*
     * We keep track of the record and block we're up to between calls with
     * prefetcher->record and prefetcher->next_block_id.
     */
    for (;;)
    {
        DecodedXLogRecord *record;

        /* Try to read a new future record, if we don't already have one. */
        if (prefetcher->record == NULL)
        {
            bool nonblocking;

            /*
             * If there are already records or an error queued up that could
             * be replayed, we don't want to block here.  Otherwise, it's OK
             * to block waiting for more data: presumably the caller has
             * nothing else to do.
             */
            nonblocking = XLogReaderHasQueuedRecordOrError(reader);

            /* Readahead is disabled until we replay past a certain point. */
            if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
                return LRQ_NEXT_AGAIN;

            record = XLogReadAhead(prefetcher->reader, nonblocking);
            if (record == NULL)
            {
                /*
                 * We can't read any more, due to an error or lack of data in
                 * nonblocking mode.  Don't try to read ahead again until
                 * we've replayed everything already decoded.
                 */
                if (nonblocking && prefetcher->reader->decode_queue_tail)
                    prefetcher->no_readahead_until =
                        prefetcher->reader->decode_queue_tail->lsn;

                return LRQ_NEXT_AGAIN;
            }

            /*
             * If prefetching is disabled, we don't need to analyze the record
             * or issue any prefetches.  We just need to cause one record to
             * be decoded.
             */
            if (!RecoveryPrefetchEnabled())
            {
                *lsn = InvalidXLogRecPtr;
                return LRQ_NEXT_NO_IO;
            }

            /* We have a new record to process. */
            prefetcher->record = record;
            prefetcher->next_block_id = 0;
        }
        else
        {
            /* Continue to process from last call, or last loop. */
            record = prefetcher->record;
        }

        /*
         * Check for operations that require us to filter out block ranges, or
         * pause readahead completely.
         */
        if (replaying_lsn < record->lsn)
        {
            uint8 rmid = record->header.xl_rmid;
            uint8 record_type = record->header.xl_info & ~XLR_INFO_MASK;

            if (rmid == RM_XLOG_ID)
            {
                if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
                    record_type == XLOG_END_OF_RECOVERY)
                {
                    /*
                     * These records might change the TLI.  Avoid potential
                     * bugs if we were to allow "read TLI" and "replay TLI" to
                     * differ without more analysis.
                     */
                    prefetcher->no_readahead_until = record->lsn;

#ifdef XLOGPREFETCHER_DEBUG_LEVEL
                    elog(XLOGPREFETCHER_DEBUG_LEVEL,
                         "suppressing all readahead until %X/%X is replayed due to possible TLI change",
                         LSN_FORMAT_ARGS(record->lsn));
#endif

                    /* Fall through so we move past this record. */
                }
            }
            else if (rmid == RM_DBASE_ID)
            {
                /*
                 * When databases are created with the file-copy strategy,
                 * there are no WAL records to tell us about the creation of
                 * individual relations.
                 */
                if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
                {
                    xl_dbase_create_file_copy_rec *xlrec =
                        (xl_dbase_create_file_copy_rec *) record->main_data;
                    RelFileLocator rlocator =
                        {InvalidOid, xlrec->db_id, InvalidRelFileNumber};

                    /*
                     * Don't try to prefetch anything in this database until
                     * it has been created, or we might confuse the blocks of
                     * different generations, if a database OID or
                     * relfilenumber is reused.  It's also more efficient than
                     * discovering that relations don't exist on disk yet with
                     * ENOENT errors.
                     */
                    XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);

#ifdef XLOGPREFETCHER_DEBUG_LEVEL
                    elog(XLOGPREFETCHER_DEBUG_LEVEL,
                         "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy",
                         rlocator.dbOid,
                         LSN_FORMAT_ARGS(record->lsn));
#endif
                }
            }
            else if (rmid == RM_SMGR_ID)
            {
                if (record_type == XLOG_SMGR_CREATE)
                {
                    xl_smgr_create *xlrec = (xl_smgr_create *)
                        record->main_data;

                    if (xlrec->forkNum == MAIN_FORKNUM)
                    {
                        /*
                         * Don't prefetch anything for this whole relation
                         * until it has been created.  Otherwise we might
                         * confuse the blocks of different generations, if a
                         * relfilenumber is reused.  This also avoids the need
                         * to discover the problem via extra syscalls that
                         * report ENOENT.
                         */
                        XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, 0,
                                                record->lsn);

#ifdef XLOGPREFETCHER_DEBUG_LEVEL
                        elog(XLOGPREFETCHER_DEBUG_LEVEL,
                             "suppressing prefetch in relation %u/%u/%u until %X/%X is replayed, which creates the relation",
                             xlrec->rlocator.spcOid,
                             xlrec->rlocator.dbOid,
                             xlrec->rlocator.relNumber,
                             LSN_FORMAT_ARGS(record->lsn));
#endif
                    }
                }
                else if (record_type == XLOG_SMGR_TRUNCATE)
                {
                    xl_smgr_truncate *xlrec = (xl_smgr_truncate *)
                        record->main_data;

                    /*
                     * Don't consider prefetching anything in the truncated
                     * range until the truncation has been performed.
                     */
                    XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator,
                                            xlrec->blkno,
                                            record->lsn);

#ifdef XLOGPREFETCHER_DEBUG_LEVEL
                    elog(XLOGPREFETCHER_DEBUG_LEVEL,
                         "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, which truncates the relation",
                         xlrec->rlocator.spcOid,
                         xlrec->rlocator.dbOid,
                         xlrec->rlocator.relNumber,
                         xlrec->blkno,
                         LSN_FORMAT_ARGS(record->lsn));
#endif
                }
            }
        }

        /* Scan the block references, starting where we left off last time. */
        while (prefetcher->next_block_id <= record->max_block_id)
        {
            int block_id = prefetcher->next_block_id++;
            DecodedBkpBlock *block = &record->blocks[block_id];
            SMgrRelation reln;
            PrefetchBufferResult result;

            if (!block->in_use)
                continue;

            Assert(!BufferIsValid(block->prefetch_buffer));

            /*
             * Record the LSN of this record.  When it's replayed,
             * LsnReadQueue will consider any IOs submitted for earlier LSNs
             * to be finished.
             */
            *lsn = record->lsn;

            /* We don't try to prefetch anything but the main fork for now. */
            if (block->forknum != MAIN_FORKNUM)
            {
                return LRQ_NEXT_NO_IO;
            }

            /*
             * If there is a full page image attached, we won't be reading the
             * page, so don't bother trying to prefetch.
             */
            if (block->has_image)
            {
                XLogPrefetchIncrement(&SharedStats->skip_fpw);
                return LRQ_NEXT_NO_IO;
            }

            /* There is no point in reading a page that will be zeroed. */
            if (block->flags & BKPBLOCK_WILL_INIT)
            {
                XLogPrefetchIncrement(&SharedStats->skip_init);
                return LRQ_NEXT_NO_IO;
            }

            /* Should we skip prefetching this block due to a filter? */
            if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno))
            {
                XLogPrefetchIncrement(&SharedStats->skip_new);
                return LRQ_NEXT_NO_IO;
            }

            /* There is no point in repeatedly prefetching the same block. */
            for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
            {
                if (block->blkno == prefetcher->recent_block[i] &&
                    RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
                {
                    /*
                     * XXX If we also remembered where it was, we could set
                     * recent_buffer so that recovery could skip smgropen()
                     * and a buffer table lookup.
                     */
                    XLogPrefetchIncrement(&SharedStats->skip_rep);
                    return LRQ_NEXT_NO_IO;
                }
            }
            prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
            prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
            prefetcher->recent_idx =
                (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;

            /*
             * We could try to have a fast path for repeated references to the
             * same relation (with some scheme to handle invalidations
             * safely), but for now we'll call smgropen() every time.
             */
            reln = smgropen(block->rlocator, InvalidBackendId);

            /*
             * If the relation file doesn't exist on disk, for example because
             * we're replaying after a crash and the file will be created and
             * then unlinked by WAL that hasn't been replayed yet, suppress
             * further prefetching in the relation until this record is
             * replayed.
             */
            if (!smgrexists(reln, MAIN_FORKNUM))
            {
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
                elog(XLOGPREFETCHER_DEBUG_LEVEL,
                     "suppressing all prefetch in relation %u/%u/%u until %X/%X is replayed, because the relation does not exist on disk",
                     reln->smgr_rlocator.locator.spcOid,
                     reln->smgr_rlocator.locator.dbOid,
                     reln->smgr_rlocator.locator.relNumber,
                     LSN_FORMAT_ARGS(record->lsn));
#endif
                XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0,
                                        record->lsn);
                XLogPrefetchIncrement(&SharedStats->skip_new);
                return LRQ_NEXT_NO_IO;
            }

            /*
             * If the relation isn't big enough to contain the referenced
             * block yet, suppress prefetching of this block and higher until
             * this record is replayed.
             */
            if (block->blkno >= smgrnblocks(reln, block->forknum))
            {
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
                elog(XLOGPREFETCHER_DEBUG_LEVEL,
                     "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, because the relation is too small",
                     reln->smgr_rlocator.locator.spcOid,
                     reln->smgr_rlocator.locator.dbOid,
                     reln->smgr_rlocator.locator.relNumber,
                     block->blkno,
                     LSN_FORMAT_ARGS(record->lsn));
#endif
                XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno,
                                        record->lsn);
                XLogPrefetchIncrement(&SharedStats->skip_new);
                return LRQ_NEXT_NO_IO;
            }

            /* Try to initiate prefetching. */
            result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
            if (BufferIsValid(result.recent_buffer))
            {
                /* Cache hit, nothing to do. */
                XLogPrefetchIncrement(&SharedStats->hit);
                block->prefetch_buffer = result.recent_buffer;
                return LRQ_NEXT_NO_IO;
            }
            else if (result.initiated_io)
            {
                /* Cache miss, I/O (presumably) started. */
                XLogPrefetchIncrement(&SharedStats->prefetch);
                block->prefetch_buffer = InvalidBuffer;
                return LRQ_NEXT_IO;
            }
            else
            {
                /*
                 * This shouldn't be possible, because we already determined
                 * that the relation exists on disk and is big enough.
                 * Something is wrong with the cache invalidation for
                 * smgrexists(), smgrnblocks(), or the file was unlinked or
                 * truncated beneath our feet?
                 */
                elog(ERROR,
                     "could not prefetch relation %u/%u/%u block %u",
                     reln->smgr_rlocator.locator.spcOid,
                     reln->smgr_rlocator.locator.dbOid,
                     reln->smgr_rlocator.locator.relNumber,
                     block->blkno);
            }
        }

        /*
         * Several callsites need to be able to read exactly one record
         * without any internal readahead.  Examples: xlog.c reading
         * checkpoint records with emode set to PANIC, which might otherwise
         * cause XLogPageRead() to panic on some future page, and xlog.c
         * determining where to start writing WAL next, which depends on the
         * contents of the reader's internal buffer after reading one record.
         * Therefore, don't even think about prefetching until the first
         * record after XLogPrefetcherBeginRead() has been consumed.
         */
        if (prefetcher->reader->decode_queue_tail &&
            prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
            return LRQ_NEXT_AGAIN;

        /* Advance to the next record. */
        prefetcher->record = NULL;
    }
    pg_unreachable();
}

/*
 * Expose statistics about recovery prefetching.
 */
Datum
pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
{
#define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
    ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    Datum values[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
    bool nulls[PG_STAT_GET_RECOVERY_PREFETCH_COLS];

    InitMaterializedSRF(fcinfo, 0);

    for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
        nulls[i] = false;

    values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
    values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
    values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->hit));
    values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_init));
    values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
    values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
    values[6] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_rep));
    values[7] = Int32GetDatum(SharedStats->wal_distance);
    values[8] = Int32GetDatum(SharedStats->block_distance);
    values[9] = Int32GetDatum(SharedStats->io_depth);
    tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);

    return (Datum) 0;
}

/*
 * Don't prefetch any blocks >= 'blockno' from a given 'rlocator', until 'lsn'
 * has been replayed.
 */
static inline void
XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
                        BlockNumber blockno, XLogRecPtr lsn)
{
    XLogPrefetcherFilter *filter;
    bool found;

    filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
    if (!found)
    {
        /*
         * Don't allow any prefetching of this block or higher until replayed.
         */
        filter->filter_until_replayed = lsn;
        filter->filter_from_block = blockno;
        dlist_push_head(&prefetcher->filter_queue, &filter->link);
    }
    else
    {
        /*
         * We were already filtering this rlocator.  Extend the filter's
         * lifetime to cover this WAL record, but leave the lower of the block
         * numbers there because we don't want to have to track individual
         * blocks.
         */
        filter->filter_until_replayed = lsn;
        dlist_delete(&filter->link);
        dlist_push_head(&prefetcher->filter_queue, &filter->link);
        filter->filter_from_block = Min(filter->filter_from_block, blockno);
    }
}

/*
 * Have we replayed any records that caused us to begin filtering a block
 * range?  That means that relations should have been created, extended or
 * dropped as required, so we can stop filtering out accesses to a given
 * relfilenumber.
 */
static inline void
XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
{
    while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
    {
        XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
                                                          link,
                                                          &prefetcher->filter_queue);

        if (filter->filter_until_replayed >= replaying_lsn)
            break;

        dlist_delete(&filter->link);
        hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
    }
}

/*
 * Check if a given block should be skipped due to a filter.
 */
static inline bool
XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
                         BlockNumber blockno)
{
    /*
     * Test for empty queue first, because we expect it to be empty most of
     * the time and we can avoid the hash table lookup in that case.
     */
    if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
    {
        XLogPrefetcherFilter *filter;

        /* See if the block range is filtered. */
        filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
        if (filter && filter->filter_from_block <= blockno)
        {
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
            elog(XLOGPREFETCHER_DEBUG_LEVEL,
                 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (blocks >= %u filtered)",
                 rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
                 LSN_FORMAT_ARGS(filter->filter_until_replayed),
                 filter->filter_from_block);
#endif
            return true;
        }

        /* See if the whole database is filtered. */
        rlocator.relNumber = InvalidRelFileNumber;
        rlocator.spcOid = InvalidOid;
        filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
        if (filter)
        {
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
            elog(XLOGPREFETCHER_DEBUG_LEVEL,
                 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (whole database)",
                 rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
                 LSN_FORMAT_ARGS(filter->filter_until_replayed));
#endif
            return true;
        }
    }

    return false;
}

/*
 * A wrapper for XLogBeginRead() that also resets the prefetcher.
 */
void
XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
{
    /* This will forget about any in-flight IO. */
    prefetcher->reconfigure_count--;

    /* Book-keeping to avoid readahead on first read. */
    prefetcher->begin_ptr = recPtr;

    prefetcher->no_readahead_until = 0;

    /* This will forget about any queued up records in the decoder. */
    XLogBeginRead(prefetcher->reader, recPtr);
}

/*
 * A wrapper for XLogReadRecord() that provides the same interface, but also
 * tries to initiate I/O for blocks referenced in future WAL records.
 */
XLogRecord *
XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
{
    DecodedXLogRecord *record;
    XLogRecPtr replayed_up_to;

    /*
     * See if it's time to reset the prefetching machinery, because a relevant
     * GUC was changed.
     */
    if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count))
    {
        uint32 max_distance;
        uint32 max_inflight;

        if (prefetcher->streaming_read)
            lrq_free(prefetcher->streaming_read);

        if (RecoveryPrefetchEnabled())
        {
            Assert(maintenance_io_concurrency > 0);
            max_inflight = maintenance_io_concurrency;
            max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
        }
        else
        {
            max_inflight = 1;
            max_distance = 1;
        }

        prefetcher->streaming_read = lrq_alloc(max_distance,
                                               max_inflight,
                                               (uintptr_t) prefetcher,
                                               XLogPrefetcherNextBlock);

        prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
    }

    /*
     * Release last returned record, if there is one, as it's now been
     * replayed.
     */
    replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);

    /*
     * Can we drop any filters yet?  If we were waiting for a relation to be
     * created or extended, it is now OK to access blocks in the covered
     * range.
     */
    XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);

    /*
     * All IO initiated by earlier WAL is now completed.  This might trigger
     * further prefetching.
     */
    lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);

    /*
     * If there's nothing queued yet, then start prefetching to cause at least
     * one record to be queued.
     */
    if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
    {
        Assert(lrq_inflight(prefetcher->streaming_read) == 0);
        Assert(lrq_completed(prefetcher->streaming_read) == 0);
        lrq_prefetch(prefetcher->streaming_read);
    }

    /* Read the next record. */
    record = XLogNextRecord(prefetcher->reader, errmsg);
    if (!record)
        return NULL;

    /*
     * The record we just got is the "current" one, for the benefit of the
     * XLogRecXXX() macros.
     */
    Assert(record == prefetcher->reader->record);

    /*
     * If maintenance_io_concurrency is set very low, we might have started
     * prefetching some but not all of the blocks referenced in the record
     * we're about to return.  Forget about the rest of the blocks in this
     * record by dropping the prefetcher's reference to it.
     */
    if (record == prefetcher->record)
        prefetcher->record = NULL;

    /*
     * See if it's time to compute some statistics, because enough WAL has
     * been processed.
     */
    if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
        XLogPrefetcherComputeStats(prefetcher);

    Assert(record == prefetcher->reader->record);

    return &record->header;
}
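
/*
 * GUC check hook for recovery_prefetch.
 */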
bool
check_recovery_prefetch(int *new_value, void **extra, GucSource source)
{
#ifndef USE_PREFETCH
    if (*new_value == RECOVERY_PREFETCH_ON)
    {
        GUC_check_errdetail("recovery_prefetch is not supported on platforms that lack posix_fadvise().");
        return false;
    }
#endif

    return true;
}
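
/*
 * GUC assign hook for recovery_prefetch.
 */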
void
assign_recovery_prefetch(int new_value, void *extra)
{
    /* Reconfigure prefetching, because a setting it depends on changed. */
    recovery_prefetch = new_value;
    if (AmStartupProcess())
        XLogPrefetchReconfigure();
}