Fix possible duplicate tuples while GiST scan. Now page is processed
[PostgreSQL.git] / src / backend / access / gist / gistget.c
blobb3d0d5e8315fca52ea73a1a7177bd2628ef6d667
/*-------------------------------------------------------------------------
 *
 * gistget.c
 *	  fetch tuples from a GiST scan.
 *
 *
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  $PostgreSQL$
 *
 *-------------------------------------------------------------------------
 */
15 #include "postgres.h"
17 #include "access/gist_private.h"
18 #include "access/relscan.h"
19 #include "executor/execdebug.h"
20 #include "miscadmin.h"
21 #include "pgstat.h"
22 #include "storage/bufmgr.h"
23 #include "utils/memutils.h"
26 static OffsetNumber gistfindnext(IndexScanDesc scan, OffsetNumber n,
27 ScanDirection dir);
28 static int64 gistnext(IndexScanDesc scan, ScanDirection dir, TIDBitmap *tbm);
29 static bool gistindex_keytest(IndexTuple tuple, IndexScanDesc scan,
30 OffsetNumber offset);
32 static void
33 killtuple(Relation r, GISTScanOpaque so, ItemPointer iptr)
35 Page p;
36 OffsetNumber offset;
38 LockBuffer(so->curbuf, GIST_SHARE);
39 gistcheckpage(r, so->curbuf);
40 p = (Page) BufferGetPage(so->curbuf);
42 if (XLByteEQ(so->stack->lsn, PageGetLSN(p)))
44 /* page unchanged, so all is simple */
45 offset = ItemPointerGetOffsetNumber(iptr);
46 ItemIdMarkDead(PageGetItemId(p, offset));
47 SetBufferCommitInfoNeedsSave(so->curbuf);
49 else
51 OffsetNumber maxoff = PageGetMaxOffsetNumber(p);
53 for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
55 IndexTuple ituple = (IndexTuple) PageGetItem(p, PageGetItemId(p, offset));
57 if (ItemPointerEquals(&(ituple->t_tid), iptr))
59 /* found */
60 ItemIdMarkDead(PageGetItemId(p, offset));
61 SetBufferCommitInfoNeedsSave(so->curbuf);
62 break;
67 LockBuffer(so->curbuf, GIST_UNLOCK);
71 * gistgettuple() -- Get the next tuple in the scan
73 Datum
74 gistgettuple(PG_FUNCTION_ARGS)
76 IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
77 ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
78 GISTScanOpaque so;
79 bool res;
81 so = (GISTScanOpaque) scan->opaque;
84 * If we have produced an index tuple in the past and the executor has
85 * informed us we need to mark it as "killed", do so now.
87 if (scan->kill_prior_tuple && ItemPointerIsValid(&(so->curpos)))
88 killtuple(scan->indexRelation, so, &(so->curpos));
91 * Get the next tuple that matches the search key.
93 res = (gistnext(scan, dir, NULL) > 0);
95 PG_RETURN_BOOL(res);
98 Datum
99 gistgetbitmap(PG_FUNCTION_ARGS)
101 IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
102 TIDBitmap *tbm = (TIDBitmap *) PG_GETARG_POINTER(1);
103 int64 ntids;
105 ntids = gistnext(scan, ForwardScanDirection, tbm);
107 PG_RETURN_INT64(ntids);
111 * Fetch tuple(s) that match the search key; this can be invoked
112 * either to fetch the first such tuple or subsequent matching tuples.
114 * This function is used by both gistgettuple and gistgetbitmap. When
115 * invoked from gistgettuple, tbm is null and the next matching tuple
116 * is returned in scan->xs_ctup.t_self. When invoked from getbitmap,
117 * tbm is non-null and all matching tuples are added to tbm before
118 * returning. In both cases, the function result is the number of
119 * returned tuples.
121 * If scan specifies to skip killed tuples, continue looping until we find a
122 * non-killed tuple that matches the search key.
124 static int64
125 gistnext(IndexScanDesc scan, ScanDirection dir, TIDBitmap *tbm)
127 Page p;
128 OffsetNumber n;
129 GISTScanOpaque so;
130 GISTSearchStack *stk;
131 IndexTuple it;
132 GISTPageOpaque opaque;
133 int64 ntids = 0;
135 so = (GISTScanOpaque) scan->opaque;
137 if (ItemPointerIsValid(&so->curpos) == false)
139 /* Being asked to fetch the first entry, so start at the root */
140 Assert(so->curbuf == InvalidBuffer);
141 Assert(so->stack == NULL);
143 so->curbuf = ReadBuffer(scan->indexRelation, GIST_ROOT_BLKNO);
145 stk = so->stack = (GISTSearchStack *) palloc0(sizeof(GISTSearchStack));
147 stk->next = NULL;
148 stk->block = GIST_ROOT_BLKNO;
150 pgstat_count_index_scan(scan->indexRelation);
152 else if (so->curbuf == InvalidBuffer)
154 return 0;
158 * check stored pointers from last visit
160 if ( so->nPageData > 0 )
163 * gistgetmulti never should go here
165 Assert( tbm == NULL );
167 if ( so->curPageData < so->nPageData )
170 * pageData is already ordered for scan's direction
172 scan->xs_ctup.t_self = so->pageData[ so->curPageData ].iptr;
173 scan->xs_recheck = so->pageData[ so->curPageData ].recheck;
174 so->curPageData ++;
176 return 1;
178 else
181 * Go to the next page
183 stk = so->stack->next;
184 pfree(so->stack);
185 so->stack = stk;
187 /* If we're out of stack entries, we're done */
188 if (so->stack == NULL)
190 ReleaseBuffer(so->curbuf);
191 so->curbuf = InvalidBuffer;
192 return 0;
195 so->curbuf = ReleaseAndReadBuffer(so->curbuf,
196 scan->indexRelation,
197 stk->block);
201 for (;;)
203 CHECK_FOR_INTERRUPTS();
205 /* First of all, we need lock buffer */
206 Assert(so->curbuf != InvalidBuffer);
207 LockBuffer(so->curbuf, GIST_SHARE);
208 gistcheckpage(scan->indexRelation, so->curbuf);
209 p = BufferGetPage(so->curbuf);
210 opaque = GistPageGetOpaque(p);
212 /* remember lsn to identify page changed for tuple's killing */
213 so->stack->lsn = PageGetLSN(p);
215 /* check page split, occured since visit to parent */
216 if (!XLogRecPtrIsInvalid(so->stack->parentlsn) &&
217 XLByteLT(so->stack->parentlsn, opaque->nsn) &&
218 opaque->rightlink != InvalidBlockNumber /* sanity check */ &&
219 (so->stack->next == NULL || so->stack->next->block != opaque->rightlink) /* check if already
220 added */ )
222 /* detect page split, follow right link to add pages */
224 stk = (GISTSearchStack *) palloc(sizeof(GISTSearchStack));
225 stk->next = so->stack->next;
226 stk->block = opaque->rightlink;
227 stk->parentlsn = so->stack->parentlsn;
228 memset(&(stk->lsn), 0, sizeof(GistNSN));
229 so->stack->next = stk;
232 /* if page is empty, then just skip it */
233 if (PageIsEmpty(p))
235 LockBuffer(so->curbuf, GIST_UNLOCK);
236 stk = so->stack->next;
237 pfree(so->stack);
238 so->stack = stk;
240 if (so->stack == NULL)
242 ReleaseBuffer(so->curbuf);
243 so->curbuf = InvalidBuffer;
244 return ntids;
247 so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation,
248 stk->block);
249 continue;
252 if (ScanDirectionIsBackward(dir))
253 n = PageGetMaxOffsetNumber(p);
254 else
255 n = FirstOffsetNumber;
257 /* wonderful, we can look at page */
258 so->nPageData = so->curPageData = 0;
260 for (;;)
262 n = gistfindnext(scan, n, dir);
264 if (!OffsetNumberIsValid(n))
267 * If we was called from gistgettuple and current buffer contains
268 * something matched then make a recursive call - it will return
269 * ItemPointer from so->pageData. But we save buffer pinned to
270 * support tuple's killing
272 if ( !tbm && so->nPageData > 0 )
274 LockBuffer(so->curbuf, GIST_UNLOCK);
275 return gistnext(scan, dir, NULL);
279 * We ran out of matching index entries on the current page,
280 * so pop the top stack entry and use it to continue the
281 * search.
283 LockBuffer(so->curbuf, GIST_UNLOCK);
284 stk = so->stack->next;
285 pfree(so->stack);
286 so->stack = stk;
288 /* If we're out of stack entries, we're done */
290 if (so->stack == NULL)
292 ReleaseBuffer(so->curbuf);
293 so->curbuf = InvalidBuffer;
294 return ntids;
297 so->curbuf = ReleaseAndReadBuffer(so->curbuf,
298 scan->indexRelation,
299 stk->block);
300 /* XXX go up */
301 break;
304 if (GistPageIsLeaf(p))
307 * We've found a matching index entry in a leaf page, so
308 * return success. Note that we keep "curbuf" pinned so that
309 * we can efficiently resume the index scan later.
311 ItemPointerSet(&(so->curpos),
312 BufferGetBlockNumber(so->curbuf), n);
314 if (!(scan->ignore_killed_tuples &&
315 ItemIdIsDead(PageGetItemId(p, n))))
317 it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
318 ntids++;
319 if (tbm != NULL)
320 tbm_add_tuples(tbm, &it->t_tid, 1, scan->xs_recheck);
321 else
323 so->pageData[ so->nPageData ].iptr = it->t_tid;
324 so->pageData[ so->nPageData ].recheck = scan->xs_recheck;
325 so->nPageData ++;
329 else
332 * We've found an entry in an internal node whose key is
333 * consistent with the search key, so push it to stack
335 stk = (GISTSearchStack *) palloc(sizeof(GISTSearchStack));
337 it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
338 stk->block = ItemPointerGetBlockNumber(&(it->t_tid));
339 memset(&(stk->lsn), 0, sizeof(GistNSN));
340 stk->parentlsn = so->stack->lsn;
342 stk->next = so->stack->next;
343 so->stack->next = stk;
346 if (ScanDirectionIsBackward(dir))
347 n = OffsetNumberPrev(n);
348 else
349 n = OffsetNumberNext(n);
353 return ntids;
357 * gistindex_keytest() -- does this index tuple satisfy the scan key(s)?
359 * On success return for a leaf tuple, scan->xs_recheck is set to indicate
360 * whether recheck is needed. We recheck if any of the consistent() functions
361 * request it.
363 * We must decompress the key in the IndexTuple before passing it to the
364 * sk_func (and we have previously overwritten the sk_func to use the
365 * user-defined Consistent method, so we actually are invoking that).
367 * Note that this function is always invoked in a short-lived memory context,
368 * so we don't need to worry about cleaning up allocated memory, either here
369 * or in the implementation of any Consistent methods.
371 static bool
372 gistindex_keytest(IndexTuple tuple,
373 IndexScanDesc scan,
374 OffsetNumber offset)
376 int keySize = scan->numberOfKeys;
377 ScanKey key = scan->keyData;
378 Relation r = scan->indexRelation;
379 GISTScanOpaque so;
380 Page p;
381 GISTSTATE *giststate;
383 so = (GISTScanOpaque) scan->opaque;
384 giststate = so->giststate;
385 p = BufferGetPage(so->curbuf);
387 IncrIndexProcessed();
389 scan->xs_recheck = false;
392 * Tuple doesn't restore after crash recovery because of incomplete insert
394 if (!GistPageIsLeaf(p) && GistTupleIsInvalid(tuple))
395 return true;
397 while (keySize > 0)
399 Datum datum;
400 bool isNull;
401 Datum test;
402 bool recheck;
403 GISTENTRY de;
405 datum = index_getattr(tuple,
406 key->sk_attno,
407 giststate->tupdesc,
408 &isNull);
410 if (key->sk_flags & SK_ISNULL)
413 * On non-leaf page we can't conclude that child hasn't NULL
414 * values because of assumption in GiST: uinon (VAL, NULL) is VAL
415 * But if on non-leaf page key IS NULL then all childs has NULL.
418 Assert(key->sk_flags & SK_SEARCHNULL);
420 if (GistPageIsLeaf(p) && !isNull)
421 return false;
423 else if (isNull)
425 return false;
427 else
429 gistdentryinit(giststate, key->sk_attno - 1, &de,
430 datum, r, p, offset,
431 FALSE, isNull);
434 * Call the Consistent function to evaluate the test. The
435 * arguments are the index datum (as a GISTENTRY*), the comparison
436 * datum, the comparison operator's strategy number and
437 * subtype from pg_amop, and the recheck flag.
439 * (Presently there's no need to pass the subtype since it'll
440 * always be zero, but might as well pass it for possible future
441 * use.)
443 * We initialize the recheck flag to true (the safest assumption)
444 * in case the Consistent function forgets to set it.
446 recheck = true;
448 test = FunctionCall5(&key->sk_func,
449 PointerGetDatum(&de),
450 key->sk_argument,
451 Int32GetDatum(key->sk_strategy),
452 ObjectIdGetDatum(key->sk_subtype),
453 PointerGetDatum(&recheck));
455 if (!DatumGetBool(test))
456 return false;
457 scan->xs_recheck |= recheck;
460 keySize--;
461 key++;
464 return true;
468 * Return the offset of the first index entry that is consistent with
469 * the search key after offset 'n' in the current page. If there are
470 * no more consistent entries, return InvalidOffsetNumber.
471 * On success, scan->xs_recheck is set correctly, too.
472 * Page should be locked....
474 static OffsetNumber
475 gistfindnext(IndexScanDesc scan, OffsetNumber n, ScanDirection dir)
477 OffsetNumber maxoff;
478 IndexTuple it;
479 GISTScanOpaque so;
480 MemoryContext oldcxt;
481 Page p;
483 so = (GISTScanOpaque) scan->opaque;
484 p = BufferGetPage(so->curbuf);
485 maxoff = PageGetMaxOffsetNumber(p);
488 * Make sure we're in a short-lived memory context when we invoke a
489 * user-supplied GiST method in gistindex_keytest(), so we don't leak
490 * memory
492 oldcxt = MemoryContextSwitchTo(so->tempCxt);
495 * If we modified the index during the scan, we may have a pointer to a
496 * ghost tuple, before the scan. If this is the case, back up one.
498 if (so->flags & GS_CURBEFORE)
500 so->flags &= ~GS_CURBEFORE;
501 n = OffsetNumberPrev(n);
504 while (n >= FirstOffsetNumber && n <= maxoff)
506 it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
507 if (gistindex_keytest(it, scan, n))
508 break;
510 if (ScanDirectionIsBackward(dir))
511 n = OffsetNumberPrev(n);
512 else
513 n = OffsetNumberNext(n);
516 MemoryContextSwitchTo(oldcxt);
517 MemoryContextReset(so->tempCxt);
520 * If we found a matching entry, return its offset; otherwise return
521 * InvalidOffsetNumber to inform the caller to go to the next page.
523 if (n >= FirstOffsetNumber && n <= maxoff)
524 return n;
525 else
526 return InvalidOffsetNumber;