1 /*-------------------------------------------------------------------------
4 * interface routines for the postgres GiST index access method.
7 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
13 *-------------------------------------------------------------------------
17 #include "access/genam.h"
18 #include "access/gist_private.h"
19 #include "catalog/storage.h"
20 #include "commands/vacuum.h"
21 #include "miscadmin.h"
22 #include "storage/bufmgr.h"
23 #include "storage/freespace.h"
24 #include "storage/indexfsm.h"
25 #include "storage/lmgr.h"
26 #include "utils/memutils.h"
29 typedef struct GistBulkDeleteResult
31 IndexBulkDeleteResult std
; /* common state */
33 } GistBulkDeleteResult
;
40 GistBulkDeleteResult
*result
;
41 BufferAccessStrategy strategy
;
52 * Make union of keys on page
55 PageMakeUnionKey(GistVacuum
*gv
, Buffer buffer
)
57 Page page
= BufferGetPage(buffer
);
62 MemoryContext oldCtx
= MemoryContextSwitchTo(gv
->opCtx
);
64 vec
= gistextractpage(page
, &veclen
);
67 * we call gistunion() in temprorary context because user-defined
68 * functions called in gistunion() may do not free all memory
70 tmp
= gistunion(gv
->index
, vec
, veclen
, &(gv
->giststate
));
71 MemoryContextSwitchTo(oldCtx
);
73 res
= (IndexTuple
) palloc(IndexTupleSize(tmp
));
74 memcpy(res
, tmp
, IndexTupleSize(tmp
));
76 ItemPointerSetBlockNumber(&(res
->t_tid
), BufferGetBlockNumber(buffer
));
77 GistTupleSetValid(res
);
79 MemoryContextReset(gv
->opCtx
);
85 gistDeleteSubtree(GistVacuum
*gv
, BlockNumber blkno
)
90 buffer
= ReadBufferExtended(gv
->index
, MAIN_FORKNUM
, blkno
, RBM_NORMAL
,
92 LockBuffer(buffer
, GIST_EXCLUSIVE
);
93 page
= (Page
) BufferGetPage(buffer
);
95 if (!GistPageIsLeaf(page
))
99 for (i
= FirstOffsetNumber
; i
<= PageGetMaxOffsetNumber(page
); i
= OffsetNumberNext(i
))
101 ItemId iid
= PageGetItemId(page
, i
);
102 IndexTuple idxtuple
= (IndexTuple
) PageGetItem(page
, iid
);
104 gistDeleteSubtree(gv
, ItemPointerGetBlockNumber(&(idxtuple
->t_tid
)));
108 START_CRIT_SECTION();
110 MarkBufferDirty(buffer
);
112 page
= (Page
) BufferGetPage(buffer
);
113 GistPageSetDeleted(page
);
114 gv
->result
->std
.pages_deleted
++;
116 if (!gv
->index
->rd_istemp
)
118 XLogRecData rdata
[2];
120 gistxlogPageDelete xlrec
;
122 xlrec
.node
= gv
->index
->rd_node
;
125 rdata
[0].buffer
= buffer
;
126 rdata
[0].buffer_std
= true;
127 rdata
[0].data
= NULL
;
129 rdata
[0].next
= &(rdata
[1]);
131 rdata
[1].buffer
= InvalidBuffer
;
132 rdata
[1].data
= (char *) &xlrec
;
133 rdata
[1].len
= sizeof(gistxlogPageDelete
);
134 rdata
[1].next
= NULL
;
136 recptr
= XLogInsert(RM_GIST_ID
, XLOG_GIST_PAGE_DELETE
, rdata
);
137 PageSetLSN(page
, recptr
);
138 PageSetTLI(page
, ThisTimeLineID
);
141 PageSetLSN(page
, XLogRecPtrForTemp
);
145 UnlockReleaseBuffer(buffer
);
149 vacuumSplitPage(GistVacuum
*gv
, Page tempPage
, Buffer buffer
, IndexTuple
*addon
, int curlenaddon
)
151 ArrayTuple res
= {NULL
, 0, false};
153 SplitedPageLayout
*dist
= NULL
,
157 BlockNumber blkno
= BufferGetBlockNumber(buffer
);
158 MemoryContext oldCtx
= MemoryContextSwitchTo(gv
->opCtx
);
160 vec
= gistextractpage(tempPage
, &veclen
);
161 vec
= gistjoinvector(vec
, &veclen
, addon
, curlenaddon
);
162 dist
= gistSplit(gv
->index
, tempPage
, vec
, veclen
, &(gv
->giststate
));
164 MemoryContextSwitchTo(oldCtx
);
166 if (blkno
!= GIST_ROOT_BLKNO
)
168 /* if non-root split then we should not allocate new buffer */
169 dist
->buffer
= buffer
;
170 dist
->page
= tempPage
;
171 /* during vacuum we never split leaf page */
172 GistPageGetOpaque(dist
->page
)->flags
= 0;
177 res
.itup
= (IndexTuple
*) palloc(sizeof(IndexTuple
) * veclen
);
180 /* make new pages and fills them */
181 for (ptr
= dist
; ptr
; ptr
= ptr
->next
)
185 if (ptr
->buffer
== InvalidBuffer
)
187 ptr
->buffer
= gistNewBuffer(gv
->index
);
188 GISTInitBuffer(ptr
->buffer
, 0);
189 ptr
->page
= BufferGetPage(ptr
->buffer
);
191 ptr
->block
.blkno
= BufferGetBlockNumber(ptr
->buffer
);
193 data
= (char *) (ptr
->list
);
194 for (i
= 0; i
< ptr
->block
.num
; i
++)
196 if (PageAddItem(ptr
->page
, (Item
) data
, IndexTupleSize((IndexTuple
) data
), i
+ FirstOffsetNumber
, false, false) == InvalidOffsetNumber
)
197 elog(ERROR
, "failed to add item to index page in \"%s\"", RelationGetRelationName(gv
->index
));
198 data
+= IndexTupleSize((IndexTuple
) data
);
201 ItemPointerSetBlockNumber(&(ptr
->itup
->t_tid
), ptr
->block
.blkno
);
202 res
.itup
[res
.ituplen
] = (IndexTuple
) palloc(IndexTupleSize(ptr
->itup
));
203 memcpy(res
.itup
[res
.ituplen
], ptr
->itup
, IndexTupleSize(ptr
->itup
));
207 START_CRIT_SECTION();
209 for (ptr
= dist
; ptr
; ptr
= ptr
->next
)
211 MarkBufferDirty(ptr
->buffer
);
212 GistPageGetOpaque(ptr
->page
)->rightlink
= InvalidBlockNumber
;
215 /* restore splitted non-root page */
216 if (blkno
!= GIST_ROOT_BLKNO
)
218 PageRestoreTempPage(dist
->page
, BufferGetPage(dist
->buffer
));
219 dist
->page
= BufferGetPage(dist
->buffer
);
222 if (!gv
->index
->rd_istemp
)
226 ItemPointerData key
; /* set key for incomplete insert */
229 ItemPointerSet(&key
, blkno
, TUPLE_IS_VALID
);
231 rdata
= formSplitRdata(gv
->index
->rd_node
, blkno
,
233 xlinfo
= rdata
->data
;
235 recptr
= XLogInsert(RM_GIST_ID
, XLOG_GIST_PAGE_SPLIT
, rdata
);
236 for (ptr
= dist
; ptr
; ptr
= ptr
->next
)
238 PageSetLSN(BufferGetPage(ptr
->buffer
), recptr
);
239 PageSetTLI(BufferGetPage(ptr
->buffer
), ThisTimeLineID
);
247 for (ptr
= dist
; ptr
; ptr
= ptr
->next
)
248 PageSetLSN(BufferGetPage(ptr
->buffer
), XLogRecPtrForTemp
);
251 for (ptr
= dist
; ptr
; ptr
= ptr
->next
)
253 /* we must keep the buffer pin on the head page */
254 if (BufferGetBlockNumber(ptr
->buffer
) != blkno
)
255 UnlockReleaseBuffer(ptr
->buffer
);
258 if (blkno
== GIST_ROOT_BLKNO
)
260 ItemPointerData key
; /* set key for incomplete insert */
262 ItemPointerSet(&key
, blkno
, TUPLE_IS_VALID
);
264 gistnewroot(gv
->index
, buffer
, res
.itup
, res
.ituplen
, &key
);
269 MemoryContextReset(gv
->opCtx
);
275 gistVacuumUpdate(GistVacuum
*gv
, BlockNumber blkno
, bool needunion
)
277 ArrayTuple res
= {NULL
, 0, false};
290 bool needwrite
= false;
291 OffsetNumber offToDelete
[MaxOffsetNumber
];
292 BlockNumber blkToDelete
[MaxOffsetNumber
];
293 ItemPointerData
*completed
= NULL
;
297 vacuum_delay_point();
299 buffer
= ReadBufferExtended(gv
->index
, MAIN_FORKNUM
, blkno
, RBM_NORMAL
,
301 LockBuffer(buffer
, GIST_EXCLUSIVE
);
302 gistcheckpage(gv
->index
, buffer
);
303 page
= (Page
) BufferGetPage(buffer
);
304 maxoff
= PageGetMaxOffsetNumber(page
);
306 if (GistPageIsLeaf(page
))
308 if (GistTuplesDeleted(page
))
309 needunion
= needwrite
= true;
313 completed
= (ItemPointerData
*) palloc(sizeof(ItemPointerData
) * lencompleted
);
314 addon
= (IndexTuple
*) palloc(sizeof(IndexTuple
) * lenaddon
);
316 /* get copy of page to work */
317 tempPage
= PageGetTempPageCopy(page
);
319 for (i
= FirstOffsetNumber
; i
<= maxoff
; i
= OffsetNumberNext(i
))
321 ArrayTuple chldtuple
;
324 iid
= PageGetItemId(tempPage
, i
);
325 idxtuple
= (IndexTuple
) PageGetItem(tempPage
, iid
);
326 needchildunion
= (GistTupleIsInvalid(idxtuple
)) ? true : false;
329 elog(DEBUG2
, "gistVacuumUpdate: need union for block %u",
330 ItemPointerGetBlockNumber(&(idxtuple
->t_tid
)));
332 chldtuple
= gistVacuumUpdate(gv
, ItemPointerGetBlockNumber(&(idxtuple
->t_tid
)),
334 if (chldtuple
.ituplen
|| chldtuple
.emptypage
)
336 /* update tuple or/and inserts new */
337 if (chldtuple
.emptypage
)
338 blkToDelete
[nBlkToDelete
++] = ItemPointerGetBlockNumber(&(idxtuple
->t_tid
));
339 offToDelete
[nOffToDelete
++] = i
;
340 PageIndexTupleDelete(tempPage
, i
);
343 needwrite
= needunion
= true;
345 if (chldtuple
.ituplen
)
348 Assert(chldtuple
.emptypage
== false);
349 while (curlenaddon
+ chldtuple
.ituplen
>= lenaddon
)
352 addon
= (IndexTuple
*) repalloc(addon
, sizeof(IndexTuple
) * lenaddon
);
355 memcpy(addon
+ curlenaddon
, chldtuple
.itup
, chldtuple
.ituplen
* sizeof(IndexTuple
));
357 curlenaddon
+= chldtuple
.ituplen
;
359 if (chldtuple
.ituplen
> 1)
362 * child was split, so we need mark completion
367 while (ncompleted
+ chldtuple
.ituplen
> lencompleted
)
370 completed
= (ItemPointerData
*) repalloc(completed
, sizeof(ItemPointerData
) * lencompleted
);
372 for (j
= 0; j
< chldtuple
.ituplen
; j
++)
374 ItemPointerCopy(&(chldtuple
.itup
[j
]->t_tid
), completed
+ ncompleted
);
378 pfree(chldtuple
.itup
);
383 Assert(maxoff
== PageGetMaxOffsetNumber(tempPage
));
387 /* insert updated tuples */
388 if (gistnospace(tempPage
, addon
, curlenaddon
, InvalidOffsetNumber
, 0))
390 /* there is no space on page to insert tuples */
391 res
= vacuumSplitPage(gv
, tempPage
, buffer
, addon
, curlenaddon
);
392 tempPage
= NULL
; /* vacuumSplitPage() free tempPage */
393 needwrite
= needunion
= false; /* gistSplit already forms
394 * unions and writes pages */
397 /* enough free space */
398 gistfillbuffer(tempPage
, addon
, curlenaddon
, InvalidOffsetNumber
);
403 * If page is empty, we should remove pointer to it before deleting page
407 if (blkno
!= GIST_ROOT_BLKNO
&& (PageIsEmpty(page
) || (tempPage
&& PageIsEmpty(tempPage
))))
410 * New version of page is empty, so leave it unchanged, upper call
411 * will mark our page as deleted. In case of page split we never will
414 * If page was empty it can't become non-empty during processing
416 res
.emptypage
= true;
417 UnlockReleaseBuffer(buffer
);
421 /* write page and remove its childs if it need */
423 START_CRIT_SECTION();
425 if (tempPage
&& needwrite
)
427 PageRestoreTempPage(tempPage
, page
);
432 if (PageIsEmpty(page
) && blkno
== GIST_ROOT_BLKNO
)
435 GistPageSetLeaf(page
);
441 MarkBufferDirty(buffer
);
442 GistClearTuplesDeleted(page
);
444 if (!gv
->index
->rd_istemp
)
450 rdata
= formUpdateRdata(gv
->index
->rd_node
, buffer
,
451 offToDelete
, nOffToDelete
,
452 addon
, curlenaddon
, NULL
);
453 xlinfo
= rdata
->next
->data
;
455 recptr
= XLogInsert(RM_GIST_ID
, XLOG_GIST_PAGE_UPDATE
, rdata
);
456 PageSetLSN(page
, recptr
);
457 PageSetTLI(page
, ThisTimeLineID
);
463 PageSetLSN(page
, XLogRecPtrForTemp
);
468 if (needunion
&& !PageIsEmpty(page
))
470 res
.itup
= (IndexTuple
*) palloc(sizeof(IndexTuple
));
472 res
.itup
[0] = PageMakeUnionKey(gv
, buffer
);
475 UnlockReleaseBuffer(buffer
);
477 /* delete empty children, now we havn't any links to pointed subtrees */
478 for (i
= 0; i
< nBlkToDelete
; i
++)
479 gistDeleteSubtree(gv
, blkToDelete
[i
]);
481 if (ncompleted
&& !gv
->index
->rd_istemp
)
482 gistxlogInsertCompletion(gv
->index
->rd_node
, completed
, ncompleted
);
486 for (i
= 0; i
< curlenaddon
; i
++)
499 * For usual vacuum just update FSM, for full vacuum
500 * reforms parent tuples if some of childs was deleted or changed,
501 * update invalid tuples (they can exist from last crash recovery only),
502 * tries to get smaller index
506 gistvacuumcleanup(PG_FUNCTION_ARGS
)
508 IndexVacuumInfo
*info
= (IndexVacuumInfo
*) PG_GETARG_POINTER(0);
509 GistBulkDeleteResult
*stats
= (GistBulkDeleteResult
*) PG_GETARG_POINTER(1);
510 Relation rel
= info
->index
;
513 BlockNumber totFreePages
;
514 BlockNumber lastBlock
= GIST_ROOT_BLKNO
,
515 lastFilledBlock
= GIST_ROOT_BLKNO
;
518 /* Set up all-zero stats if gistbulkdelete wasn't called */
521 stats
= (GistBulkDeleteResult
*) palloc0(sizeof(GistBulkDeleteResult
));
522 /* use heap's tuple count */
523 Assert(info
->num_heap_tuples
>= 0);
524 stats
->std
.num_index_tuples
= info
->num_heap_tuples
;
527 * XXX the above is wrong if index is partial. Would it be OK to just
528 * return NULL, or is there work we must do below?
532 /* gistVacuumUpdate may cause hard work */
533 if (info
->vacuum_full
)
538 /* note: vacuum.c already acquired AccessExclusiveLock on index */
541 initGISTstate(&(gv
.giststate
), rel
);
542 gv
.opCtx
= createTempGistContext();
544 gv
.strategy
= info
->strategy
;
546 /* walk through the entire index for update tuples */
547 res
= gistVacuumUpdate(&gv
, GIST_ROOT_BLKNO
, false);
553 for (i
= 0; i
< res
.ituplen
; i
++)
557 freeGISTstate(&(gv
.giststate
));
558 MemoryContextDelete(gv
.opCtx
);
560 else if (stats
->needFullVacuum
)
562 (errmsg("index \"%s\" needs VACUUM FULL or REINDEX to finish crash recovery",
563 RelationGetRelationName(rel
))));
566 * If vacuum full, we already have exclusive lock on the index. Otherwise,
567 * need lock unless it's local to this backend.
569 if (info
->vacuum_full
)
572 needLock
= !RELATION_IS_LOCAL(rel
);
574 /* try to find deleted pages */
576 LockRelationForExtension(rel
, ExclusiveLock
);
577 npages
= RelationGetNumberOfBlocks(rel
);
579 UnlockRelationForExtension(rel
, ExclusiveLock
);
582 for (blkno
= GIST_ROOT_BLKNO
+ 1; blkno
< npages
; blkno
++)
587 vacuum_delay_point();
589 buffer
= ReadBufferExtended(rel
, MAIN_FORKNUM
, blkno
, RBM_NORMAL
,
591 LockBuffer(buffer
, GIST_SHARE
);
592 page
= (Page
) BufferGetPage(buffer
);
594 if (PageIsNew(page
) || GistPageIsDeleted(page
))
597 RecordFreeIndexPage(rel
, blkno
);
600 lastFilledBlock
= blkno
;
601 UnlockReleaseBuffer(buffer
);
603 lastBlock
= npages
- 1;
605 if (info
->vacuum_full
&& lastFilledBlock
< lastBlock
)
606 { /* try to truncate index */
607 RelationTruncate(rel
, lastFilledBlock
+ 1);
609 stats
->std
.pages_removed
= lastBlock
- lastFilledBlock
;
610 totFreePages
= totFreePages
- stats
->std
.pages_removed
;
613 /* Finally, vacuum the FSM */
614 IndexFreeSpaceMapVacuum(info
->index
);
616 /* return statistics */
617 stats
->std
.pages_free
= totFreePages
;
619 LockRelationForExtension(rel
, ExclusiveLock
);
620 stats
->std
.num_pages
= RelationGetNumberOfBlocks(rel
);
622 UnlockRelationForExtension(rel
, ExclusiveLock
);
624 PG_RETURN_POINTER(stats
);
627 typedef struct GistBDItem
631 struct GistBDItem
*next
;
635 pushStackIfSplited(Page page
, GistBDItem
*stack
)
637 GISTPageOpaque opaque
= GistPageGetOpaque(page
);
639 if (stack
->blkno
!= GIST_ROOT_BLKNO
&& !XLogRecPtrIsInvalid(stack
->parentlsn
) &&
640 XLByteLT(stack
->parentlsn
, opaque
->nsn
) &&
641 opaque
->rightlink
!= InvalidBlockNumber
/* sanity check */ )
643 /* split page detected, install right link to the stack */
645 GistBDItem
*ptr
= (GistBDItem
*) palloc(sizeof(GistBDItem
));
647 ptr
->blkno
= opaque
->rightlink
;
648 ptr
->parentlsn
= stack
->parentlsn
;
649 ptr
->next
= stack
->next
;
656 * Bulk deletion of all index entries pointing to a set of heap tuples and
657 * check invalid tuples after crash recovery.
658 * The set of target tuples is specified via a callback routine that tells
659 * whether any given heap tuple (identified by ItemPointer) is being deleted.
661 * Result: a palloc'd struct containing statistical info for VACUUM displays.
664 gistbulkdelete(PG_FUNCTION_ARGS
)
666 IndexVacuumInfo
*info
= (IndexVacuumInfo
*) PG_GETARG_POINTER(0);
667 GistBulkDeleteResult
*stats
= (GistBulkDeleteResult
*) PG_GETARG_POINTER(1);
668 IndexBulkDeleteCallback callback
= (IndexBulkDeleteCallback
) PG_GETARG_POINTER(2);
669 void *callback_state
= (void *) PG_GETARG_POINTER(3);
670 Relation rel
= info
->index
;
674 /* first time through? */
676 stats
= (GistBulkDeleteResult
*) palloc0(sizeof(GistBulkDeleteResult
));
677 /* we'll re-count the tuples each time */
678 stats
->std
.num_index_tuples
= 0;
680 stack
= (GistBDItem
*) palloc0(sizeof(GistBDItem
));
681 stack
->blkno
= GIST_ROOT_BLKNO
;
692 buffer
= ReadBufferExtended(rel
, MAIN_FORKNUM
, stack
->blkno
,
693 RBM_NORMAL
, info
->strategy
);
694 LockBuffer(buffer
, GIST_SHARE
);
695 gistcheckpage(rel
, buffer
);
696 page
= (Page
) BufferGetPage(buffer
);
698 if (GistPageIsLeaf(page
))
700 OffsetNumber todelete
[MaxOffsetNumber
];
703 LockBuffer(buffer
, GIST_UNLOCK
);
704 LockBuffer(buffer
, GIST_EXCLUSIVE
);
706 page
= (Page
) BufferGetPage(buffer
);
707 if (stack
->blkno
== GIST_ROOT_BLKNO
&& !GistPageIsLeaf(page
))
709 /* only the root can become non-leaf during relock */
710 UnlockReleaseBuffer(buffer
);
716 * check for split proceeded after look at parent, we should check
719 pushStackIfSplited(page
, stack
);
722 * Remove deletable tuples from page
725 maxoff
= PageGetMaxOffsetNumber(page
);
727 for (i
= FirstOffsetNumber
; i
<= maxoff
; i
= OffsetNumberNext(i
))
729 iid
= PageGetItemId(page
, i
);
730 idxtuple
= (IndexTuple
) PageGetItem(page
, iid
);
732 if (callback(&(idxtuple
->t_tid
), callback_state
))
734 todelete
[ntodelete
] = i
- ntodelete
;
736 stats
->std
.tuples_removed
+= 1;
739 stats
->std
.num_index_tuples
+= 1;
744 START_CRIT_SECTION();
746 MarkBufferDirty(buffer
);
748 for (i
= 0; i
< ntodelete
; i
++)
749 PageIndexTupleDelete(page
, todelete
[i
]);
750 GistMarkTuplesDeleted(page
);
756 gistxlogPageUpdate
*xlinfo
;
758 rdata
= formUpdateRdata(rel
->rd_node
, buffer
,
762 xlinfo
= (gistxlogPageUpdate
*) rdata
->next
->data
;
764 recptr
= XLogInsert(RM_GIST_ID
, XLOG_GIST_PAGE_UPDATE
, rdata
);
765 PageSetLSN(page
, recptr
);
766 PageSetTLI(page
, ThisTimeLineID
);
772 PageSetLSN(page
, XLogRecPtrForTemp
);
780 /* check for split proceeded after look at parent */
781 pushStackIfSplited(page
, stack
);
783 maxoff
= PageGetMaxOffsetNumber(page
);
785 for (i
= FirstOffsetNumber
; i
<= maxoff
; i
= OffsetNumberNext(i
))
787 iid
= PageGetItemId(page
, i
);
788 idxtuple
= (IndexTuple
) PageGetItem(page
, iid
);
790 ptr
= (GistBDItem
*) palloc(sizeof(GistBDItem
));
791 ptr
->blkno
= ItemPointerGetBlockNumber(&(idxtuple
->t_tid
));
792 ptr
->parentlsn
= PageGetLSN(page
);
793 ptr
->next
= stack
->next
;
796 if (GistTupleIsInvalid(idxtuple
))
797 stats
->needFullVacuum
= true;
801 UnlockReleaseBuffer(buffer
);
807 vacuum_delay_point();
810 PG_RETURN_POINTER(stats
);