/*-------------------------------------------------------------------------
 *
 * hash.c
 *    Implementation of Margo Seltzer's Hashing package for postgres.
 *
 * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/access/hash/hash.c
 *
 * NOTES
 *    This file contains only the public interface routines.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/hash.h"
#include "access/hash_xlog.h"
#include "access/relscan.h"
#include "access/tableam.h"
#include "catalog/index.h"
#include "commands/progress.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "optimizer/plancat.h"
#include "pgstat.h"
#include "utils/builtins.h"
#include "utils/index_selfuncs.h"
#include "utils/rel.h"

/* Working state for hashbuild and its callback */
typedef struct
{
    HSpool     *spool;          /* NULL if not using spooling */
    double      indtuples;      /* # tuples accepted into index */
    Relation    heapRel;        /* heap relation descriptor */
} HashBuildState;

static void hashbuildCallback(Relation index,
                              ItemPointer tid,
                              Datum *values,
                              bool *isnull,
                              bool tupleIsAlive,
                              void *state);

/*
 * Hash handler function: return IndexAmRoutine with access method parameters
 * and callbacks.
 */
Datum
hashhandler(PG_FUNCTION_ARGS)
{
    IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

    amroutine->amstrategies = HTMaxStrategyNumber;
    amroutine->amsupport = HASHNProcs;
    amroutine->amoptsprocnum = HASHOPTIONS_PROC;
    amroutine->amcanorder = false;
    amroutine->amcanorderbyop = false;
    amroutine->amcanbackward = true;
    amroutine->amcanunique = false;
    amroutine->amcanmulticol = false;
    amroutine->amoptionalkey = false;
    amroutine->amsearcharray = false;
    amroutine->amsearchnulls = false;
    amroutine->amstorage = false;
    amroutine->amclusterable = false;
    amroutine->ampredlocks = true;
    amroutine->amcanparallel = false;
    amroutine->amcaninclude = false;
    amroutine->amusemaintenanceworkmem = false;
    amroutine->amhotblocking = true;
    amroutine->amparallelvacuumoptions =
        VACUUM_OPTION_PARALLEL_BULKDEL;
    amroutine->amkeytype = INT4OID;
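    /*
     * (amkeytype is INT4OID because hash index tuples store only the 32-bit
     * hash code of the key, never the original key value.)
     */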

    amroutine->ambuild = hashbuild;
    amroutine->ambuildempty = hashbuildempty;
    amroutine->aminsert = hashinsert;
    amroutine->ambulkdelete = hashbulkdelete;
    amroutine->amvacuumcleanup = hashvacuumcleanup;
    amroutine->amcanreturn = NULL;
    amroutine->amcostestimate = hashcostestimate;
    amroutine->amoptions = hashoptions;
    amroutine->amproperty = NULL;
    amroutine->ambuildphasename = NULL;
    amroutine->amvalidate = hashvalidate;
    amroutine->amadjustmembers = hashadjustmembers;
    amroutine->ambeginscan = hashbeginscan;
    amroutine->amrescan = hashrescan;
    amroutine->amgettuple = hashgettuple;
    amroutine->amgetbitmap = hashgetbitmap;
    amroutine->amendscan = hashendscan;
    amroutine->ammarkpos = NULL;
    amroutine->amrestrpos = NULL;
    amroutine->amestimateparallelscan = NULL;
    amroutine->aminitparallelscan = NULL;
    amroutine->amparallelrescan = NULL;

    PG_RETURN_POINTER(amroutine);
}

/*
 * hashbuild() -- build a new hash index.
 */
IndexBuildResult *
hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
    IndexBuildResult *result;
    BlockNumber relpages;
    double      reltuples;
    double      allvisfrac;
    uint32      num_buckets;
    long        sort_threshold;
    HashBuildState buildstate;

    /*
     * We expect to be called exactly once for any index relation. If that's
     * not the case, big trouble's what we have.
     */
    if (RelationGetNumberOfBlocks(index) != 0)
        elog(ERROR, "index \"%s\" already contains data",
             RelationGetRelationName(index));

    /* Estimate the number of rows currently present in the table */
    estimate_rel_size(heap, NULL, &relpages, &reltuples, &allvisfrac);

    /* Initialize the hash index metadata page and initial buckets */
    num_buckets = _hash_init(index, reltuples, MAIN_FORKNUM);

    /*
     * If we just insert the tuples into the index in scan order, then
     * (assuming their hash codes are pretty random) there will be no locality
     * of access to the index, and if the index is bigger than available RAM
     * then we'll thrash horribly.  To prevent that scenario, we can sort the
     * tuples by (expected) bucket number.  However, such a sort is useless
     * overhead when the index does fit in RAM.  We choose to sort if the
     * initial index size exceeds maintenance_work_mem, or the number of
     * buffers usable for the index, whichever is less.  (Limiting by the
     * number of buffers should reduce thrashing between PG buffers and kernel
     * buffers, which seems useful even if no physical I/O results.  Limiting
     * by maintenance_work_mem is useful to allow easy testing of the sort
     * code path, and may be useful to DBAs as an additional control knob.)
     *
     * NOTE: this test will need adjustment if a bucket is ever different from
     * one page.  Also, "initial index size" accounting does not include the
     * metapage, nor the first bitmap page.
     */
    sort_threshold = (maintenance_work_mem * 1024L) / BLCKSZ;
    if (index->rd_rel->relpersistence != RELPERSISTENCE_TEMP)
        sort_threshold = Min(sort_threshold, NBuffers);
    else
        sort_threshold = Min(sort_threshold, NLocBuffer);
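
    /*
     * Worked example (illustrative only): maintenance_work_mem is measured in
     * kB, so with maintenance_work_mem = 64MB (65536 kB) and the default
     * 8192-byte BLCKSZ, the starting threshold is 65536 * 1024 / 8192 = 8192
     * bucket pages; that value is then clamped to NBuffers (shared_buffers)
     * for a regular table, or to NLocBuffer (temp_buffers) for a temp table.
     */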

    if (num_buckets >= (uint32) sort_threshold)
        buildstate.spool = _h_spoolinit(heap, index, num_buckets);
    else
        buildstate.spool = NULL;

    /* prepare to build the index */
    buildstate.indtuples = 0;
    buildstate.heapRel = heap;

    /* do the heap scan */
    reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
                                       hashbuildCallback,
                                       (void *) &buildstate, NULL);
    pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_TOTAL,
                                 buildstate.indtuples);

    if (buildstate.spool)
    {
        /* sort the tuples and insert them into the index */
        _h_indexbuild(buildstate.spool, buildstate.heapRel);
        _h_spooldestroy(buildstate.spool);
    }

    /*
     * Return statistics
     */
    result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

    result->heap_tuples = reltuples;
    result->index_tuples = buildstate.indtuples;

    return result;
}

/*
 * hashbuildempty() -- build an empty hash index in the initialization fork
 */
void
hashbuildempty(Relation index)
{
    _hash_init(index, 0, INIT_FORKNUM);
}

/*
 * Per-tuple callback for table_index_build_scan
 */
static void
hashbuildCallback(Relation index,
                  ItemPointer tid,
                  Datum *values,
                  bool *isnull,
                  bool tupleIsAlive,
                  void *state)
{
    HashBuildState *buildstate = (HashBuildState *) state;
    Datum       index_values[1];
    bool        index_isnull[1];
    IndexTuple  itup;

    /* convert data to a hash key; on failure, do not insert anything */
    if (!_hash_convert_tuple(index,
                             values, isnull,
                             index_values, index_isnull))
        return;

    /* Either spool the tuple for sorting, or just put it into the index */
    if (buildstate->spool)
        _h_spool(buildstate->spool, tid, index_values, index_isnull);
    else
    {
        /* form an index tuple and point it at the heap tuple */
        itup = index_form_tuple(RelationGetDescr(index),
                                index_values, index_isnull);
        itup->t_tid = *tid;
        _hash_doinsert(index, itup, buildstate->heapRel);
        pfree(itup);
    }

    buildstate->indtuples += 1;
}

/*
 * hashinsert() -- insert an index tuple into a hash table.
 *
 * Hash on the heap tuple's key, form an index tuple with hash code.
 * Find the appropriate location for the new tuple, and put it there.
 */
bool
hashinsert(Relation rel, Datum *values, bool *isnull,
           ItemPointer ht_ctid, Relation heapRel,
           IndexUniqueCheck checkUnique,
           bool indexUnchanged,
           IndexInfo *indexInfo)
{
    Datum       index_values[1];
    bool        index_isnull[1];
    IndexTuple  itup;

    /* convert data to a hash key; on failure, do not insert anything */
    if (!_hash_convert_tuple(rel,
                             values, isnull,
                             index_values, index_isnull))
        return false;

    /* form an index tuple and point it at the heap tuple */
    itup = index_form_tuple(RelationGetDescr(rel), index_values, index_isnull);
    itup->t_tid = *ht_ctid;

    _hash_doinsert(rel, itup, heapRel);

    pfree(itup);
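
    /*
     * The aminsert result is consulted only for uniqueness checking, which
     * hash indexes never support (amcanunique is false), so always report
     * false here.
     */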
    return false;
}

/*
 * hashgettuple() -- Get the next tuple in the scan.
 */
bool
hashgettuple(IndexScanDesc scan, ScanDirection dir)
{
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    bool        res;

    /* Hash indexes are always lossy since we store only the hash code */
    scan->xs_recheck = true;

    /*
     * If we've already initialized this scan, we can just advance it in the
     * appropriate direction.  If we haven't done so yet, we call a routine to
     * get the first item in the scan.
     */
    if (!HashScanPosIsValid(so->currPos))
        res = _hash_first(scan, dir);
    else
    {
        /*
         * Check to see if we should kill the previously-fetched tuple.
         */
        if (scan->kill_prior_tuple)
        {
            /*
             * Yes, so remember it for later.  (We'll deal with all such
             * tuples at once right after leaving the index page or at end of
             * scan.)  If the caller reverses the indexscan direction, it is
             * quite possible that the same item might get entered multiple
             * times.  But we don't detect that; instead, we just forget any
             * excess entries.
             */
            if (so->killedItems == NULL)
                so->killedItems = (int *)
                    palloc(MaxIndexTuplesPerPage * sizeof(int));

            if (so->numKilled < MaxIndexTuplesPerPage)
                so->killedItems[so->numKilled++] = so->currPos.itemIndex;
        }

        /*
         * Now continue the scan.
         */
        res = _hash_next(scan, dir);
    }

    return res;
}

/*
 * hashgetbitmap() -- get all tuples at once
 */
int64
hashgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    bool        res;
    int64       ntids = 0;
    HashScanPosItem *currItem;

    res = _hash_first(scan, ForwardScanDirection);

    while (res)
    {
        currItem = &so->currPos.items[so->currPos.itemIndex];

        /*
         * _hash_first and _hash_next eliminate dead index entries whenever
         * scan->ignore_killed_tuples is true.  Therefore, there's nothing to
         * do here except add the results to the TIDBitmap.
         */
        tbm_add_tuples(tbm, &(currItem->heapTid), 1, true);
        ntids++;

        res = _hash_next(scan, ForwardScanDirection);
    }

    return ntids;
}

/*
 * hashbeginscan() -- start a scan on a hash index
 */
IndexScanDesc
hashbeginscan(Relation rel, int nkeys, int norderbys)
{
    IndexScanDesc scan;
    HashScanOpaque so;

    /* no order by operators allowed */
    Assert(norderbys == 0);

    scan = RelationGetIndexScan(rel, nkeys, norderbys);

    so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData));
    HashScanPosInvalidate(so->currPos);
    so->hashso_bucket_buf = InvalidBuffer;
    so->hashso_split_bucket_buf = InvalidBuffer;

    so->hashso_buc_populated = false;
    so->hashso_buc_split = false;

    so->killedItems = NULL;
    so->numKilled = 0;

    scan->opaque = so;

    return scan;
}

/*
 * hashrescan() -- rescan an index relation
 */
void
hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
           ScanKey orderbys, int norderbys)
{
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    Relation    rel = scan->indexRelation;

    if (HashScanPosIsValid(so->currPos))
    {
        /* Before leaving current page, deal with any killed items */
        if (so->numKilled > 0)
            _hash_kill_items(scan);
    }

    _hash_dropscanbuf(rel, so);

    /* set position invalid (this will cause _hash_first call) */
    HashScanPosInvalidate(so->currPos);

    /* Update scan key, if a new one is given */
    if (scankey && scan->numberOfKeys > 0)
    {
        memmove(scan->keyData,
                scankey,
                scan->numberOfKeys * sizeof(ScanKeyData));
    }

    so->hashso_buc_populated = false;
    so->hashso_buc_split = false;
}

/*
 * hashendscan() -- close down a scan
 */
void
hashendscan(IndexScanDesc scan)
{
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    Relation    rel = scan->indexRelation;

    if (HashScanPosIsValid(so->currPos))
    {
        /* Before leaving current page, deal with any killed items */
        if (so->numKilled > 0)
            _hash_kill_items(scan);
    }

    _hash_dropscanbuf(rel, so);

    if (so->killedItems != NULL)
        pfree(so->killedItems);
    pfree(so);
    scan->opaque = NULL;
}

/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * This function also deletes the tuples that are moved by split to other
 * buckets.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
hashbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
               IndexBulkDeleteCallback callback, void *callback_state)
{
    Relation    rel = info->index;
    double      tuples_removed;
    double      num_index_tuples;
    double      orig_ntuples;
    Bucket      orig_maxbucket;
    Bucket      cur_maxbucket;
    Bucket      cur_bucket;
    Buffer      metabuf = InvalidBuffer;
    HashMetaPage metap;
    HashMetaPage cachedmetap;

    tuples_removed = 0;
    num_index_tuples = 0;

    /*
     * We need a copy of the metapage so that we can use its hashm_spares[]
     * values to compute bucket page addresses, but a cached copy should be
     * good enough.  (If not, we'll detect that further down and refresh the
     * cache as necessary.)
     */
    cachedmetap = _hash_getcachedmetap(rel, &metabuf, false);
    Assert(cachedmetap != NULL);

    orig_maxbucket = cachedmetap->hashm_maxbucket;
    orig_ntuples = cachedmetap->hashm_ntuples;

    /* Scan the buckets that we know exist */
    cur_bucket = 0;
    cur_maxbucket = orig_maxbucket;

loop_top:
    while (cur_bucket <= cur_maxbucket)
    {
        BlockNumber bucket_blkno;
        BlockNumber blkno;
        Buffer      bucket_buf;
        Buffer      buf;
        HashPageOpaque bucket_opaque;
        Page        page;
        bool        split_cleanup = false;

        /* Get address of bucket's start page */
        bucket_blkno = BUCKET_TO_BLKNO(cachedmetap, cur_bucket);

        blkno = bucket_blkno;

        /*
         * We need to acquire a cleanup lock on the primary bucket page to
         * wait out concurrent scans before deleting the dead tuples.
         */
        buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, info->strategy);
        LockBufferForCleanup(buf);
        _hash_checkpage(rel, buf, LH_BUCKET_PAGE);

        page = BufferGetPage(buf);
        bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);

        /*
         * If the bucket contains tuples that are moved by split, then we need
         * to delete such tuples.  We can't delete such tuples if the split
         * operation on the bucket is not finished, as those are needed by
         * scans.
         */
        if (!H_BUCKET_BEING_SPLIT(bucket_opaque) &&
            H_NEEDS_SPLIT_CLEANUP(bucket_opaque))
        {
            split_cleanup = true;

            /*
             * This bucket might have been split since we last held a lock on
             * the metapage.  If so, hashm_maxbucket, hashm_highmask and
             * hashm_lowmask might be old enough to cause us to fail to remove
             * tuples left behind by the most recent split.  To prevent that,
             * now that the primary page of the target bucket has been locked
             * (and thus can't be further split), check whether we need to
             * update our cached metapage data.
             */
            Assert(bucket_opaque->hasho_prevblkno != InvalidBlockNumber);
            if (bucket_opaque->hasho_prevblkno > cachedmetap->hashm_maxbucket)
            {
                cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
                Assert(cachedmetap != NULL);
            }
        }

        bucket_buf = buf;

        hashbucketcleanup(rel, cur_bucket, bucket_buf, blkno, info->strategy,
                          cachedmetap->hashm_maxbucket,
                          cachedmetap->hashm_highmask,
                          cachedmetap->hashm_lowmask, &tuples_removed,
                          &num_index_tuples, split_cleanup,
                          callback, callback_state);

        _hash_dropbuf(rel, bucket_buf);

        /* Advance to next bucket */
        cur_bucket++;
    }

    if (BufferIsInvalid(metabuf))
        metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_NOLOCK, LH_META_PAGE);

    /* Write-lock metapage and check for split since we started */
    LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
    metap = HashPageGetMeta(BufferGetPage(metabuf));

    if (cur_maxbucket != metap->hashm_maxbucket)
    {
        /* There's been a split, so process the additional bucket(s) */
        LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
        cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
        Assert(cachedmetap != NULL);
        cur_maxbucket = cachedmetap->hashm_maxbucket;
        goto loop_top;
    }

    /* Okay, we're really done.  Update tuple count in metapage. */
    START_CRIT_SECTION();

    if (orig_maxbucket == metap->hashm_maxbucket &&
        orig_ntuples == metap->hashm_ntuples)
    {
        /*
         * No one has split or inserted anything since start of scan, so
         * believe our count as gospel.
         */
        metap->hashm_ntuples = num_index_tuples;
    }
    else
    {
        /*
         * Otherwise, our count is untrustworthy since we may have
         * double-scanned tuples in split buckets.  Proceed by dead-reckoning.
         * (Note: we still return estimated_count = false, because using this
         * count is better than not updating reltuples at all.)
         */
        if (metap->hashm_ntuples > tuples_removed)
            metap->hashm_ntuples -= tuples_removed;
        else
            metap->hashm_ntuples = 0;
        num_index_tuples = metap->hashm_ntuples;
    }

    MarkBufferDirty(metabuf);

    /* XLOG stuff */
    if (RelationNeedsWAL(rel))
    {
        xl_hash_update_meta_page xlrec;
        XLogRecPtr  recptr;

        xlrec.ntuples = metap->hashm_ntuples;

        XLogBeginInsert();
        XLogRegisterData((char *) &xlrec, SizeOfHashUpdateMetaPage);

        XLogRegisterBuffer(0, metabuf, REGBUF_STANDARD);

        recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_UPDATE_META_PAGE);
        PageSetLSN(BufferGetPage(metabuf), recptr);
    }

    END_CRIT_SECTION();

    _hash_relbuf(rel, metabuf);

    /* return statistics */
    if (stats == NULL)
        stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
    stats->estimated_count = false;
    stats->num_index_tuples = num_index_tuples;
    stats->tuples_removed += tuples_removed;
    /* hashvacuumcleanup will fill in num_pages */

    return stats;
}

/*
 * Post-VACUUM cleanup.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
hashvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
    Relation    rel = info->index;
    BlockNumber num_pages;

    /* If hashbulkdelete wasn't called, return NULL signifying no change */
    /* Note: this covers the analyze_only case too */
    if (stats == NULL)
        return NULL;

    /* update statistics */
    num_pages = RelationGetNumberOfBlocks(rel);
    stats->num_pages = num_pages;

    return stats;
}

/*
 * Helper function to perform deletion of index entries from a bucket.
 *
 * This function expects that the caller has acquired a cleanup lock on the
 * primary bucket page, and will return with a write lock again held on the
 * primary bucket page.  The lock won't necessarily be held continuously,
 * though, because we'll release it when visiting overflow pages.
 *
 * There can't be any concurrent scans in progress when we first enter this
 * function because of the cleanup lock we hold on the primary bucket page,
 * but as soon as we release that lock, there might be.  If those scans got
 * ahead of our cleanup scan, they might see a tuple before we kill it and
 * wake up only after VACUUM has completed and the TID has been recycled for
 * an unrelated tuple.  To avoid that calamity, we prevent scans from passing
 * our cleanup scan by locking the next page in the bucket chain before
 * releasing the lock on the previous page.  (This type of lock chaining is
 * not ideal, so we might want to look for a better solution at some point.)
 *
 * We need to retain a pin on the primary bucket to ensure that no concurrent
 * split can start.
 */
void
hashbucketcleanup(Relation rel, Bucket cur_bucket, Buffer bucket_buf,
                  BlockNumber bucket_blkno, BufferAccessStrategy bstrategy,
                  uint32 maxbucket, uint32 highmask, uint32 lowmask,
                  double *tuples_removed, double *num_index_tuples,
                  bool split_cleanup,
                  IndexBulkDeleteCallback callback, void *callback_state)
{
    BlockNumber blkno;
    Buffer      buf;
    Bucket      new_bucket PG_USED_FOR_ASSERTS_ONLY = InvalidBucket;
    bool        bucket_dirty = false;

    blkno = bucket_blkno;
    buf = bucket_buf;

    if (split_cleanup)
        new_bucket = _hash_get_newbucket_from_oldbucket(rel, cur_bucket,
                                                        lowmask, maxbucket);

    /* Scan each page in bucket */
    for (;;)
    {
        HashPageOpaque opaque;
        OffsetNumber offno;
        OffsetNumber maxoffno;
        Buffer      next_buf;
        Page        page;
        OffsetNumber deletable[MaxOffsetNumber];
        int         ndeletable = 0;
        bool        retain_pin = false;
        bool        clear_dead_marking = false;

        vacuum_delay_point();

        page = BufferGetPage(buf);
        opaque = (HashPageOpaque) PageGetSpecialPointer(page);

        /* Scan each tuple in page */
        maxoffno = PageGetMaxOffsetNumber(page);
        for (offno = FirstOffsetNumber;
             offno <= maxoffno;
             offno = OffsetNumberNext(offno))
        {
            ItemPointer htup;
            IndexTuple  itup;
            Bucket      bucket;
            bool        kill_tuple = false;

            itup = (IndexTuple) PageGetItem(page,
                                            PageGetItemId(page, offno));
            htup = &(itup->t_tid);

            /*
             * To remove the dead tuples, we strictly want to rely on the
             * results of the callback function.  See btvacuumpage for the
             * detailed reason.
             */
            if (callback && callback(htup, callback_state))
            {
                kill_tuple = true;
                if (tuples_removed)
                    *tuples_removed += 1;
            }
            else if (split_cleanup)
            {
                /* delete the tuples that are moved by split. */
                bucket = _hash_hashkey2bucket(_hash_get_indextuple_hashkey(itup),
                                              maxbucket,
                                              highmask,
                                              lowmask);
                /* mark the item for deletion */
                if (bucket != cur_bucket)
                {
                    /*
                     * We expect tuples to belong either to the current
                     * bucket or to new_bucket.  This is ensured because we
                     * don't allow further splits from a bucket that contains
                     * garbage.  See comments in _hash_expandtable.
                     */
                    Assert(bucket == new_bucket);
                    kill_tuple = true;
                }
            }

            if (kill_tuple)
            {
                /* mark the item for deletion */
                deletable[ndeletable++] = offno;
            }
            else
            {
                /* we're keeping it, so count it */
                if (num_index_tuples)
                    *num_index_tuples += 1;
            }
        }

        /* retain the pin on primary bucket page till end of bucket scan */
        if (blkno == bucket_blkno)
            retain_pin = true;
        else
            retain_pin = false;

        blkno = opaque->hasho_nextblkno;

        /*
         * Apply deletions, advance to next page and write page if needed.
         */
        if (ndeletable > 0)
        {
            /* No ereport(ERROR) until changes are logged */
            START_CRIT_SECTION();

            PageIndexMultiDelete(page, deletable, ndeletable);
            bucket_dirty = true;

            /*
             * Mark the page as clean if vacuum removes the DEAD tuples from
             * an index page.  We do this by clearing the
             * LH_PAGE_HAS_DEAD_TUPLES flag.
             */
            if (tuples_removed && *tuples_removed > 0 &&
                H_HAS_DEAD_TUPLES(opaque))
            {
                opaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
                clear_dead_marking = true;
            }

            MarkBufferDirty(buf);

            /* XLOG stuff */
            if (RelationNeedsWAL(rel))
            {
                xl_hash_delete xlrec;
                XLogRecPtr  recptr;

                xlrec.clear_dead_marking = clear_dead_marking;
                xlrec.is_primary_bucket_page = (buf == bucket_buf);

                XLogBeginInsert();
                XLogRegisterData((char *) &xlrec, SizeOfHashDelete);

                /*
                 * The bucket buffer needs to be registered to ensure that we
                 * can acquire a cleanup lock on it during replay.
                 */
                if (!xlrec.is_primary_bucket_page)
                    XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD | REGBUF_NO_IMAGE);

                XLogRegisterBuffer(1, buf, REGBUF_STANDARD);
                XLogRegisterBufData(1, (char *) deletable,
                                    ndeletable * sizeof(OffsetNumber));

                recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_DELETE);
                PageSetLSN(BufferGetPage(buf), recptr);
            }

            END_CRIT_SECTION();
        }

        /* bail out if there are no more pages to scan. */
        if (!BlockNumberIsValid(blkno))
            break;

        next_buf = _hash_getbuf_with_strategy(rel, blkno, HASH_WRITE,
                                              LH_OVERFLOW_PAGE,
                                              bstrategy);

        /*
         * Release the lock on the previous page after acquiring the lock on
         * the next page.
         */
        if (retain_pin)
            LockBuffer(buf, BUFFER_LOCK_UNLOCK);
        else
            _hash_relbuf(rel, buf);

        buf = next_buf;
    }

    /*
     * Lock the bucket page to clear the garbage flag and squeeze the bucket.
     * If the current buffer is the same as the bucket buffer, then we already
     * have a lock on the bucket page.
     */
    if (buf != bucket_buf)
    {
        _hash_relbuf(rel, buf);
        LockBuffer(bucket_buf, BUFFER_LOCK_EXCLUSIVE);
    }

    /*
     * Clear the garbage flag from the bucket after deleting the tuples that
     * are moved by split.  We purposefully clear the flag before squeezing
     * the bucket, so that after a restart, vacuum won't again try to delete
     * the moved-by-split tuples.
     */
    if (split_cleanup)
    {
        HashPageOpaque bucket_opaque;
        Page        page;

        page = BufferGetPage(bucket_buf);
        bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);

        /* No ereport(ERROR) until changes are logged */
        START_CRIT_SECTION();

        bucket_opaque->hasho_flag &= ~LH_BUCKET_NEEDS_SPLIT_CLEANUP;
        MarkBufferDirty(bucket_buf);

        /* XLOG stuff */
        if (RelationNeedsWAL(rel))
        {
            XLogRecPtr  recptr;

            XLogBeginInsert();
            XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD);

            recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_CLEANUP);
            PageSetLSN(page, recptr);
        }

        END_CRIT_SECTION();
    }

    /*
     * If we have deleted anything, try to compact free space.  For squeezing
     * the bucket, we must have a cleanup lock, else it can impact the
     * ordering of tuples for a scan that has started before it.
     */
    if (bucket_dirty && IsBufferCleanupOK(bucket_buf))
        _hash_squeezebucket(rel, cur_bucket, bucket_blkno, bucket_buf,
                            bstrategy);
    else
        LockBuffer(bucket_buf, BUFFER_LOCK_UNLOCK);
}