4 ** The author disclaims copyright to this source code. In place of
5 ** a legal notice, here is a blessing:
7 ** May you do good and not evil.
8 ** May you find forgiveness for yourself and forgive others.
9 ** May you share freely, never taking more than you give.
11 *************************************************************************
12 ** This file contains code for the VdbeSorter object, used in concert with
13 ** a VdbeCursor to sort large numbers of keys for CREATE INDEX statements
14 ** or by SELECT statements with ORDER BY clauses that cannot be satisfied
15 ** using indexes and without LIMIT clauses.
17 ** The VdbeSorter object implements a multi-threaded external merge sort
18 ** algorithm that is efficient even if the number of elements being sorted
19 ** exceeds the available memory.
21 ** Here is the (internal, non-API) interface between this module and the
22 ** rest of the SQLite system:
24 ** sqlite3VdbeSorterInit() Create a new VdbeSorter object.
26 ** sqlite3VdbeSorterWrite() Add a single new row to the VdbeSorter
27 ** object. The row is a binary blob in the
28 ** OP_MakeRecord format that contains both
29 ** the ORDER BY key columns and result columns
30 ** in the case of a SELECT w/ ORDER BY, or
31 ** the complete record for an index entry
32 ** in the case of a CREATE INDEX.
34 ** sqlite3VdbeSorterRewind() Sort all content previously added.
35 ** Position the read cursor on the
36 ** first sorted element.
38 ** sqlite3VdbeSorterNext() Advance the read cursor to the next sorted
41 ** sqlite3VdbeSorterRowkey() Return the complete binary blob for the
42 ** row currently under the read cursor.
44 ** sqlite3VdbeSorterCompare() Compare the binary blob for the row
45 ** currently under the read cursor against
46 ** another binary blob X and report if
47 ** X is strictly less than the read cursor.
48 ** Used to enforce uniqueness in a
49 ** CREATE UNIQUE INDEX statement.
51 ** sqlite3VdbeSorterClose() Close the VdbeSorter object and reclaim
54 ** sqlite3VdbeSorterReset() Refurbish the VdbeSorter for reuse. This
55 ** is like Close() followed by Init() only
58 ** The interfaces above must be called in a particular order. Write() can
59 ** only occur in between Init()/Reset() and Rewind(). Next(), Rowkey(), and
60 ** Compare() can only occur in between Rewind() and Close()/Reset(). i.e.
63 ** for each record: Write()
71 ** Records passed to the sorter via calls to Write() are initially held
72 ** unsorted in main memory. Assuming the amount of memory used never exceeds
73 ** a threshold, when Rewind() is called the set of records is sorted using
74 ** an in-memory merge sort. In this case, no temporary files are required
75 ** and subsequent calls to Rowkey(), Next() and Compare() read records
76 ** directly from main memory.
78 ** If the amount of space used to store records in main memory exceeds the
79 ** threshold, then the set of records currently in memory are sorted and
80 ** written to a temporary file in "Packed Memory Array" (PMA) format.
81 ** A PMA created at this point is known as a "level-0 PMA". Higher levels
82 ** of PMAs may be created by merging existing PMAs together - for example
83 ** merging two or more level-0 PMAs together creates a level-1 PMA.
85 ** The threshold for the amount of main memory to use before flushing
86 ** records to a PMA is roughly the same as the limit configured for the
87 ** page-cache of the main database. Specifically, the threshold is set to
88 ** the value returned by "PRAGMA main.page_size" multiplied by
89 ** that returned by "PRAGMA main.cache_size", in bytes.
91 ** If the sorter is running in single-threaded mode, then all PMAs generated
92 ** are appended to a single temporary file. Or, if the sorter is running in
93 ** multi-threaded mode then up to (N+1) temporary files may be opened, where
94 ** N is the configured number of worker threads. In this case, instead of
95 ** sorting the records and writing the PMA to a temporary file itself, the
96 ** calling thread usually launches a worker thread to do so. Except, if
97 ** there are already N worker threads running, the main thread does the work
100 ** The sorter is running in multi-threaded mode if (a) the library was built
101 ** with pre-processor symbol SQLITE_MAX_WORKER_THREADS set to a value greater
102 ** than zero, and (b) worker threads have been enabled at runtime by calling
103 ** "PRAGMA threads=N" with some value of N greater than 0.
105 ** When Rewind() is called, any data remaining in memory is flushed to a
106 ** final PMA. So at this point the data is stored in some number of sorted
107 ** PMAs within temporary files on disk.
109 ** If there are fewer than SORTER_MAX_MERGE_COUNT PMAs in total and the
110 ** sorter is running in single-threaded mode, then these PMAs are merged
111 ** incrementally as keys are retrieved from the sorter by the VDBE. The
112 ** MergeEngine object, described in further detail below, performs this
115 ** Or, if running in multi-threaded mode, then a background thread is
116 ** launched to merge the existing PMAs. Once the background thread has
117 ** merged T bytes of data into a single sorted PMA, the main thread
118 ** begins reading keys from that PMA while the background thread proceeds
119 ** with merging the next T bytes of data. And so on.
121 ** Parameter T is set to half the value of the memory threshold used
122 ** by Write() above to determine when to create a new PMA.
124 ** If there are more than SORTER_MAX_MERGE_COUNT PMAs in total when
125 ** Rewind() is called, then a hierarchy of incremental-merges is used.
126 ** First, T bytes of data from the first SORTER_MAX_MERGE_COUNT PMAs on
127 ** disk are merged together. Then T bytes of data from the second set, and
128 ** so on, such that no operation ever merges more than SORTER_MAX_MERGE_COUNT
129 ** PMAs at a time. This done is to improve locality.
131 ** If running in multi-threaded mode and there are more than
132 ** SORTER_MAX_MERGE_COUNT PMAs on disk when Rewind() is called, then more
133 ** than one background thread may be created. Specifically, there may be
134 ** one background thread for each temporary file on disk, and one background
135 ** thread to merge the output of each of the others to a single PMA for
136 ** the main thread to read from.
138 #include "sqliteInt.h"
142 ** If SQLITE_DEBUG_SORTER_THREADS is defined, this module outputs various
143 ** messages to stderr that may be helpful in understanding the performance
144 ** characteristics of the sorter in multi-threaded mode.
147 # define SQLITE_DEBUG_SORTER_THREADS 1
151 ** Hard-coded maximum amount of data to accumulate in memory before flushing
152 ** to a level 0 PMA. The purpose of this limit is to prevent various integer
153 ** overflows. 512MiB.
155 #define SQLITE_MAX_PMASZ (1<<29)
158 ** Private objects used by the sorter
160 typedef struct MergeEngine MergeEngine
; /* Merge PMAs together */
161 typedef struct PmaReader PmaReader
; /* Incrementally read one PMA */
162 typedef struct PmaWriter PmaWriter
; /* Incrementally write one PMA */
163 typedef struct SorterRecord SorterRecord
; /* A record being sorted */
164 typedef struct SortSubtask SortSubtask
; /* A sub-task in the sort process */
165 typedef struct SorterFile SorterFile
; /* Temporary file object wrapper */
166 typedef struct SorterList SorterList
; /* In-memory list of records */
167 typedef struct IncrMerger IncrMerger
; /* Read & merge multiple PMAs */
170 ** A container for a temp file handle and the current amount of data
171 ** stored in the file.
174 sqlite3_file
*pFd
; /* File handle */
175 i64 iEof
; /* Bytes of data stored in pFd */
179 ** An in-memory list of objects to be sorted.
181 ** If aMemory==0 then each object is allocated separately and the objects
182 ** are connected using SorterRecord.u.pNext. If aMemory!=0 then all objects
183 ** are stored in the aMemory[] bulk memory, one right after the other, and
184 ** are connected using SorterRecord.u.iNext.
187 SorterRecord
*pList
; /* Linked list of records */
188 u8
*aMemory
; /* If non-NULL, bulk memory to hold pList */
189 i64 szPMA
; /* Size of pList as PMA in bytes */
193 ** The MergeEngine object is used to combine two or more smaller PMAs into
194 ** one big PMA using a merge operation. Separate PMAs all need to be
195 ** combined into one big PMA in order to be able to step through the sorted
198 ** The aReadr[] array contains a PmaReader object for each of the PMAs being
199 ** merged. An aReadr[] object either points to a valid key or else is at EOF.
200 ** ("EOF" means "End Of File". When aReadr[] is at EOF there is no more data.)
201 ** For the purposes of the paragraphs below, we assume that the array is
202 ** actually N elements in size, where N is the smallest power of 2 greater
203 ** to or equal to the number of PMAs being merged. The extra aReadr[] elements
204 ** are treated as if they are empty (always at EOF).
206 ** The aTree[] array is also N elements in size. The value of N is stored in
207 ** the MergeEngine.nTree variable.
209 ** The final (N/2) elements of aTree[] contain the results of comparing
210 ** pairs of PMA keys together. Element i contains the result of
211 ** comparing aReadr[2*i-N] and aReadr[2*i-N+1]. Whichever key is smaller, the
212 ** aTree element is set to the index of it.
214 ** For the purposes of this comparison, EOF is considered greater than any
215 ** other key value. If the keys are equal (only possible with two EOF
216 ** values), it doesn't matter which index is stored.
218 ** The (N/4) elements of aTree[] that precede the final (N/2) described
219 ** above contains the index of the smallest of each block of 4 PmaReaders
220 ** And so on. So that aTree[1] contains the index of the PmaReader that
221 ** currently points to the smallest key value. aTree[0] is unused.
225 ** aReadr[0] -> Banana
226 ** aReadr[1] -> Feijoa
227 ** aReadr[2] -> Elderberry
228 ** aReadr[3] -> Currant
229 ** aReadr[4] -> Grapefruit
230 ** aReadr[5] -> Apple
231 ** aReadr[6] -> Durian
234 ** aTree[] = { X, 5 0, 5 0, 3, 5, 6 }
236 ** The current element is "Apple" (the value of the key indicated by
237 ** PmaReader 5). When the Next() operation is invoked, PmaReader 5 will
238 ** be advanced to the next key in its segment. Say the next key is
241 ** aReadr[5] -> Eggplant
243 ** The contents of aTree[] are updated first by comparing the new PmaReader
244 ** 5 key to the current key of PmaReader 4 (still "Grapefruit"). The PmaReader
245 ** 5 value is still smaller, so aTree[6] is set to 5. And so on up the tree.
246 ** The value of PmaReader 6 - "Durian" - is now smaller than that of PmaReader
247 ** 5, so aTree[3] is set to 6. Key 0 is smaller than key 6 (Banana<Durian),
248 ** so the value written into element 1 of the array is 0. As follows:
250 ** aTree[] = { X, 0 0, 6 0, 3, 5, 6 }
252 ** In other words, each time we advance to the next sorter element, log2(N)
253 ** key comparison operations are required, where N is the number of segments
254 ** being merged (rounded up to the next power of 2).
257 int nTree
; /* Used size of aTree/aReadr (power of 2) */
258 SortSubtask
*pTask
; /* Used by this thread only */
259 int *aTree
; /* Current state of incremental merge */
260 PmaReader
*aReadr
; /* Array of PmaReaders to merge data from */
264 ** This object represents a single thread of control in a sort operation.
265 ** Exactly VdbeSorter.nTask instances of this object are allocated
266 ** as part of each VdbeSorter object. Instances are never allocated any
267 ** other way. VdbeSorter.nTask is set to the number of worker threads allowed
268 ** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread). Thus for
269 ** single-threaded operation, there is exactly one instance of this object
270 ** and for multi-threaded operation there are two or more instances.
272 ** Essentially, this structure contains all those fields of the VdbeSorter
273 ** structure for which each thread requires a separate instance. For example,
274 ** each thread requeries its own UnpackedRecord object to unpack records in
275 ** as part of comparison operations.
277 ** Before a background thread is launched, variable bDone is set to 0. Then,
278 ** right before it exits, the thread itself sets bDone to 1. This is used for
281 ** 1. When flushing the contents of memory to a level-0 PMA on disk, to
282 ** attempt to select a SortSubtask for which there is not already an
283 ** active background thread (since doing so causes the main thread
284 ** to block until it finishes).
286 ** 2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call
287 ** to sqlite3ThreadJoin() is likely to block. Cases that are likely to
288 ** block provoke debugging output.
290 ** In both cases, the effects of the main thread seeing (bDone==0) even
291 ** after the thread has finished are not dire. So we don't worry about
292 ** memory barriers and such here.
294 typedef int (*SorterCompare
)(SortSubtask
*,int*,const void*,int,const void*,int);
296 SQLiteThread
*pThread
; /* Background thread, if any */
297 int bDone
; /* Set if thread is finished but not joined */
298 int nPMA
; /* Number of PMAs currently in file */
299 VdbeSorter
*pSorter
; /* Sorter that owns this sub-task */
300 UnpackedRecord
*pUnpacked
; /* Space to unpack a record */
301 SorterList list
; /* List for thread to write to a PMA */
302 SorterCompare xCompare
; /* Compare function to use */
303 SorterFile file
; /* Temp file for level-0 PMAs */
304 SorterFile file2
; /* Space for other PMAs */
309 ** Main sorter structure. A single instance of this is allocated for each
310 ** sorter cursor created by the VDBE.
313 ** As records are added to the sorter by calls to sqlite3VdbeSorterWrite(),
314 ** this variable is updated so as to be set to the size on disk of the
315 ** largest record in the sorter.
318 int mnPmaSize
; /* Minimum PMA size, in bytes */
319 int mxPmaSize
; /* Maximum PMA size, in bytes. 0==no limit */
320 int mxKeysize
; /* Largest serialized key seen so far */
321 int pgsz
; /* Main database page size */
322 PmaReader
*pReader
; /* Readr data from here after Rewind() */
323 MergeEngine
*pMerger
; /* Or here, if bUseThreads==0 */
324 sqlite3
*db
; /* Database connection */
325 KeyInfo
*pKeyInfo
; /* How to compare records */
326 UnpackedRecord
*pUnpacked
; /* Used by VdbeSorterCompare() */
327 SorterList list
; /* List of in-memory records */
328 int iMemory
; /* Offset of free space in list.aMemory */
329 int nMemory
; /* Size of list.aMemory allocation in bytes */
330 u8 bUsePMA
; /* True if one or more PMAs created */
331 u8 bUseThreads
; /* True to use background threads */
332 u8 iPrev
; /* Previous thread used to flush PMA */
333 u8 nTask
; /* Size of aTask[] array */
335 SortSubtask aTask
[1]; /* One or more subtasks */
338 #define SORTER_TYPE_INTEGER 0x01
339 #define SORTER_TYPE_TEXT 0x02
342 ** An instance of the following object is used to read records out of a
343 ** PMA, in sorted order. The next key to be read is cached in nKey/aKey.
344 ** aKey might point into aMap or into aBuffer. If neither of those locations
345 ** contain a contiguous representation of the key, then aAlloc is allocated
346 ** and the key is copied into aAlloc and aKey is made to point to aAlloc.
351 i64 iReadOff
; /* Current read offset */
352 i64 iEof
; /* 1 byte past EOF for this PmaReader */
353 int nAlloc
; /* Bytes of space at aAlloc */
354 int nKey
; /* Number of bytes in key */
355 sqlite3_file
*pFd
; /* File handle we are reading from */
356 u8
*aAlloc
; /* Space for aKey if aBuffer and pMap wont work */
357 u8
*aKey
; /* Pointer to current key */
358 u8
*aBuffer
; /* Current read buffer */
359 int nBuffer
; /* Size of read buffer in bytes */
360 u8
*aMap
; /* Pointer to mapping of entire file */
361 IncrMerger
*pIncr
; /* Incremental merger */
365 ** Normally, a PmaReader object iterates through an existing PMA stored
366 ** within a temp file. However, if the PmaReader.pIncr variable points to
367 ** an object of the following type, it may be used to iterate/merge through
368 ** multiple PMAs simultaneously.
370 ** There are two types of IncrMerger object - single (bUseThread==0) and
371 ** multi-threaded (bUseThread==1).
373 ** A multi-threaded IncrMerger object uses two temporary files - aFile[0]
374 ** and aFile[1]. Neither file is allowed to grow to more than mxSz bytes in
375 ** size. When the IncrMerger is initialized, it reads enough data from
376 ** pMerger to populate aFile[0]. It then sets variables within the
377 ** corresponding PmaReader object to read from that file and kicks off
378 ** a background thread to populate aFile[1] with the next mxSz bytes of
379 ** sorted record data from pMerger.
381 ** When the PmaReader reaches the end of aFile[0], it blocks until the
382 ** background thread has finished populating aFile[1]. It then exchanges
383 ** the contents of the aFile[0] and aFile[1] variables within this structure,
384 ** sets the PmaReader fields to read from the new aFile[0] and kicks off
385 ** another background thread to populate the new aFile[1]. And so on, until
386 ** the contents of pMerger are exhausted.
388 ** A single-threaded IncrMerger does not open any temporary files of its
389 ** own. Instead, it has exclusive access to mxSz bytes of space beginning
390 ** at offset iStartOff of file pTask->file2. And instead of using a
391 ** background thread to prepare data for the PmaReader, with a single
392 ** threaded IncrMerger the allocate part of pTask->file2 is "refilled" with
393 ** keys from pMerger by the calling thread whenever the PmaReader runs out
397 SortSubtask
*pTask
; /* Task that owns this merger */
398 MergeEngine
*pMerger
; /* Merge engine thread reads data from */
399 i64 iStartOff
; /* Offset to start writing file at */
400 int mxSz
; /* Maximum bytes of data to store */
401 int bEof
; /* Set to true when merge is finished */
402 int bUseThread
; /* True to use a bg thread for this object */
403 SorterFile aFile
[2]; /* aFile[0] for reading, [1] for writing */
407 ** An instance of this object is used for writing a PMA.
409 ** The PMA is written one record at a time. Each record is of an arbitrary
410 ** size. But I/O is more efficient if it occurs in page-sized blocks where
411 ** each block is aligned on a page boundary. This object caches writes to
412 ** the PMA so that aligned, page-size blocks are written.
415 int eFWErr
; /* Non-zero if in an error state */
416 u8
*aBuffer
; /* Pointer to write buffer */
417 int nBuffer
; /* Size of write buffer in bytes */
418 int iBufStart
; /* First byte of buffer to write */
419 int iBufEnd
; /* Last byte of buffer to write */
420 i64 iWriteOff
; /* Offset of start of buffer in file */
421 sqlite3_file
*pFd
; /* File handle to write to */
425 ** This object is the header on a single record while that record is being
426 ** held in memory and prior to being written out as part of a PMA.
428 ** How the linked list is connected depends on how memory is being managed
429 ** by this module. If using a separate allocation for each in-memory record
430 ** (VdbeSorter.list.aMemory==0), then the list is always connected using the
431 ** SorterRecord.u.pNext pointers.
433 ** Or, if using the single large allocation method (VdbeSorter.list.aMemory!=0),
434 ** then while records are being accumulated the list is linked using the
435 ** SorterRecord.u.iNext offset. This is because the aMemory[] array may
436 ** be sqlite3Realloc()ed while records are being accumulated. Once the VM
437 ** has finished passing records to the sorter, or when the in-memory buffer
438 ** is full, the list is sorted. As part of the sorting process, it is
439 ** converted to use the SorterRecord.u.pNext pointers. See function
440 ** vdbeSorterSort() for details.
442 struct SorterRecord
{
443 int nVal
; /* Size of the record in bytes */
445 SorterRecord
*pNext
; /* Pointer to next record in list */
446 int iNext
; /* Offset within aMemory of next record */
448 /* The data for the record immediately follows this header */
451 /* Return a pointer to the buffer containing the record data for SorterRecord
452 ** object p. Should be used as if:
454 ** void *SRVAL(SorterRecord *p) { return (void*)&p[1]; }
456 #define SRVAL(p) ((void*)((SorterRecord*)(p) + 1))
459 /* Maximum number of PMAs that a single MergeEngine can merge */
460 #define SORTER_MAX_MERGE_COUNT 16
462 static int vdbeIncrSwap(IncrMerger
*);
463 static void vdbeIncrFree(IncrMerger
*);
466 ** Free all memory belonging to the PmaReader object passed as the
467 ** argument. All structure fields are set to zero before returning.
469 static void vdbePmaReaderClear(PmaReader
*pReadr
){
470 sqlite3_free(pReadr
->aAlloc
);
471 sqlite3_free(pReadr
->aBuffer
);
472 if( pReadr
->aMap
) sqlite3OsUnfetch(pReadr
->pFd
, 0, pReadr
->aMap
);
473 vdbeIncrFree(pReadr
->pIncr
);
474 memset(pReadr
, 0, sizeof(PmaReader
));
478 ** Read the next nByte bytes of data from the PMA p.
479 ** If successful, set *ppOut to point to a buffer containing the data
480 ** and return SQLITE_OK. Otherwise, if an error occurs, return an SQLite
483 ** The buffer returned in *ppOut is only valid until the
484 ** next call to this function.
486 static int vdbePmaReadBlob(
487 PmaReader
*p
, /* PmaReader from which to take the blob */
488 int nByte
, /* Bytes of data to read */
489 u8
**ppOut
/* OUT: Pointer to buffer containing data */
491 int iBuf
; /* Offset within buffer to read from */
492 int nAvail
; /* Bytes of data available in buffer */
495 *ppOut
= &p
->aMap
[p
->iReadOff
];
496 p
->iReadOff
+= nByte
;
500 assert( p
->aBuffer
);
502 /* If there is no more data to be read from the buffer, read the next
503 ** p->nBuffer bytes of data from the file into it. Or, if there are less
504 ** than p->nBuffer bytes remaining in the PMA, read all remaining data. */
505 iBuf
= p
->iReadOff
% p
->nBuffer
;
507 int nRead
; /* Bytes to read from disk */
508 int rc
; /* sqlite3OsRead() return code */
510 /* Determine how many bytes of data to read. */
511 if( (p
->iEof
- p
->iReadOff
) > (i64
)p
->nBuffer
){
514 nRead
= (int)(p
->iEof
- p
->iReadOff
);
518 /* Readr data from the file. Return early if an error occurs. */
519 rc
= sqlite3OsRead(p
->pFd
, p
->aBuffer
, nRead
, p
->iReadOff
);
520 assert( rc
!=SQLITE_IOERR_SHORT_READ
);
521 if( rc
!=SQLITE_OK
) return rc
;
523 nAvail
= p
->nBuffer
- iBuf
;
526 /* The requested data is available in the in-memory buffer. In this
527 ** case there is no need to make a copy of the data, just return a
528 ** pointer into the buffer to the caller. */
529 *ppOut
= &p
->aBuffer
[iBuf
];
530 p
->iReadOff
+= nByte
;
532 /* The requested data is not all available in the in-memory buffer.
533 ** In this case, allocate space at p->aAlloc[] to copy the requested
534 ** range into. Then return a copy of pointer p->aAlloc to the caller. */
535 int nRem
; /* Bytes remaining to copy */
537 /* Extend the p->aAlloc[] allocation if required. */
538 if( p
->nAlloc
<nByte
){
540 sqlite3_int64 nNew
= MAX(128, 2*(sqlite3_int64
)p
->nAlloc
);
541 while( nByte
>nNew
) nNew
= nNew
*2;
542 aNew
= sqlite3Realloc(p
->aAlloc
, nNew
);
543 if( !aNew
) return SQLITE_NOMEM_BKPT
;
548 /* Copy as much data as is available in the buffer into the start of
550 memcpy(p
->aAlloc
, &p
->aBuffer
[iBuf
], nAvail
);
551 p
->iReadOff
+= nAvail
;
552 nRem
= nByte
- nAvail
;
554 /* The following loop copies up to p->nBuffer bytes per iteration into
555 ** the p->aAlloc[] buffer. */
557 int rc
; /* vdbePmaReadBlob() return code */
558 int nCopy
; /* Number of bytes to copy */
559 u8
*aNext
= 0; /* Pointer to buffer to copy data from */
562 if( nRem
>p
->nBuffer
) nCopy
= p
->nBuffer
;
563 rc
= vdbePmaReadBlob(p
, nCopy
, &aNext
);
564 if( rc
!=SQLITE_OK
) return rc
;
565 assert( aNext
!=p
->aAlloc
);
567 memcpy(&p
->aAlloc
[nByte
- nRem
], aNext
, nCopy
);
578 ** Read a varint from the stream of data accessed by p. Set *pnOut to
581 static int vdbePmaReadVarint(PmaReader
*p
, u64
*pnOut
){
585 p
->iReadOff
+= sqlite3GetVarint(&p
->aMap
[p
->iReadOff
], pnOut
);
587 iBuf
= p
->iReadOff
% p
->nBuffer
;
588 if( iBuf
&& (p
->nBuffer
-iBuf
)>=9 ){
589 p
->iReadOff
+= sqlite3GetVarint(&p
->aBuffer
[iBuf
], pnOut
);
594 rc
= vdbePmaReadBlob(p
, 1, &a
);
596 aVarint
[(i
++)&0xf] = a
[0];
597 }while( (a
[0]&0x80)!=0 );
598 sqlite3GetVarint(aVarint
, pnOut
);
606 ** Attempt to memory map file pFile. If successful, set *pp to point to the
607 ** new mapping and return SQLITE_OK. If the mapping is not attempted
608 ** (because the file is too large or the VFS layer is configured not to use
609 ** mmap), return SQLITE_OK and set *pp to NULL.
611 ** Or, if an error occurs, return an SQLite error code. The final value of
612 ** *pp is undefined in this case.
614 static int vdbeSorterMapFile(SortSubtask
*pTask
, SorterFile
*pFile
, u8
**pp
){
616 if( pFile
->iEof
<=(i64
)(pTask
->pSorter
->db
->nMaxSorterMmap
) ){
617 sqlite3_file
*pFd
= pFile
->pFd
;
618 if( pFd
->pMethods
->iVersion
>=3 ){
619 rc
= sqlite3OsFetch(pFd
, 0, (int)pFile
->iEof
, (void**)pp
);
620 testcase( rc
!=SQLITE_OK
);
627 ** Attach PmaReader pReadr to file pFile (if it is not already attached to
628 ** that file) and seek it to offset iOff within the file. Return SQLITE_OK
629 ** if successful, or an SQLite error code if an error occurs.
631 static int vdbePmaReaderSeek(
632 SortSubtask
*pTask
, /* Task context */
633 PmaReader
*pReadr
, /* Reader whose cursor is to be moved */
634 SorterFile
*pFile
, /* Sorter file to read from */
635 i64 iOff
/* Offset in pFile */
639 assert( pReadr
->pIncr
==0 || pReadr
->pIncr
->bEof
==0 );
641 if( sqlite3FaultSim(201) ) return SQLITE_IOERR_READ
;
643 sqlite3OsUnfetch(pReadr
->pFd
, 0, pReadr
->aMap
);
646 pReadr
->iReadOff
= iOff
;
647 pReadr
->iEof
= pFile
->iEof
;
648 pReadr
->pFd
= pFile
->pFd
;
650 rc
= vdbeSorterMapFile(pTask
, pFile
, &pReadr
->aMap
);
651 if( rc
==SQLITE_OK
&& pReadr
->aMap
==0 ){
652 int pgsz
= pTask
->pSorter
->pgsz
;
653 int iBuf
= pReadr
->iReadOff
% pgsz
;
654 if( pReadr
->aBuffer
==0 ){
655 pReadr
->aBuffer
= (u8
*)sqlite3Malloc(pgsz
);
656 if( pReadr
->aBuffer
==0 ) rc
= SQLITE_NOMEM_BKPT
;
657 pReadr
->nBuffer
= pgsz
;
659 if( rc
==SQLITE_OK
&& iBuf
){
660 int nRead
= pgsz
- iBuf
;
661 if( (pReadr
->iReadOff
+ nRead
) > pReadr
->iEof
){
662 nRead
= (int)(pReadr
->iEof
- pReadr
->iReadOff
);
665 pReadr
->pFd
, &pReadr
->aBuffer
[iBuf
], nRead
, pReadr
->iReadOff
667 testcase( rc
!=SQLITE_OK
);
675 ** Advance PmaReader pReadr to the next key in its PMA. Return SQLITE_OK if
676 ** no error occurs, or an SQLite error code if one does.
678 static int vdbePmaReaderNext(PmaReader
*pReadr
){
679 int rc
= SQLITE_OK
; /* Return Code */
680 u64 nRec
= 0; /* Size of record in bytes */
683 if( pReadr
->iReadOff
>=pReadr
->iEof
){
684 IncrMerger
*pIncr
= pReadr
->pIncr
;
687 rc
= vdbeIncrSwap(pIncr
);
688 if( rc
==SQLITE_OK
&& pIncr
->bEof
==0 ){
689 rc
= vdbePmaReaderSeek(
690 pIncr
->pTask
, pReadr
, &pIncr
->aFile
[0], pIncr
->iStartOff
697 /* This is an EOF condition */
698 vdbePmaReaderClear(pReadr
);
699 testcase( rc
!=SQLITE_OK
);
705 rc
= vdbePmaReadVarint(pReadr
, &nRec
);
708 pReadr
->nKey
= (int)nRec
;
709 rc
= vdbePmaReadBlob(pReadr
, (int)nRec
, &pReadr
->aKey
);
710 testcase( rc
!=SQLITE_OK
);
717 ** Initialize PmaReader pReadr to scan through the PMA stored in file pFile
718 ** starting at offset iStart and ending at offset iEof-1. This function
719 ** leaves the PmaReader pointing to the first key in the PMA (or EOF if the
722 ** If the pnByte parameter is NULL, then it is assumed that the file
723 ** contains a single PMA, and that that PMA omits the initial length varint.
725 static int vdbePmaReaderInit(
726 SortSubtask
*pTask
, /* Task context */
727 SorterFile
*pFile
, /* Sorter file to read from */
728 i64 iStart
, /* Start offset in pFile */
729 PmaReader
*pReadr
, /* PmaReader to populate */
730 i64
*pnByte
/* IN/OUT: Increment this value by PMA size */
734 assert( pFile
->iEof
>iStart
);
735 assert( pReadr
->aAlloc
==0 && pReadr
->nAlloc
==0 );
736 assert( pReadr
->aBuffer
==0 );
737 assert( pReadr
->aMap
==0 );
739 rc
= vdbePmaReaderSeek(pTask
, pReadr
, pFile
, iStart
);
741 u64 nByte
= 0; /* Size of PMA in bytes */
742 rc
= vdbePmaReadVarint(pReadr
, &nByte
);
743 pReadr
->iEof
= pReadr
->iReadOff
+ nByte
;
748 rc
= vdbePmaReaderNext(pReadr
);
754 ** A version of vdbeSorterCompare() that assumes that it has already been
755 ** determined that the first field of key1 is equal to the first field of
758 static int vdbeSorterCompareTail(
759 SortSubtask
*pTask
, /* Subtask context (for pKeyInfo) */
760 int *pbKey2Cached
, /* True if pTask->pUnpacked is pKey2 */
761 const void *pKey1
, int nKey1
, /* Left side of comparison */
762 const void *pKey2
, int nKey2
/* Right side of comparison */
764 UnpackedRecord
*r2
= pTask
->pUnpacked
;
765 if( *pbKey2Cached
==0 ){
766 sqlite3VdbeRecordUnpack(pTask
->pSorter
->pKeyInfo
, nKey2
, pKey2
, r2
);
769 return sqlite3VdbeRecordCompareWithSkip(nKey1
, pKey1
, r2
, 1);
773 ** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2,
774 ** size nKey2 bytes). Use (pTask->pKeyInfo) for the collation sequences
775 ** used by the comparison. Return the result of the comparison.
777 ** If IN/OUT parameter *pbKey2Cached is true when this function is called,
778 ** it is assumed that (pTask->pUnpacked) contains the unpacked version
779 ** of key2. If it is false, (pTask->pUnpacked) is populated with the unpacked
780 ** version of key2 and *pbKey2Cached set to true before returning.
782 ** If an OOM error is encountered, (pTask->pUnpacked->error_rc) is set
785 static int vdbeSorterCompare(
786 SortSubtask
*pTask
, /* Subtask context (for pKeyInfo) */
787 int *pbKey2Cached
, /* True if pTask->pUnpacked is pKey2 */
788 const void *pKey1
, int nKey1
, /* Left side of comparison */
789 const void *pKey2
, int nKey2
/* Right side of comparison */
791 UnpackedRecord
*r2
= pTask
->pUnpacked
;
792 if( !*pbKey2Cached
){
793 sqlite3VdbeRecordUnpack(pTask
->pSorter
->pKeyInfo
, nKey2
, pKey2
, r2
);
796 return sqlite3VdbeRecordCompare(nKey1
, pKey1
, r2
);
800 ** A specially optimized version of vdbeSorterCompare() that assumes that
801 ** the first field of each key is a TEXT value and that the collation
802 ** sequence to compare them with is BINARY.
804 static int vdbeSorterCompareText(
805 SortSubtask
*pTask
, /* Subtask context (for pKeyInfo) */
806 int *pbKey2Cached
, /* True if pTask->pUnpacked is pKey2 */
807 const void *pKey1
, int nKey1
, /* Left side of comparison */
808 const void *pKey2
, int nKey2
/* Right side of comparison */
810 const u8
* const p1
= (const u8
* const)pKey1
;
811 const u8
* const p2
= (const u8
* const)pKey2
;
812 const u8
* const v1
= &p1
[ p1
[0] ]; /* Pointer to value 1 */
813 const u8
* const v2
= &p2
[ p2
[0] ]; /* Pointer to value 2 */
819 getVarint32NR(&p1
[1], n1
);
820 getVarint32NR(&p2
[1], n2
);
821 res
= memcmp(v1
, v2
, (MIN(n1
, n2
) - 13)/2);
827 if( pTask
->pSorter
->pKeyInfo
->nKeyField
>1 ){
828 res
= vdbeSorterCompareTail(
829 pTask
, pbKey2Cached
, pKey1
, nKey1
, pKey2
, nKey2
833 assert( !(pTask
->pSorter
->pKeyInfo
->aSortFlags
[0]&KEYINFO_ORDER_BIGNULL
) );
834 if( pTask
->pSorter
->pKeyInfo
->aSortFlags
[0] ){
843 ** A specially optimized version of vdbeSorterCompare() that assumes that
844 ** the first field of each key is an INTEGER value.
846 static int vdbeSorterCompareInt(
847 SortSubtask
*pTask
, /* Subtask context (for pKeyInfo) */
848 int *pbKey2Cached
, /* True if pTask->pUnpacked is pKey2 */
849 const void *pKey1
, int nKey1
, /* Left side of comparison */
850 const void *pKey2
, int nKey2
/* Right side of comparison */
852 const u8
* const p1
= (const u8
* const)pKey1
;
853 const u8
* const p2
= (const u8
* const)pKey2
;
854 const int s1
= p1
[1]; /* Left hand serial type */
855 const int s2
= p2
[1]; /* Right hand serial type */
856 const u8
* const v1
= &p1
[ p1
[0] ]; /* Pointer to value 1 */
857 const u8
* const v2
= &p2
[ p2
[0] ]; /* Pointer to value 2 */
858 int res
; /* Return value */
860 assert( (s1
>0 && s1
<7) || s1
==8 || s1
==9 );
861 assert( (s2
>0 && s2
<7) || s2
==8 || s2
==9 );
864 /* The two values have the same sign. Compare using memcmp(). */
865 static const u8 aLen
[] = {0, 1, 2, 3, 4, 6, 8, 0, 0, 0 };
866 const u8 n
= aLen
[s1
];
870 if( (res
= v1
[i
] - v2
[i
])!=0 ){
871 if( ((v1
[0] ^ v2
[0]) & 0x80)!=0 ){
872 res
= v1
[0] & 0x80 ? -1 : +1;
877 }else if( s1
>7 && s2
>7 ){
890 if( *v1
& 0x80 ) res
= -1;
892 if( *v2
& 0x80 ) res
= +1;
897 if( pTask
->pSorter
->pKeyInfo
->nKeyField
>1 ){
898 res
= vdbeSorterCompareTail(
899 pTask
, pbKey2Cached
, pKey1
, nKey1
, pKey2
, nKey2
902 }else if( pTask
->pSorter
->pKeyInfo
->aSortFlags
[0] ){
903 assert( !(pTask
->pSorter
->pKeyInfo
->aSortFlags
[0]&KEYINFO_ORDER_BIGNULL
) );
911 ** Initialize the temporary index cursor just opened as a sorter cursor.
913 ** Usually, the sorter module uses the value of (pCsr->pKeyInfo->nKeyField)
914 ** to determine the number of fields that should be compared from the
915 ** records being sorted. However, if the value passed as argument nField
916 ** is non-zero and the sorter is able to guarantee a stable sort, nField
917 ** is used instead. This is used when sorting records for a CREATE INDEX
918 ** statement. In this case, keys are always delivered to the sorter in
919 ** order of the primary key, which happens to be make up the final part
920 ** of the records being sorted. So if the sort is stable, there is never
921 ** any reason to compare PK fields and they can be ignored for a small
922 ** performance boost.
924 ** The sorter can guarantee a stable sort when running in single-threaded
925 ** mode, but not in multi-threaded mode.
927 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
929 int sqlite3VdbeSorterInit(
930 sqlite3
*db
, /* Database connection (for malloc()) */
931 int nField
, /* Number of key fields in each record */
932 VdbeCursor
*pCsr
/* Cursor that holds the new sorter */
934 int pgsz
; /* Page size of main database */
935 int i
; /* Used to iterate through aTask[] */
936 VdbeSorter
*pSorter
; /* The new sorter */
937 KeyInfo
*pKeyInfo
; /* Copy of pCsr->pKeyInfo with db==0 */
938 int szKeyInfo
; /* Size of pCsr->pKeyInfo in bytes */
939 int sz
; /* Size of pSorter in bytes */
941 #if SQLITE_MAX_WORKER_THREADS==0
947 /* Initialize the upper limit on the number of worker threads */
948 #if SQLITE_MAX_WORKER_THREADS>0
949 if( sqlite3TempInMemory(db
) || sqlite3GlobalConfig
.bCoreMutex
==0 ){
952 nWorker
= db
->aLimit
[SQLITE_LIMIT_WORKER_THREADS
];
956 /* Do not allow the total number of threads (main thread + all workers)
957 ** to exceed the maximum merge count */
958 #if SQLITE_MAX_WORKER_THREADS>=SORTER_MAX_MERGE_COUNT
959 if( nWorker
>=SORTER_MAX_MERGE_COUNT
){
960 nWorker
= SORTER_MAX_MERGE_COUNT
-1;
964 assert( pCsr
->pKeyInfo
);
965 assert( !pCsr
->isEphemeral
);
966 assert( pCsr
->eCurType
==CURTYPE_SORTER
);
967 szKeyInfo
= sizeof(KeyInfo
) + (pCsr
->pKeyInfo
->nKeyField
-1)*sizeof(CollSeq
*);
968 sz
= sizeof(VdbeSorter
) + nWorker
* sizeof(SortSubtask
);
970 pSorter
= (VdbeSorter
*)sqlite3DbMallocZero(db
, sz
+ szKeyInfo
);
971 pCsr
->uc
.pSorter
= pSorter
;
973 rc
= SQLITE_NOMEM_BKPT
;
975 Btree
*pBt
= db
->aDb
[0].pBt
;
976 pSorter
->pKeyInfo
= pKeyInfo
= (KeyInfo
*)((u8
*)pSorter
+ sz
);
977 memcpy(pKeyInfo
, pCsr
->pKeyInfo
, szKeyInfo
);
979 if( nField
&& nWorker
==0 ){
980 pKeyInfo
->nKeyField
= nField
;
982 sqlite3BtreeEnter(pBt
);
983 pSorter
->pgsz
= pgsz
= sqlite3BtreeGetPageSize(pBt
);
984 sqlite3BtreeLeave(pBt
);
985 pSorter
->nTask
= nWorker
+ 1;
986 pSorter
->iPrev
= (u8
)(nWorker
- 1);
987 pSorter
->bUseThreads
= (pSorter
->nTask
>1);
989 for(i
=0; i
<pSorter
->nTask
; i
++){
990 SortSubtask
*pTask
= &pSorter
->aTask
[i
];
991 pTask
->pSorter
= pSorter
;
994 if( !sqlite3TempInMemory(db
) ){
995 i64 mxCache
; /* Cache size in bytes*/
996 u32 szPma
= sqlite3GlobalConfig
.szPma
;
997 pSorter
->mnPmaSize
= szPma
* pgsz
;
999 mxCache
= db
->aDb
[0].pSchema
->cache_size
;
1001 /* A negative cache-size value C indicates that the cache is abs(C)
1003 mxCache
= mxCache
* -1024;
1005 mxCache
= mxCache
* pgsz
;
1007 mxCache
= MIN(mxCache
, SQLITE_MAX_PMASZ
);
1008 pSorter
->mxPmaSize
= MAX(pSorter
->mnPmaSize
, (int)mxCache
);
1010 /* Avoid large memory allocations if the application has requested
1011 ** SQLITE_CONFIG_SMALL_MALLOC. */
1012 if( sqlite3GlobalConfig
.bSmallMalloc
==0 ){
1013 assert( pSorter
->iMemory
==0 );
1014 pSorter
->nMemory
= pgsz
;
1015 pSorter
->list
.aMemory
= (u8
*)sqlite3Malloc(pgsz
);
1016 if( !pSorter
->list
.aMemory
) rc
= SQLITE_NOMEM_BKPT
;
1020 if( pKeyInfo
->nAllField
<13
1021 && (pKeyInfo
->aColl
[0]==0 || pKeyInfo
->aColl
[0]==db
->pDfltColl
)
1022 && (pKeyInfo
->aSortFlags
[0] & KEYINFO_ORDER_BIGNULL
)==0
1024 pSorter
->typeMask
= SORTER_TYPE_INTEGER
| SORTER_TYPE_TEXT
;
1030 #undef nWorker /* Defined at the top of this function */
1033 ** Free the list of sorted records starting at pRecord.
1035 static void vdbeSorterRecordFree(sqlite3
*db
, SorterRecord
*pRecord
){
1037 SorterRecord
*pNext
;
1038 for(p
=pRecord
; p
; p
=pNext
){
1040 sqlite3DbFree(db
, p
);
1045 ** Free all resources owned by the object indicated by argument pTask. All
1046 ** fields of *pTask are zeroed before returning.
1048 static void vdbeSortSubtaskCleanup(sqlite3
*db
, SortSubtask
*pTask
){
1049 sqlite3DbFree(db
, pTask
->pUnpacked
);
1050 #if SQLITE_MAX_WORKER_THREADS>0
1051 /* pTask->list.aMemory can only be non-zero if it was handed memory
1052 ** from the main thread. That only occurs SQLITE_MAX_WORKER_THREADS>0 */
1053 if( pTask
->list
.aMemory
){
1054 sqlite3_free(pTask
->list
.aMemory
);
1058 assert( pTask
->list
.aMemory
==0 );
1059 vdbeSorterRecordFree(0, pTask
->list
.pList
);
1061 if( pTask
->file
.pFd
){
1062 sqlite3OsCloseFree(pTask
->file
.pFd
);
1064 if( pTask
->file2
.pFd
){
1065 sqlite3OsCloseFree(pTask
->file2
.pFd
);
1067 memset(pTask
, 0, sizeof(SortSubtask
));
1070 #ifdef SQLITE_DEBUG_SORTER_THREADS
1071 static void vdbeSorterWorkDebug(SortSubtask
*pTask
, const char *zEvent
){
1073 int iTask
= (pTask
- pTask
->pSorter
->aTask
);
1074 sqlite3OsCurrentTimeInt64(pTask
->pSorter
->db
->pVfs
, &t
);
1075 fprintf(stderr
, "%lld:%d %s\n", t
, iTask
, zEvent
);
1077 static void vdbeSorterRewindDebug(const char *zEvent
){
1079 sqlite3_vfs
*pVfs
= sqlite3_vfs_find(0);
1080 if( ALWAYS(pVfs
) ) sqlite3OsCurrentTimeInt64(pVfs
, &t
);
1081 fprintf(stderr
, "%lld:X %s\n", t
, zEvent
);
1083 static void vdbeSorterPopulateDebug(
1088 int iTask
= (pTask
- pTask
->pSorter
->aTask
);
1089 sqlite3OsCurrentTimeInt64(pTask
->pSorter
->db
->pVfs
, &t
);
1090 fprintf(stderr
, "%lld:bg%d %s\n", t
, iTask
, zEvent
);
1092 static void vdbeSorterBlockDebug(
1099 sqlite3OsCurrentTimeInt64(pTask
->pSorter
->db
->pVfs
, &t
);
1100 fprintf(stderr
, "%lld:main %s\n", t
, zEvent
);
1104 # define vdbeSorterWorkDebug(x,y)
1105 # define vdbeSorterRewindDebug(y)
1106 # define vdbeSorterPopulateDebug(x,y)
1107 # define vdbeSorterBlockDebug(x,y,z)
1110 #if SQLITE_MAX_WORKER_THREADS>0
1112 ** Join thread pTask->thread.
1114 static int vdbeSorterJoinThread(SortSubtask
*pTask
){
1116 if( pTask
->pThread
){
1117 #ifdef SQLITE_DEBUG_SORTER_THREADS
1118 int bDone
= pTask
->bDone
;
1120 void *pRet
= SQLITE_INT_TO_PTR(SQLITE_ERROR
);
1121 vdbeSorterBlockDebug(pTask
, !bDone
, "enter");
1122 (void)sqlite3ThreadJoin(pTask
->pThread
, &pRet
);
1123 vdbeSorterBlockDebug(pTask
, !bDone
, "exit");
1124 rc
= SQLITE_PTR_TO_INT(pRet
);
1125 assert( pTask
->bDone
==1 );
1133 ** Launch a background thread to run xTask(pIn).
1135 static int vdbeSorterCreateThread(
1136 SortSubtask
*pTask
, /* Thread will use this task object */
1137 void *(*xTask
)(void*), /* Routine to run in a separate thread */
1138 void *pIn
/* Argument passed into xTask() */
1140 assert( pTask
->pThread
==0 && pTask
->bDone
==0 );
1141 return sqlite3ThreadCreate(&pTask
->pThread
, xTask
, pIn
);
1145 ** Join all outstanding threads launched by SorterWrite() to create
1148 static int vdbeSorterJoinAll(VdbeSorter
*pSorter
, int rcin
){
1152 /* This function is always called by the main user thread.
1154 ** If this function is being called after SorterRewind() has been called,
1155 ** it is possible that thread pSorter->aTask[pSorter->nTask-1].pThread
1156 ** is currently attempt to join one of the other threads. To avoid a race
1157 ** condition where this thread also attempts to join the same object, join
1158 ** thread pSorter->aTask[pSorter->nTask-1].pThread first. */
1159 for(i
=pSorter
->nTask
-1; i
>=0; i
--){
1160 SortSubtask
*pTask
= &pSorter
->aTask
[i
];
1161 int rc2
= vdbeSorterJoinThread(pTask
);
1162 if( rc
==SQLITE_OK
) rc
= rc2
;
1167 # define vdbeSorterJoinAll(x,rcin) (rcin)
1168 # define vdbeSorterJoinThread(pTask) SQLITE_OK
1172 ** Allocate a new MergeEngine object capable of handling up to
1173 ** nReader PmaReader inputs.
1175 ** nReader is automatically rounded up to the next power of two.
1176 ** nReader may not exceed SORTER_MAX_MERGE_COUNT even after rounding up.
1178 static MergeEngine
*vdbeMergeEngineNew(int nReader
){
1179 int N
= 2; /* Smallest power of two >= nReader */
1180 int nByte
; /* Total bytes of space to allocate */
1181 MergeEngine
*pNew
; /* Pointer to allocated object to return */
1183 assert( nReader
<=SORTER_MAX_MERGE_COUNT
);
1185 while( N
<nReader
) N
+= N
;
1186 nByte
= sizeof(MergeEngine
) + N
* (sizeof(int) + sizeof(PmaReader
));
1188 pNew
= sqlite3FaultSim(100) ? 0 : (MergeEngine
*)sqlite3MallocZero(nByte
);
1192 pNew
->aReadr
= (PmaReader
*)&pNew
[1];
1193 pNew
->aTree
= (int*)&pNew
->aReadr
[N
];
1199 ** Free the MergeEngine object passed as the only argument.
1201 static void vdbeMergeEngineFree(MergeEngine
*pMerger
){
1204 for(i
=0; i
<pMerger
->nTree
; i
++){
1205 vdbePmaReaderClear(&pMerger
->aReadr
[i
]);
1208 sqlite3_free(pMerger
);
1212 ** Free all resources associated with the IncrMerger object indicated by
1213 ** the first argument.
1215 static void vdbeIncrFree(IncrMerger
*pIncr
){
1217 #if SQLITE_MAX_WORKER_THREADS>0
1218 if( pIncr
->bUseThread
){
1219 vdbeSorterJoinThread(pIncr
->pTask
);
1220 if( pIncr
->aFile
[0].pFd
) sqlite3OsCloseFree(pIncr
->aFile
[0].pFd
);
1221 if( pIncr
->aFile
[1].pFd
) sqlite3OsCloseFree(pIncr
->aFile
[1].pFd
);
1224 vdbeMergeEngineFree(pIncr
->pMerger
);
1225 sqlite3_free(pIncr
);
1230 ** Reset a sorting cursor back to its original empty state.
1232 void sqlite3VdbeSorterReset(sqlite3
*db
, VdbeSorter
*pSorter
){
1234 (void)vdbeSorterJoinAll(pSorter
, SQLITE_OK
);
1235 assert( pSorter
->bUseThreads
|| pSorter
->pReader
==0 );
1236 #if SQLITE_MAX_WORKER_THREADS>0
1237 if( pSorter
->pReader
){
1238 vdbePmaReaderClear(pSorter
->pReader
);
1239 sqlite3DbFree(db
, pSorter
->pReader
);
1240 pSorter
->pReader
= 0;
1243 vdbeMergeEngineFree(pSorter
->pMerger
);
1244 pSorter
->pMerger
= 0;
1245 for(i
=0; i
<pSorter
->nTask
; i
++){
1246 SortSubtask
*pTask
= &pSorter
->aTask
[i
];
1247 vdbeSortSubtaskCleanup(db
, pTask
);
1248 pTask
->pSorter
= pSorter
;
1250 if( pSorter
->list
.aMemory
==0 ){
1251 vdbeSorterRecordFree(0, pSorter
->list
.pList
);
1253 pSorter
->list
.pList
= 0;
1254 pSorter
->list
.szPMA
= 0;
1255 pSorter
->bUsePMA
= 0;
1256 pSorter
->iMemory
= 0;
1257 pSorter
->mxKeysize
= 0;
1258 sqlite3DbFree(db
, pSorter
->pUnpacked
);
1259 pSorter
->pUnpacked
= 0;
1263 ** Free any cursor components allocated by sqlite3VdbeSorterXXX routines.
1265 void sqlite3VdbeSorterClose(sqlite3
*db
, VdbeCursor
*pCsr
){
1266 VdbeSorter
*pSorter
;
1267 assert( pCsr
->eCurType
==CURTYPE_SORTER
);
1268 pSorter
= pCsr
->uc
.pSorter
;
1270 sqlite3VdbeSorterReset(db
, pSorter
);
1271 sqlite3_free(pSorter
->list
.aMemory
);
1272 sqlite3DbFree(db
, pSorter
);
1273 pCsr
->uc
.pSorter
= 0;
1277 #if SQLITE_MAX_MMAP_SIZE>0
1279 ** The first argument is a file-handle open on a temporary file. The file
1280 ** is guaranteed to be nByte bytes or smaller in size. This function
1281 ** attempts to extend the file to nByte bytes in size and to ensure that
1282 ** the VFS has memory mapped it.
1284 ** Whether or not the file does end up memory mapped of course depends on
1285 ** the specific VFS implementation.
1287 static void vdbeSorterExtendFile(sqlite3
*db
, sqlite3_file
*pFd
, i64 nByte
){
1288 if( nByte
<=(i64
)(db
->nMaxSorterMmap
) && pFd
->pMethods
->iVersion
>=3 ){
1290 int chunksize
= 4*1024;
1291 sqlite3OsFileControlHint(pFd
, SQLITE_FCNTL_CHUNK_SIZE
, &chunksize
);
1292 sqlite3OsFileControlHint(pFd
, SQLITE_FCNTL_SIZE_HINT
, &nByte
);
1293 sqlite3OsFetch(pFd
, 0, (int)nByte
, &p
);
1294 if( p
) sqlite3OsUnfetch(pFd
, 0, p
);
1298 # define vdbeSorterExtendFile(x,y,z)
1302 ** Allocate space for a file-handle and open a temporary file. If successful,
1303 ** set *ppFd to point to the malloc'd file-handle and return SQLITE_OK.
1304 ** Otherwise, set *ppFd to 0 and return an SQLite error code.
1306 static int vdbeSorterOpenTempFile(
1307 sqlite3
*db
, /* Database handle doing sort */
1308 i64 nExtend
, /* Attempt to extend file to this size */
1312 if( sqlite3FaultSim(202) ) return SQLITE_IOERR_ACCESS
;
1313 rc
= sqlite3OsOpenMalloc(db
->pVfs
, 0, ppFd
,
1314 SQLITE_OPEN_TEMP_JOURNAL
|
1315 SQLITE_OPEN_READWRITE
| SQLITE_OPEN_CREATE
|
1316 SQLITE_OPEN_EXCLUSIVE
| SQLITE_OPEN_DELETEONCLOSE
, &rc
1318 if( rc
==SQLITE_OK
){
1319 i64 max
= SQLITE_MAX_MMAP_SIZE
;
1320 sqlite3OsFileControlHint(*ppFd
, SQLITE_FCNTL_MMAP_SIZE
, (void*)&max
);
1322 vdbeSorterExtendFile(db
, *ppFd
, nExtend
);
1329 ** If it has not already been allocated, allocate the UnpackedRecord
1330 ** structure at pTask->pUnpacked. Return SQLITE_OK if successful (or
1331 ** if no allocation was required), or SQLITE_NOMEM otherwise.
1333 static int vdbeSortAllocUnpacked(SortSubtask
*pTask
){
1334 if( pTask
->pUnpacked
==0 ){
1335 pTask
->pUnpacked
= sqlite3VdbeAllocUnpackedRecord(pTask
->pSorter
->pKeyInfo
);
1336 if( pTask
->pUnpacked
==0 ) return SQLITE_NOMEM_BKPT
;
1337 pTask
->pUnpacked
->nField
= pTask
->pSorter
->pKeyInfo
->nKeyField
;
1338 pTask
->pUnpacked
->errCode
= 0;
1345 ** Merge the two sorted lists p1 and p2 into a single list.
1347 static SorterRecord
*vdbeSorterMerge(
1348 SortSubtask
*pTask
, /* Calling thread context */
1349 SorterRecord
*p1
, /* First list to merge */
1350 SorterRecord
*p2
/* Second list to merge */
1352 SorterRecord
*pFinal
= 0;
1353 SorterRecord
**pp
= &pFinal
;
1356 assert( p1
!=0 && p2
!=0 );
1359 res
= pTask
->xCompare(
1360 pTask
, &bCached
, SRVAL(p1
), p1
->nVal
, SRVAL(p2
), p2
->nVal
1386 ** Return the SorterCompare function to compare values collected by the
1387 ** sorter object passed as the only argument.
1389 static SorterCompare
vdbeSorterGetCompare(VdbeSorter
*p
){
1390 if( p
->typeMask
==SORTER_TYPE_INTEGER
){
1391 return vdbeSorterCompareInt
;
1392 }else if( p
->typeMask
==SORTER_TYPE_TEXT
){
1393 return vdbeSorterCompareText
;
1395 return vdbeSorterCompare
;
1399 ** Sort the linked list of records headed at pTask->pList. Return
1400 ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if
1403 static int vdbeSorterSort(SortSubtask
*pTask
, SorterList
*pList
){
1407 SorterRecord
*aSlot
[64];
1409 rc
= vdbeSortAllocUnpacked(pTask
);
1410 if( rc
!=SQLITE_OK
) return rc
;
1413 pTask
->xCompare
= vdbeSorterGetCompare(pTask
->pSorter
);
1414 memset(aSlot
, 0, sizeof(aSlot
));
1417 SorterRecord
*pNext
;
1418 if( pList
->aMemory
){
1419 if( (u8
*)p
==pList
->aMemory
){
1422 assert( p
->u
.iNext
<sqlite3MallocSize(pList
->aMemory
) );
1423 pNext
= (SorterRecord
*)&pList
->aMemory
[p
->u
.iNext
];
1430 for(i
=0; aSlot
[i
]; i
++){
1431 p
= vdbeSorterMerge(pTask
, p
, aSlot
[i
]);
1439 for(i
=0; i
<ArraySize(aSlot
); i
++){
1440 if( aSlot
[i
]==0 ) continue;
1441 p
= p
? vdbeSorterMerge(pTask
, p
, aSlot
[i
]) : aSlot
[i
];
1445 assert( pTask
->pUnpacked
->errCode
==SQLITE_OK
1446 || pTask
->pUnpacked
->errCode
==SQLITE_NOMEM
1448 return pTask
->pUnpacked
->errCode
;
1452 ** Initialize a PMA-writer object.
1454 static void vdbePmaWriterInit(
1455 sqlite3_file
*pFd
, /* File handle to write to */
1456 PmaWriter
*p
, /* Object to populate */
1457 int nBuf
, /* Buffer size */
1458 i64 iStart
/* Offset of pFd to begin writing at */
1460 memset(p
, 0, sizeof(PmaWriter
));
1461 p
->aBuffer
= (u8
*)sqlite3Malloc(nBuf
);
1463 p
->eFWErr
= SQLITE_NOMEM_BKPT
;
1465 p
->iBufEnd
= p
->iBufStart
= (iStart
% nBuf
);
1466 p
->iWriteOff
= iStart
- p
->iBufStart
;
1473 ** Write nData bytes of data to the PMA. Return SQLITE_OK
1474 ** if successful, or an SQLite error code if an error occurs.
1476 static void vdbePmaWriteBlob(PmaWriter
*p
, u8
*pData
, int nData
){
1478 while( nRem
>0 && p
->eFWErr
==0 ){
1480 if( nCopy
>(p
->nBuffer
- p
->iBufEnd
) ){
1481 nCopy
= p
->nBuffer
- p
->iBufEnd
;
1484 memcpy(&p
->aBuffer
[p
->iBufEnd
], &pData
[nData
-nRem
], nCopy
);
1485 p
->iBufEnd
+= nCopy
;
1486 if( p
->iBufEnd
==p
->nBuffer
){
1487 p
->eFWErr
= sqlite3OsWrite(p
->pFd
,
1488 &p
->aBuffer
[p
->iBufStart
], p
->iBufEnd
- p
->iBufStart
,
1489 p
->iWriteOff
+ p
->iBufStart
1491 p
->iBufStart
= p
->iBufEnd
= 0;
1492 p
->iWriteOff
+= p
->nBuffer
;
1494 assert( p
->iBufEnd
<p
->nBuffer
);
1501 ** Flush any buffered data to disk and clean up the PMA-writer object.
1502 ** The results of using the PMA-writer after this call are undefined.
1503 ** Return SQLITE_OK if flushing the buffered data succeeds or is not
1504 ** required. Otherwise, return an SQLite error code.
1506 ** Before returning, set *piEof to the offset immediately following the
1507 ** last byte written to the file.
1509 static int vdbePmaWriterFinish(PmaWriter
*p
, i64
*piEof
){
1511 if( p
->eFWErr
==0 && ALWAYS(p
->aBuffer
) && p
->iBufEnd
>p
->iBufStart
){
1512 p
->eFWErr
= sqlite3OsWrite(p
->pFd
,
1513 &p
->aBuffer
[p
->iBufStart
], p
->iBufEnd
- p
->iBufStart
,
1514 p
->iWriteOff
+ p
->iBufStart
1517 *piEof
= (p
->iWriteOff
+ p
->iBufEnd
);
1518 sqlite3_free(p
->aBuffer
);
1520 memset(p
, 0, sizeof(PmaWriter
));
1525 ** Write value iVal encoded as a varint to the PMA. Return
1526 ** SQLITE_OK if successful, or an SQLite error code if an error occurs.
1528 static void vdbePmaWriteVarint(PmaWriter
*p
, u64 iVal
){
1531 nByte
= sqlite3PutVarint(aByte
, iVal
);
1532 vdbePmaWriteBlob(p
, aByte
, nByte
);
1536 ** Write the current contents of in-memory linked-list pList to a level-0
1537 ** PMA in the temp file belonging to sub-task pTask. Return SQLITE_OK if
1538 ** successful, or an SQLite error code otherwise.
1540 ** The format of a PMA is:
1542 ** * A varint. This varint contains the total number of bytes of content
1543 ** in the PMA (not including the varint itself).
1545 ** * One or more records packed end-to-end in order of ascending keys.
1546 ** Each record consists of a varint followed by a blob of data (the
1547 ** key). The varint is the number of bytes in the blob of data.
1549 static int vdbeSorterListToPMA(SortSubtask
*pTask
, SorterList
*pList
){
1550 sqlite3
*db
= pTask
->pSorter
->db
;
1551 int rc
= SQLITE_OK
; /* Return code */
1552 PmaWriter writer
; /* Object used to write to the file */
1555 /* Set iSz to the expected size of file pTask->file after writing the PMA.
1556 ** This is used by an assert() statement at the end of this function. */
1557 i64 iSz
= pList
->szPMA
+ sqlite3VarintLen(pList
->szPMA
) + pTask
->file
.iEof
;
1560 vdbeSorterWorkDebug(pTask
, "enter");
1561 memset(&writer
, 0, sizeof(PmaWriter
));
1562 assert( pList
->szPMA
>0 );
1564 /* If the first temporary PMA file has not been opened, open it now. */
1565 if( pTask
->file
.pFd
==0 ){
1566 rc
= vdbeSorterOpenTempFile(db
, 0, &pTask
->file
.pFd
);
1567 assert( rc
!=SQLITE_OK
|| pTask
->file
.pFd
);
1568 assert( pTask
->file
.iEof
==0 );
1569 assert( pTask
->nPMA
==0 );
1572 /* Try to get the file to memory map */
1573 if( rc
==SQLITE_OK
){
1574 vdbeSorterExtendFile(db
, pTask
->file
.pFd
, pTask
->file
.iEof
+pList
->szPMA
+9);
1578 if( rc
==SQLITE_OK
){
1579 rc
= vdbeSorterSort(pTask
, pList
);
1582 if( rc
==SQLITE_OK
){
1584 SorterRecord
*pNext
= 0;
1586 vdbePmaWriterInit(pTask
->file
.pFd
, &writer
, pTask
->pSorter
->pgsz
,
1589 vdbePmaWriteVarint(&writer
, pList
->szPMA
);
1590 for(p
=pList
->pList
; p
; p
=pNext
){
1592 vdbePmaWriteVarint(&writer
, p
->nVal
);
1593 vdbePmaWriteBlob(&writer
, SRVAL(p
), p
->nVal
);
1594 if( pList
->aMemory
==0 ) sqlite3_free(p
);
1597 rc
= vdbePmaWriterFinish(&writer
, &pTask
->file
.iEof
);
1600 vdbeSorterWorkDebug(pTask
, "exit");
1601 assert( rc
!=SQLITE_OK
|| pList
->pList
==0 );
1602 assert( rc
!=SQLITE_OK
|| pTask
->file
.iEof
==iSz
);
1607 ** Advance the MergeEngine to its next entry.
1608 ** Set *pbEof to true there is no next entry because
1609 ** the MergeEngine has reached the end of all its inputs.
1611 ** Return SQLITE_OK if successful or an error code if an error occurs.
1613 static int vdbeMergeEngineStep(
1614 MergeEngine
*pMerger
, /* The merge engine to advance to the next row */
1615 int *pbEof
/* Set TRUE at EOF. Set false for more content */
1618 int iPrev
= pMerger
->aTree
[1];/* Index of PmaReader to advance */
1619 SortSubtask
*pTask
= pMerger
->pTask
;
1621 /* Advance the current PmaReader */
1622 rc
= vdbePmaReaderNext(&pMerger
->aReadr
[iPrev
]);
1624 /* Update contents of aTree[] */
1625 if( rc
==SQLITE_OK
){
1626 int i
; /* Index of aTree[] to recalculate */
1627 PmaReader
*pReadr1
; /* First PmaReader to compare */
1628 PmaReader
*pReadr2
; /* Second PmaReader to compare */
1631 /* Find the first two PmaReaders to compare. The one that was just
1632 ** advanced (iPrev) and the one next to it in the array. */
1633 pReadr1
= &pMerger
->aReadr
[(iPrev
& 0xFFFE)];
1634 pReadr2
= &pMerger
->aReadr
[(iPrev
| 0x0001)];
1636 for(i
=(pMerger
->nTree
+iPrev
)/2; i
>0; i
=i
/2){
1637 /* Compare pReadr1 and pReadr2. Store the result in variable iRes. */
1639 if( pReadr1
->pFd
==0 ){
1641 }else if( pReadr2
->pFd
==0 ){
1644 iRes
= pTask
->xCompare(pTask
, &bCached
,
1645 pReadr1
->aKey
, pReadr1
->nKey
, pReadr2
->aKey
, pReadr2
->nKey
1649 /* If pReadr1 contained the smaller value, set aTree[i] to its index.
1650 ** Then set pReadr2 to the next PmaReader to compare to pReadr1. In this
1651 ** case there is no cache of pReadr2 in pTask->pUnpacked, so set
1652 ** pKey2 to point to the record belonging to pReadr2.
1654 ** Alternatively, if pReadr2 contains the smaller of the two values,
1655 ** set aTree[i] to its index and update pReadr1. If vdbeSorterCompare()
1656 ** was actually called above, then pTask->pUnpacked now contains
1657 ** a value equivalent to pReadr2. So set pKey2 to NULL to prevent
1658 ** vdbeSorterCompare() from decoding pReadr2 again.
1660 ** If the two values were equal, then the value from the oldest
1661 ** PMA should be considered smaller. The VdbeSorter.aReadr[] array
1662 ** is sorted from oldest to newest, so pReadr1 contains older values
1663 ** than pReadr2 iff (pReadr1<pReadr2). */
1664 if( iRes
<0 || (iRes
==0 && pReadr1
<pReadr2
) ){
1665 pMerger
->aTree
[i
] = (int)(pReadr1
- pMerger
->aReadr
);
1666 pReadr2
= &pMerger
->aReadr
[ pMerger
->aTree
[i
^ 0x0001] ];
1669 if( pReadr1
->pFd
) bCached
= 0;
1670 pMerger
->aTree
[i
] = (int)(pReadr2
- pMerger
->aReadr
);
1671 pReadr1
= &pMerger
->aReadr
[ pMerger
->aTree
[i
^ 0x0001] ];
1674 *pbEof
= (pMerger
->aReadr
[pMerger
->aTree
[1]].pFd
==0);
1677 return (rc
==SQLITE_OK
? pTask
->pUnpacked
->errCode
: rc
);
1680 #if SQLITE_MAX_WORKER_THREADS>0
1682 ** The main routine for background threads that write level-0 PMAs.
1684 static void *vdbeSorterFlushThread(void *pCtx
){
1685 SortSubtask
*pTask
= (SortSubtask
*)pCtx
;
1686 int rc
; /* Return code */
1687 assert( pTask
->bDone
==0 );
1688 rc
= vdbeSorterListToPMA(pTask
, &pTask
->list
);
1690 return SQLITE_INT_TO_PTR(rc
);
1692 #endif /* SQLITE_MAX_WORKER_THREADS>0 */
1695 ** Flush the current contents of VdbeSorter.list to a new PMA, possibly
1696 ** using a background thread.
1698 static int vdbeSorterFlushPMA(VdbeSorter
*pSorter
){
1699 #if SQLITE_MAX_WORKER_THREADS==0
1700 pSorter
->bUsePMA
= 1;
1701 return vdbeSorterListToPMA(&pSorter
->aTask
[0], &pSorter
->list
);
1705 SortSubtask
*pTask
= 0; /* Thread context used to create new PMA */
1706 int nWorker
= (pSorter
->nTask
-1);
1708 /* Set the flag to indicate that at least one PMA has been written.
1709 ** Or will be, anyhow. */
1710 pSorter
->bUsePMA
= 1;
1712 /* Select a sub-task to sort and flush the current list of in-memory
1713 ** records to disk. If the sorter is running in multi-threaded mode,
1714 ** round-robin between the first (pSorter->nTask-1) tasks. Except, if
1715 ** the background thread from a sub-tasks previous turn is still running,
1716 ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy,
1717 ** fall back to using the final sub-task. The first (pSorter->nTask-1)
1718 ** sub-tasks are preferred as they use background threads - the final
1719 ** sub-task uses the main thread. */
1720 for(i
=0; i
<nWorker
; i
++){
1721 int iTest
= (pSorter
->iPrev
+ i
+ 1) % nWorker
;
1722 pTask
= &pSorter
->aTask
[iTest
];
1724 rc
= vdbeSorterJoinThread(pTask
);
1726 if( rc
!=SQLITE_OK
|| pTask
->pThread
==0 ) break;
1729 if( rc
==SQLITE_OK
){
1731 /* Use the foreground thread for this operation */
1732 rc
= vdbeSorterListToPMA(&pSorter
->aTask
[nWorker
], &pSorter
->list
);
1734 /* Launch a background thread for this operation */
1739 assert( pTask
->pThread
==0 && pTask
->bDone
==0 );
1740 assert( pTask
->list
.pList
==0 );
1741 assert( pTask
->list
.aMemory
==0 || pSorter
->list
.aMemory
!=0 );
1743 aMem
= pTask
->list
.aMemory
;
1744 pCtx
= (void*)pTask
;
1745 pSorter
->iPrev
= (u8
)(pTask
- pSorter
->aTask
);
1746 pTask
->list
= pSorter
->list
;
1747 pSorter
->list
.pList
= 0;
1748 pSorter
->list
.szPMA
= 0;
1750 pSorter
->list
.aMemory
= aMem
;
1751 pSorter
->nMemory
= sqlite3MallocSize(aMem
);
1752 }else if( pSorter
->list
.aMemory
){
1753 pSorter
->list
.aMemory
= sqlite3Malloc(pSorter
->nMemory
);
1754 if( !pSorter
->list
.aMemory
) return SQLITE_NOMEM_BKPT
;
1757 rc
= vdbeSorterCreateThread(pTask
, vdbeSorterFlushThread
, pCtx
);
1762 #endif /* SQLITE_MAX_WORKER_THREADS!=0 */
1766 ** Add a record to the sorter.
1768 int sqlite3VdbeSorterWrite(
1769 const VdbeCursor
*pCsr
, /* Sorter cursor */
1770 Mem
*pVal
/* Memory cell containing record */
1772 VdbeSorter
*pSorter
;
1773 int rc
= SQLITE_OK
; /* Return Code */
1774 SorterRecord
*pNew
; /* New list element */
1775 int bFlush
; /* True to flush contents of memory to PMA */
1776 i64 nReq
; /* Bytes of memory required */
1777 i64 nPMA
; /* Bytes of PMA space required */
1778 int t
; /* serial type of first record field */
1780 assert( pCsr
->eCurType
==CURTYPE_SORTER
);
1781 pSorter
= pCsr
->uc
.pSorter
;
1782 getVarint32NR((const u8
*)&pVal
->z
[1], t
);
1783 if( t
>0 && t
<10 && t
!=7 ){
1784 pSorter
->typeMask
&= SORTER_TYPE_INTEGER
;
1785 }else if( t
>10 && (t
& 0x01) ){
1786 pSorter
->typeMask
&= SORTER_TYPE_TEXT
;
1788 pSorter
->typeMask
= 0;
1793 /* Figure out whether or not the current contents of memory should be
1794 ** flushed to a PMA before continuing. If so, do so.
1796 ** If using the single large allocation mode (pSorter->aMemory!=0), then
1797 ** flush the contents of memory to a new PMA if (a) at least one value is
1798 ** already in memory and (b) the new value will not fit in memory.
1800 ** Or, if using separate allocations for each record, flush the contents
1801 ** of memory to a PMA if either of the following are true:
1803 ** * The total memory allocated for the in-memory list is greater
1804 ** than (page-size * cache-size), or
1806 ** * The total memory allocated for the in-memory list is greater
1807 ** than (page-size * 10) and sqlite3HeapNearlyFull() returns true.
1809 nReq
= pVal
->n
+ sizeof(SorterRecord
);
1810 nPMA
= pVal
->n
+ sqlite3VarintLen(pVal
->n
);
1811 if( pSorter
->mxPmaSize
){
1812 if( pSorter
->list
.aMemory
){
1813 bFlush
= pSorter
->iMemory
&& (pSorter
->iMemory
+nReq
) > pSorter
->mxPmaSize
;
1816 (pSorter
->list
.szPMA
> pSorter
->mxPmaSize
)
1817 || (pSorter
->list
.szPMA
> pSorter
->mnPmaSize
&& sqlite3HeapNearlyFull())
1821 rc
= vdbeSorterFlushPMA(pSorter
);
1822 pSorter
->list
.szPMA
= 0;
1823 pSorter
->iMemory
= 0;
1824 assert( rc
!=SQLITE_OK
|| pSorter
->list
.pList
==0 );
1828 pSorter
->list
.szPMA
+= nPMA
;
1829 if( nPMA
>pSorter
->mxKeysize
){
1830 pSorter
->mxKeysize
= nPMA
;
1833 if( pSorter
->list
.aMemory
){
1834 int nMin
= pSorter
->iMemory
+ nReq
;
1836 if( nMin
>pSorter
->nMemory
){
1838 sqlite3_int64 nNew
= 2 * (sqlite3_int64
)pSorter
->nMemory
;
1840 if( pSorter
->list
.pList
){
1841 iListOff
= (u8
*)pSorter
->list
.pList
- pSorter
->list
.aMemory
;
1843 while( nNew
< nMin
) nNew
= nNew
*2;
1844 if( nNew
> pSorter
->mxPmaSize
) nNew
= pSorter
->mxPmaSize
;
1845 if( nNew
< nMin
) nNew
= nMin
;
1846 aNew
= sqlite3Realloc(pSorter
->list
.aMemory
, nNew
);
1847 if( !aNew
) return SQLITE_NOMEM_BKPT
;
1849 pSorter
->list
.pList
= (SorterRecord
*)&aNew
[iListOff
];
1851 pSorter
->list
.aMemory
= aNew
;
1852 pSorter
->nMemory
= nNew
;
1855 pNew
= (SorterRecord
*)&pSorter
->list
.aMemory
[pSorter
->iMemory
];
1856 pSorter
->iMemory
+= ROUND8(nReq
);
1857 if( pSorter
->list
.pList
){
1858 pNew
->u
.iNext
= (int)((u8
*)(pSorter
->list
.pList
) - pSorter
->list
.aMemory
);
1861 pNew
= (SorterRecord
*)sqlite3Malloc(nReq
);
1863 return SQLITE_NOMEM_BKPT
;
1865 pNew
->u
.pNext
= pSorter
->list
.pList
;
1868 memcpy(SRVAL(pNew
), pVal
->z
, pVal
->n
);
1869 pNew
->nVal
= pVal
->n
;
1870 pSorter
->list
.pList
= pNew
;
1876 ** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format
1877 ** of the data stored in aFile[1] is the same as that used by regular PMAs,
1878 ** except that the number-of-bytes varint is omitted from the start.
1880 static int vdbeIncrPopulate(IncrMerger
*pIncr
){
1883 i64 iStart
= pIncr
->iStartOff
;
1884 SorterFile
*pOut
= &pIncr
->aFile
[1];
1885 SortSubtask
*pTask
= pIncr
->pTask
;
1886 MergeEngine
*pMerger
= pIncr
->pMerger
;
1888 assert( pIncr
->bEof
==0 );
1890 vdbeSorterPopulateDebug(pTask
, "enter");
1892 vdbePmaWriterInit(pOut
->pFd
, &writer
, pTask
->pSorter
->pgsz
, iStart
);
1893 while( rc
==SQLITE_OK
){
1895 PmaReader
*pReader
= &pMerger
->aReadr
[ pMerger
->aTree
[1] ];
1896 int nKey
= pReader
->nKey
;
1897 i64 iEof
= writer
.iWriteOff
+ writer
.iBufEnd
;
1899 /* Check if the output file is full or if the input has been exhausted.
1900 ** In either case exit the loop. */
1901 if( pReader
->pFd
==0 ) break;
1902 if( (iEof
+ nKey
+ sqlite3VarintLen(nKey
))>(iStart
+ pIncr
->mxSz
) ) break;
1904 /* Write the next key to the output. */
1905 vdbePmaWriteVarint(&writer
, nKey
);
1906 vdbePmaWriteBlob(&writer
, pReader
->aKey
, nKey
);
1907 assert( pIncr
->pMerger
->pTask
==pTask
);
1908 rc
= vdbeMergeEngineStep(pIncr
->pMerger
, &dummy
);
1911 rc2
= vdbePmaWriterFinish(&writer
, &pOut
->iEof
);
1912 if( rc
==SQLITE_OK
) rc
= rc2
;
1913 vdbeSorterPopulateDebug(pTask
, "exit");
1917 #if SQLITE_MAX_WORKER_THREADS>0
1919 ** The main routine for background threads that populate aFile[1] of
1920 ** multi-threaded IncrMerger objects.
1922 static void *vdbeIncrPopulateThread(void *pCtx
){
1923 IncrMerger
*pIncr
= (IncrMerger
*)pCtx
;
1924 void *pRet
= SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr
) );
1925 pIncr
->pTask
->bDone
= 1;
1930 ** Launch a background thread to populate aFile[1] of pIncr.
1932 static int vdbeIncrBgPopulate(IncrMerger
*pIncr
){
1933 void *p
= (void*)pIncr
;
1934 assert( pIncr
->bUseThread
);
1935 return vdbeSorterCreateThread(pIncr
->pTask
, vdbeIncrPopulateThread
, p
);
1940 ** This function is called when the PmaReader corresponding to pIncr has
1941 ** finished reading the contents of aFile[0]. Its purpose is to "refill"
1942 ** aFile[0] such that the PmaReader should start rereading it from the
1945 ** For single-threaded objects, this is accomplished by literally reading
1946 ** keys from pIncr->pMerger and repopulating aFile[0].
1948 ** For multi-threaded objects, all that is required is to wait until the
1949 ** background thread is finished (if it is not already) and then swap
1950 ** aFile[0] and aFile[1] in place. If the contents of pMerger have not
1951 ** been exhausted, this function also launches a new background thread
1952 ** to populate the new aFile[1].
1954 ** SQLITE_OK is returned on success, or an SQLite error code otherwise.
1956 static int vdbeIncrSwap(IncrMerger
*pIncr
){
1959 #if SQLITE_MAX_WORKER_THREADS>0
1960 if( pIncr
->bUseThread
){
1961 rc
= vdbeSorterJoinThread(pIncr
->pTask
);
1963 if( rc
==SQLITE_OK
){
1964 SorterFile f0
= pIncr
->aFile
[0];
1965 pIncr
->aFile
[0] = pIncr
->aFile
[1];
1966 pIncr
->aFile
[1] = f0
;
1969 if( rc
==SQLITE_OK
){
1970 if( pIncr
->aFile
[0].iEof
==pIncr
->iStartOff
){
1973 rc
= vdbeIncrBgPopulate(pIncr
);
1979 rc
= vdbeIncrPopulate(pIncr
);
1980 pIncr
->aFile
[0] = pIncr
->aFile
[1];
1981 if( pIncr
->aFile
[0].iEof
==pIncr
->iStartOff
){
1990 ** Allocate and return a new IncrMerger object to read data from pMerger.
1992 ** If an OOM condition is encountered, return NULL. In this case free the
1993 ** pMerger argument before returning.
1995 static int vdbeIncrMergerNew(
1996 SortSubtask
*pTask
, /* The thread that will be using the new IncrMerger */
1997 MergeEngine
*pMerger
, /* The MergeEngine that the IncrMerger will control */
1998 IncrMerger
**ppOut
/* Write the new IncrMerger here */
2001 IncrMerger
*pIncr
= *ppOut
= (IncrMerger
*)
2002 (sqlite3FaultSim(100) ? 0 : sqlite3MallocZero(sizeof(*pIncr
)));
2004 pIncr
->pMerger
= pMerger
;
2005 pIncr
->pTask
= pTask
;
2006 pIncr
->mxSz
= MAX(pTask
->pSorter
->mxKeysize
+9,pTask
->pSorter
->mxPmaSize
/2);
2007 pTask
->file2
.iEof
+= pIncr
->mxSz
;
2009 vdbeMergeEngineFree(pMerger
);
2010 rc
= SQLITE_NOMEM_BKPT
;
2012 assert( *ppOut
!=0 || rc
!=SQLITE_OK
);
2016 #if SQLITE_MAX_WORKER_THREADS>0
2018 ** Set the "use-threads" flag on object pIncr.
2020 static void vdbeIncrMergerSetThreads(IncrMerger
*pIncr
){
2021 pIncr
->bUseThread
= 1;
2022 pIncr
->pTask
->file2
.iEof
-= pIncr
->mxSz
;
2024 #endif /* SQLITE_MAX_WORKER_THREADS>0 */
2029 ** Recompute pMerger->aTree[iOut] by comparing the next keys on the
2030 ** two PmaReaders that feed that entry. Neither of the PmaReaders
2031 ** are advanced. This routine merely does the comparison.
2033 static void vdbeMergeEngineCompare(
2034 MergeEngine
*pMerger
, /* Merge engine containing PmaReaders to compare */
2035 int iOut
/* Store the result in pMerger->aTree[iOut] */
2043 assert( iOut
<pMerger
->nTree
&& iOut
>0 );
2045 if( iOut
>=(pMerger
->nTree
/2) ){
2046 i1
= (iOut
- pMerger
->nTree
/2) * 2;
2049 i1
= pMerger
->aTree
[iOut
*2];
2050 i2
= pMerger
->aTree
[iOut
*2+1];
2053 p1
= &pMerger
->aReadr
[i1
];
2054 p2
= &pMerger
->aReadr
[i2
];
2058 }else if( p2
->pFd
==0 ){
2061 SortSubtask
*pTask
= pMerger
->pTask
;
2064 assert( pTask
->pUnpacked
!=0 ); /* from vdbeSortSubtaskMain() */
2065 res
= pTask
->xCompare(
2066 pTask
, &bCached
, p1
->aKey
, p1
->nKey
, p2
->aKey
, p2
->nKey
2075 pMerger
->aTree
[iOut
] = iRes
;
2079 ** Allowed values for the eMode parameter to vdbeMergeEngineInit()
2080 ** and vdbePmaReaderIncrMergeInit().
2082 ** Only INCRINIT_NORMAL is valid in single-threaded builds (when
2083 ** SQLITE_MAX_WORKER_THREADS==0). The other values are only used
2084 ** when there exists one or more separate worker threads.
2086 #define INCRINIT_NORMAL 0
2087 #define INCRINIT_TASK 1
2088 #define INCRINIT_ROOT 2
2091 ** Forward reference required as the vdbeIncrMergeInit() and
2092 ** vdbePmaReaderIncrInit() routines are called mutually recursively when
2093 ** building a merge tree.
2095 static int vdbePmaReaderIncrInit(PmaReader
*pReadr
, int eMode
);
2098 ** Initialize the MergeEngine object passed as the second argument. Once this
2099 ** function returns, the first key of merged data may be read from the
2100 ** MergeEngine object in the usual fashion.
2102 ** If argument eMode is INCRINIT_ROOT, then it is assumed that any IncrMerge
2103 ** objects attached to the PmaReader objects that the merger reads from have
2104 ** already been populated, but that they have not yet populated aFile[0] and
2105 ** set the PmaReader objects up to read from it. In this case all that is
2106 ** required is to call vdbePmaReaderNext() on each PmaReader to point it at
2109 ** Otherwise, if eMode is any value other than INCRINIT_ROOT, then use
2110 ** vdbePmaReaderIncrMergeInit() to initialize each PmaReader that feeds data
2113 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
2115 static int vdbeMergeEngineInit(
2116 SortSubtask
*pTask
, /* Thread that will run pMerger */
2117 MergeEngine
*pMerger
, /* MergeEngine to initialize */
2118 int eMode
/* One of the INCRINIT_XXX constants */
2120 int rc
= SQLITE_OK
; /* Return code */
2121 int i
; /* For looping over PmaReader objects */
2122 int nTree
; /* Number of subtrees to merge */
2124 /* Failure to allocate the merge would have been detected prior to
2125 ** invoking this routine */
2126 assert( pMerger
!=0 );
2128 /* eMode is always INCRINIT_NORMAL in single-threaded mode */
2129 assert( SQLITE_MAX_WORKER_THREADS
>0 || eMode
==INCRINIT_NORMAL
);
2131 /* Verify that the MergeEngine is assigned to a single thread */
2132 assert( pMerger
->pTask
==0 );
2133 pMerger
->pTask
= pTask
;
2135 nTree
= pMerger
->nTree
;
2136 for(i
=0; i
<nTree
; i
++){
2137 if( SQLITE_MAX_WORKER_THREADS
>0 && eMode
==INCRINIT_ROOT
){
2138 /* PmaReaders should be normally initialized in order, as if they are
2139 ** reading from the same temp file this makes for more linear file IO.
2140 ** However, in the INCRINIT_ROOT case, if PmaReader aReadr[nTask-1] is
2141 ** in use it will block the vdbePmaReaderNext() call while it uses
2142 ** the main thread to fill its buffer. So calling PmaReaderNext()
2143 ** on this PmaReader before any of the multi-threaded PmaReaders takes
2144 ** better advantage of multi-processor hardware. */
2145 rc
= vdbePmaReaderNext(&pMerger
->aReadr
[nTree
-i
-1]);
2147 rc
= vdbePmaReaderIncrInit(&pMerger
->aReadr
[i
], INCRINIT_NORMAL
);
2149 if( rc
!=SQLITE_OK
) return rc
;
2152 for(i
=pMerger
->nTree
-1; i
>0; i
--){
2153 vdbeMergeEngineCompare(pMerger
, i
);
2155 return pTask
->pUnpacked
->errCode
;
2159 ** The PmaReader passed as the first argument is guaranteed to be an
2160 ** incremental-reader (pReadr->pIncr!=0). This function serves to open
2161 ** and/or initialize the temp file related fields of the IncrMerge
2162 ** object at (pReadr->pIncr).
2164 ** If argument eMode is set to INCRINIT_NORMAL, then all PmaReaders
2165 ** in the sub-tree headed by pReadr are also initialized. Data is then
2166 ** loaded into the buffers belonging to pReadr and it is set to point to
2167 ** the first key in its range.
2169 ** If argument eMode is set to INCRINIT_TASK, then pReadr is guaranteed
2170 ** to be a multi-threaded PmaReader and this function is being called in a
2171 ** background thread. In this case all PmaReaders in the sub-tree are
2172 ** initialized as for INCRINIT_NORMAL and the aFile[1] buffer belonging to
2173 ** pReadr is populated. However, pReadr itself is not set up to point
2174 ** to its first key. A call to vdbePmaReaderNext() is still required to do
2177 ** The reason this function does not call vdbePmaReaderNext() immediately
2178 ** in the INCRINIT_TASK case is that vdbePmaReaderNext() assumes that it has
2179 ** to block on thread (pTask->thread) before accessing aFile[1]. But, since
2180 ** this entire function is being run by thread (pTask->thread), that will
2181 ** lead to the current background thread attempting to join itself.
2183 ** Finally, if argument eMode is set to INCRINIT_ROOT, it may be assumed
2184 ** that pReadr->pIncr is a multi-threaded IncrMerge objects, and that all
2185 ** child-trees have already been initialized using IncrInit(INCRINIT_TASK).
2186 ** In this case vdbePmaReaderNext() is called on all child PmaReaders and
2187 ** the current PmaReader set to point to the first key in its range.
2189 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
2191 static int vdbePmaReaderIncrMergeInit(PmaReader
*pReadr
, int eMode
){
2193 IncrMerger
*pIncr
= pReadr
->pIncr
;
2194 SortSubtask
*pTask
= pIncr
->pTask
;
2195 sqlite3
*db
= pTask
->pSorter
->db
;
2197 /* eMode is always INCRINIT_NORMAL in single-threaded mode */
2198 assert( SQLITE_MAX_WORKER_THREADS
>0 || eMode
==INCRINIT_NORMAL
);
2200 rc
= vdbeMergeEngineInit(pTask
, pIncr
->pMerger
, eMode
);
2202 /* Set up the required files for pIncr. A multi-threaded IncrMerge object
2203 ** requires two temp files to itself, whereas a single-threaded object
2204 ** only requires a region of pTask->file2. */
2205 if( rc
==SQLITE_OK
){
2206 int mxSz
= pIncr
->mxSz
;
2207 #if SQLITE_MAX_WORKER_THREADS>0
2208 if( pIncr
->bUseThread
){
2209 rc
= vdbeSorterOpenTempFile(db
, mxSz
, &pIncr
->aFile
[0].pFd
);
2210 if( rc
==SQLITE_OK
){
2211 rc
= vdbeSorterOpenTempFile(db
, mxSz
, &pIncr
->aFile
[1].pFd
);
2215 /*if( !pIncr->bUseThread )*/{
2216 if( pTask
->file2
.pFd
==0 ){
2217 assert( pTask
->file2
.iEof
>0 );
2218 rc
= vdbeSorterOpenTempFile(db
, pTask
->file2
.iEof
, &pTask
->file2
.pFd
);
2219 pTask
->file2
.iEof
= 0;
2221 if( rc
==SQLITE_OK
){
2222 pIncr
->aFile
[1].pFd
= pTask
->file2
.pFd
;
2223 pIncr
->iStartOff
= pTask
->file2
.iEof
;
2224 pTask
->file2
.iEof
+= mxSz
;
2229 #if SQLITE_MAX_WORKER_THREADS>0
2230 if( rc
==SQLITE_OK
&& pIncr
->bUseThread
){
2231 /* Use the current thread to populate aFile[1], even though this
2232 ** PmaReader is multi-threaded. If this is an INCRINIT_TASK object,
2233 ** then this function is already running in background thread
2234 ** pIncr->pTask->thread.
2236 ** If this is the INCRINIT_ROOT object, then it is running in the
2237 ** main VDBE thread. But that is Ok, as that thread cannot return
2238 ** control to the VDBE or proceed with anything useful until the
2239 ** first results are ready from this merger object anyway.
2241 assert( eMode
==INCRINIT_ROOT
|| eMode
==INCRINIT_TASK
);
2242 rc
= vdbeIncrPopulate(pIncr
);
2246 if( rc
==SQLITE_OK
&& (SQLITE_MAX_WORKER_THREADS
==0 || eMode
!=INCRINIT_TASK
) ){
2247 rc
= vdbePmaReaderNext(pReadr
);
2253 #if SQLITE_MAX_WORKER_THREADS>0
2255 ** The main routine for vdbePmaReaderIncrMergeInit() operations run in
2256 ** background threads.
2258 static void *vdbePmaReaderBgIncrInit(void *pCtx
){
2259 PmaReader
*pReader
= (PmaReader
*)pCtx
;
2260 void *pRet
= SQLITE_INT_TO_PTR(
2261 vdbePmaReaderIncrMergeInit(pReader
,INCRINIT_TASK
)
2263 pReader
->pIncr
->pTask
->bDone
= 1;
2269 ** If the PmaReader passed as the first argument is not an incremental-reader
2270 ** (if pReadr->pIncr==0), then this function is a no-op. Otherwise, it invokes
2271 ** the vdbePmaReaderIncrMergeInit() function with the parameters passed to
2272 ** this routine to initialize the incremental merge.
2274 ** If the IncrMerger object is multi-threaded (IncrMerger.bUseThread==1),
2275 ** then a background thread is launched to call vdbePmaReaderIncrMergeInit().
2276 ** Or, if the IncrMerger is single threaded, the same function is called
2277 ** using the current thread.
2279 static int vdbePmaReaderIncrInit(PmaReader
*pReadr
, int eMode
){
2280 IncrMerger
*pIncr
= pReadr
->pIncr
; /* Incremental merger */
2281 int rc
= SQLITE_OK
; /* Return code */
2283 #if SQLITE_MAX_WORKER_THREADS>0
2284 assert( pIncr
->bUseThread
==0 || eMode
==INCRINIT_TASK
);
2285 if( pIncr
->bUseThread
){
2286 void *pCtx
= (void*)pReadr
;
2287 rc
= vdbeSorterCreateThread(pIncr
->pTask
, vdbePmaReaderBgIncrInit
, pCtx
);
2291 rc
= vdbePmaReaderIncrMergeInit(pReadr
, eMode
);
2298 ** Allocate a new MergeEngine object to merge the contents of nPMA level-0
2299 ** PMAs from pTask->file. If no error occurs, set *ppOut to point to
2300 ** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut
2301 ** to NULL and return an SQLite error code.
2303 ** When this function is called, *piOffset is set to the offset of the
2304 ** first PMA to read from pTask->file. Assuming no error occurs, it is
2305 ** set to the offset immediately following the last byte of the last
2306 ** PMA before returning. If an error does occur, then the final value of
2307 ** *piOffset is undefined.
2309 static int vdbeMergeEngineLevel0(
2310 SortSubtask
*pTask
, /* Sorter task to read from */
2311 int nPMA
, /* Number of PMAs to read */
2312 i64
*piOffset
, /* IN/OUT: Readr offset in pTask->file */
2313 MergeEngine
**ppOut
/* OUT: New merge-engine */
2315 MergeEngine
*pNew
; /* Merge engine to return */
2316 i64 iOff
= *piOffset
;
2320 *ppOut
= pNew
= vdbeMergeEngineNew(nPMA
);
2321 if( pNew
==0 ) rc
= SQLITE_NOMEM_BKPT
;
2323 for(i
=0; i
<nPMA
&& rc
==SQLITE_OK
; i
++){
2325 PmaReader
*pReadr
= &pNew
->aReadr
[i
];
2326 rc
= vdbePmaReaderInit(pTask
, &pTask
->file
, iOff
, pReadr
, &nDummy
);
2327 iOff
= pReadr
->iEof
;
2330 if( rc
!=SQLITE_OK
){
2331 vdbeMergeEngineFree(pNew
);
2339 ** Return the depth of a tree comprising nPMA PMAs, assuming a fanout of
2340 ** SORTER_MAX_MERGE_COUNT. The returned value does not include leaf nodes.
2344 ** nPMA<=16 -> TreeDepth() == 0
2345 ** nPMA<=256 -> TreeDepth() == 1
2346 ** nPMA<=65536 -> TreeDepth() == 2
2348 static int vdbeSorterTreeDepth(int nPMA
){
2350 i64 nDiv
= SORTER_MAX_MERGE_COUNT
;
2351 while( nDiv
< (i64
)nPMA
){
2352 nDiv
= nDiv
* SORTER_MAX_MERGE_COUNT
;
2359 ** pRoot is the root of an incremental merge-tree with depth nDepth (according
2360 ** to vdbeSorterTreeDepth()). pLeaf is the iSeq'th leaf to be added to the
2361 ** tree, counting from zero. This function adds pLeaf to the tree.
2363 ** If successful, SQLITE_OK is returned. If an error occurs, an SQLite error
2364 ** code is returned and pLeaf is freed.
2366 static int vdbeSorterAddToTree(
2367 SortSubtask
*pTask
, /* Task context */
2368 int nDepth
, /* Depth of tree according to TreeDepth() */
2369 int iSeq
, /* Sequence number of leaf within tree */
2370 MergeEngine
*pRoot
, /* Root of tree */
2371 MergeEngine
*pLeaf
/* Leaf to add to tree */
2376 MergeEngine
*p
= pRoot
;
2379 rc
= vdbeIncrMergerNew(pTask
, pLeaf
, &pIncr
);
2381 for(i
=1; i
<nDepth
; i
++){
2382 nDiv
= nDiv
* SORTER_MAX_MERGE_COUNT
;
2385 for(i
=1; i
<nDepth
&& rc
==SQLITE_OK
; i
++){
2386 int iIter
= (iSeq
/ nDiv
) % SORTER_MAX_MERGE_COUNT
;
2387 PmaReader
*pReadr
= &p
->aReadr
[iIter
];
2389 if( pReadr
->pIncr
==0 ){
2390 MergeEngine
*pNew
= vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT
);
2392 rc
= SQLITE_NOMEM_BKPT
;
2394 rc
= vdbeIncrMergerNew(pTask
, pNew
, &pReadr
->pIncr
);
2397 if( rc
==SQLITE_OK
){
2398 p
= pReadr
->pIncr
->pMerger
;
2399 nDiv
= nDiv
/ SORTER_MAX_MERGE_COUNT
;
2403 if( rc
==SQLITE_OK
){
2404 p
->aReadr
[iSeq
% SORTER_MAX_MERGE_COUNT
].pIncr
= pIncr
;
2406 vdbeIncrFree(pIncr
);
2412 ** This function is called as part of a SorterRewind() operation on a sorter
2413 ** that has already written two or more level-0 PMAs to one or more temp
2414 ** files. It builds a tree of MergeEngine/IncrMerger/PmaReader objects that
2415 ** can be used to incrementally merge all PMAs on disk.
2417 ** If successful, SQLITE_OK is returned and *ppOut set to point to the
2418 ** MergeEngine object at the root of the tree before returning. Or, if an
2419 ** error occurs, an SQLite error code is returned and the final value
2420 ** of *ppOut is undefined.
2422 static int vdbeSorterMergeTreeBuild(
2423 VdbeSorter
*pSorter
, /* The VDBE cursor that implements the sort */
2424 MergeEngine
**ppOut
/* Write the MergeEngine here */
2426 MergeEngine
*pMain
= 0;
2430 #if SQLITE_MAX_WORKER_THREADS>0
2431 /* If the sorter uses more than one task, then create the top-level
2432 ** MergeEngine here. This MergeEngine will read data from exactly
2433 ** one PmaReader per sub-task. */
2434 assert( pSorter
->bUseThreads
|| pSorter
->nTask
==1 );
2435 if( pSorter
->nTask
>1 ){
2436 pMain
= vdbeMergeEngineNew(pSorter
->nTask
);
2437 if( pMain
==0 ) rc
= SQLITE_NOMEM_BKPT
;
2441 for(iTask
=0; rc
==SQLITE_OK
&& iTask
<pSorter
->nTask
; iTask
++){
2442 SortSubtask
*pTask
= &pSorter
->aTask
[iTask
];
2443 assert( pTask
->nPMA
>0 || SQLITE_MAX_WORKER_THREADS
>0 );
2444 if( SQLITE_MAX_WORKER_THREADS
==0 || pTask
->nPMA
){
2445 MergeEngine
*pRoot
= 0; /* Root node of tree for this task */
2446 int nDepth
= vdbeSorterTreeDepth(pTask
->nPMA
);
2449 if( pTask
->nPMA
<=SORTER_MAX_MERGE_COUNT
){
2450 rc
= vdbeMergeEngineLevel0(pTask
, pTask
->nPMA
, &iReadOff
, &pRoot
);
2454 pRoot
= vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT
);
2455 if( pRoot
==0 ) rc
= SQLITE_NOMEM_BKPT
;
2456 for(i
=0; i
<pTask
->nPMA
&& rc
==SQLITE_OK
; i
+= SORTER_MAX_MERGE_COUNT
){
2457 MergeEngine
*pMerger
= 0; /* New level-0 PMA merger */
2458 int nReader
; /* Number of level-0 PMAs to merge */
2460 nReader
= MIN(pTask
->nPMA
- i
, SORTER_MAX_MERGE_COUNT
);
2461 rc
= vdbeMergeEngineLevel0(pTask
, nReader
, &iReadOff
, &pMerger
);
2462 if( rc
==SQLITE_OK
){
2463 rc
= vdbeSorterAddToTree(pTask
, nDepth
, iSeq
++, pRoot
, pMerger
);
2468 if( rc
==SQLITE_OK
){
2469 #if SQLITE_MAX_WORKER_THREADS>0
2471 rc
= vdbeIncrMergerNew(pTask
, pRoot
, &pMain
->aReadr
[iTask
].pIncr
);
2479 vdbeMergeEngineFree(pRoot
);
2484 if( rc
!=SQLITE_OK
){
2485 vdbeMergeEngineFree(pMain
);
2493 ** This function is called as part of an sqlite3VdbeSorterRewind() operation
2494 ** on a sorter that has written two or more PMAs to temporary files. It sets
2495 ** up either VdbeSorter.pMerger (for single threaded sorters) or pReader
2496 ** (for multi-threaded sorters) so that it can be used to iterate through
2497 ** all records stored in the sorter.
2499 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
2501 static int vdbeSorterSetupMerge(VdbeSorter
*pSorter
){
2502 int rc
; /* Return code */
2503 SortSubtask
*pTask0
= &pSorter
->aTask
[0];
2504 MergeEngine
*pMain
= 0;
2505 #if SQLITE_MAX_WORKER_THREADS
2506 sqlite3
*db
= pTask0
->pSorter
->db
;
2508 SorterCompare xCompare
= vdbeSorterGetCompare(pSorter
);
2509 for(i
=0; i
<pSorter
->nTask
; i
++){
2510 pSorter
->aTask
[i
].xCompare
= xCompare
;
2514 rc
= vdbeSorterMergeTreeBuild(pSorter
, &pMain
);
2515 if( rc
==SQLITE_OK
){
2516 #if SQLITE_MAX_WORKER_THREADS
2517 assert( pSorter
->bUseThreads
==0 || pSorter
->nTask
>1 );
2518 if( pSorter
->bUseThreads
){
2520 PmaReader
*pReadr
= 0;
2521 SortSubtask
*pLast
= &pSorter
->aTask
[pSorter
->nTask
-1];
2522 rc
= vdbeSortAllocUnpacked(pLast
);
2523 if( rc
==SQLITE_OK
){
2524 pReadr
= (PmaReader
*)sqlite3DbMallocZero(db
, sizeof(PmaReader
));
2525 pSorter
->pReader
= pReadr
;
2526 if( pReadr
==0 ) rc
= SQLITE_NOMEM_BKPT
;
2528 if( rc
==SQLITE_OK
){
2529 rc
= vdbeIncrMergerNew(pLast
, pMain
, &pReadr
->pIncr
);
2530 if( rc
==SQLITE_OK
){
2531 vdbeIncrMergerSetThreads(pReadr
->pIncr
);
2532 for(iTask
=0; iTask
<(pSorter
->nTask
-1); iTask
++){
2534 if( (pIncr
= pMain
->aReadr
[iTask
].pIncr
) ){
2535 vdbeIncrMergerSetThreads(pIncr
);
2536 assert( pIncr
->pTask
!=pLast
);
2539 for(iTask
=0; rc
==SQLITE_OK
&& iTask
<pSorter
->nTask
; iTask
++){
2542 ** a) The incremental merge object is configured to use the
2544 ** b) If it is using task (nTask-1), it is configured to run
2545 ** in single-threaded mode. This is important, as the
2546 ** root merge (INCRINIT_ROOT) will be using the same task
2549 PmaReader
*p
= &pMain
->aReadr
[iTask
];
2550 assert( p
->pIncr
==0 || (
2551 (p
->pIncr
->pTask
==&pSorter
->aTask
[iTask
]) /* a */
2552 && (iTask
!=pSorter
->nTask
-1 || p
->pIncr
->bUseThread
==0) /* b */
2554 rc
= vdbePmaReaderIncrInit(p
, INCRINIT_TASK
);
2559 if( rc
==SQLITE_OK
){
2560 rc
= vdbePmaReaderIncrMergeInit(pReadr
, INCRINIT_ROOT
);
2565 rc
= vdbeMergeEngineInit(pTask0
, pMain
, INCRINIT_NORMAL
);
2566 pSorter
->pMerger
= pMain
;
2571 if( rc
!=SQLITE_OK
){
2572 vdbeMergeEngineFree(pMain
);
2579 ** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite,
2580 ** this function is called to prepare for iterating through the records
2583 int sqlite3VdbeSorterRewind(const VdbeCursor
*pCsr
, int *pbEof
){
2584 VdbeSorter
*pSorter
;
2585 int rc
= SQLITE_OK
; /* Return code */
2587 assert( pCsr
->eCurType
==CURTYPE_SORTER
);
2588 pSorter
= pCsr
->uc
.pSorter
;
2591 /* If no data has been written to disk, then do not do so now. Instead,
2592 ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
2593 ** from the in-memory list. */
2594 if( pSorter
->bUsePMA
==0 ){
2595 if( pSorter
->list
.pList
){
2597 rc
= vdbeSorterSort(&pSorter
->aTask
[0], &pSorter
->list
);
2604 /* Write the current in-memory list to a PMA. When the VdbeSorterWrite()
2605 ** function flushes the contents of memory to disk, it immediately always
2606 ** creates a new list consisting of a single key immediately afterwards.
2607 ** So the list is never empty at this point. */
2608 assert( pSorter
->list
.pList
);
2609 rc
= vdbeSorterFlushPMA(pSorter
);
2611 /* Join all threads */
2612 rc
= vdbeSorterJoinAll(pSorter
, rc
);
2614 vdbeSorterRewindDebug("rewind");
2616 /* Assuming no errors have occurred, set up a merger structure to
2617 ** incrementally read and merge all remaining PMAs. */
2618 assert( pSorter
->pReader
==0 );
2619 if( rc
==SQLITE_OK
){
2620 rc
= vdbeSorterSetupMerge(pSorter
);
2624 vdbeSorterRewindDebug("rewinddone");
2629 ** Advance to the next element in the sorter. Return value:
2631 ** SQLITE_OK success
2632 ** SQLITE_DONE end of data
2633 ** otherwise some kind of error.
2635 int sqlite3VdbeSorterNext(sqlite3
*db
, const VdbeCursor
*pCsr
){
2636 VdbeSorter
*pSorter
;
2637 int rc
; /* Return code */
2639 assert( pCsr
->eCurType
==CURTYPE_SORTER
);
2640 pSorter
= pCsr
->uc
.pSorter
;
2641 assert( pSorter
->bUsePMA
|| (pSorter
->pReader
==0 && pSorter
->pMerger
==0) );
2642 if( pSorter
->bUsePMA
){
2643 assert( pSorter
->pReader
==0 || pSorter
->pMerger
==0 );
2644 assert( pSorter
->bUseThreads
==0 || pSorter
->pReader
);
2645 assert( pSorter
->bUseThreads
==1 || pSorter
->pMerger
);
2646 #if SQLITE_MAX_WORKER_THREADS>0
2647 if( pSorter
->bUseThreads
){
2648 rc
= vdbePmaReaderNext(pSorter
->pReader
);
2649 if( rc
==SQLITE_OK
&& pSorter
->pReader
->pFd
==0 ) rc
= SQLITE_DONE
;
2652 /*if( !pSorter->bUseThreads )*/ {
2654 assert( pSorter
->pMerger
!=0 );
2655 assert( pSorter
->pMerger
->pTask
==(&pSorter
->aTask
[0]) );
2656 rc
= vdbeMergeEngineStep(pSorter
->pMerger
, &res
);
2657 if( rc
==SQLITE_OK
&& res
) rc
= SQLITE_DONE
;
2660 SorterRecord
*pFree
= pSorter
->list
.pList
;
2661 pSorter
->list
.pList
= pFree
->u
.pNext
;
2663 if( pSorter
->list
.aMemory
==0 ) vdbeSorterRecordFree(db
, pFree
);
2664 rc
= pSorter
->list
.pList
? SQLITE_OK
: SQLITE_DONE
;
2670 ** Return a pointer to a buffer owned by the sorter that contains the
2673 static void *vdbeSorterRowkey(
2674 const VdbeSorter
*pSorter
, /* Sorter object */
2675 int *pnKey
/* OUT: Size of current key in bytes */
2678 if( pSorter
->bUsePMA
){
2680 #if SQLITE_MAX_WORKER_THREADS>0
2681 if( pSorter
->bUseThreads
){
2682 pReader
= pSorter
->pReader
;
2685 /*if( !pSorter->bUseThreads )*/{
2686 pReader
= &pSorter
->pMerger
->aReadr
[pSorter
->pMerger
->aTree
[1]];
2688 *pnKey
= pReader
->nKey
;
2689 pKey
= pReader
->aKey
;
2691 *pnKey
= pSorter
->list
.pList
->nVal
;
2692 pKey
= SRVAL(pSorter
->list
.pList
);
2698 ** Copy the current sorter key into the memory cell pOut.
2700 int sqlite3VdbeSorterRowkey(const VdbeCursor
*pCsr
, Mem
*pOut
){
2701 VdbeSorter
*pSorter
;
2702 void *pKey
; int nKey
; /* Sorter key to copy into pOut */
2704 assert( pCsr
->eCurType
==CURTYPE_SORTER
);
2705 pSorter
= pCsr
->uc
.pSorter
;
2706 pKey
= vdbeSorterRowkey(pSorter
, &nKey
);
2707 if( sqlite3VdbeMemClearAndResize(pOut
, nKey
) ){
2708 return SQLITE_NOMEM_BKPT
;
2711 MemSetTypeFlag(pOut
, MEM_Blob
);
2712 memcpy(pOut
->z
, pKey
, nKey
);
2718 ** Compare the key in memory cell pVal with the key that the sorter cursor
2719 ** passed as the first argument currently points to. For the purposes of
2720 ** the comparison, ignore the rowid field at the end of each record.
2722 ** If the sorter cursor key contains any NULL values, consider it to be
2723 ** less than pVal. Even if pVal also contains NULL values.
2725 ** If an error occurs, return an SQLite error code (i.e. SQLITE_NOMEM).
2726 ** Otherwise, set *pRes to a negative, zero or positive value if the
2727 ** key in pVal is smaller than, equal to or larger than the current sorter
2730 ** This routine forms the core of the OP_SorterCompare opcode, which in
2731 ** turn is used to verify uniqueness when constructing a UNIQUE INDEX.
2733 int sqlite3VdbeSorterCompare(
2734 const VdbeCursor
*pCsr
, /* Sorter cursor */
2735 Mem
*pVal
, /* Value to compare to current sorter key */
2736 int nKeyCol
, /* Compare this many columns */
2737 int *pRes
/* OUT: Result of comparison */
2739 VdbeSorter
*pSorter
;
2743 void *pKey
; int nKey
; /* Sorter key to compare pVal with */
2745 assert( pCsr
->eCurType
==CURTYPE_SORTER
);
2746 pSorter
= pCsr
->uc
.pSorter
;
2747 r2
= pSorter
->pUnpacked
;
2748 pKeyInfo
= pCsr
->pKeyInfo
;
2750 r2
= pSorter
->pUnpacked
= sqlite3VdbeAllocUnpackedRecord(pKeyInfo
);
2751 if( r2
==0 ) return SQLITE_NOMEM_BKPT
;
2752 r2
->nField
= nKeyCol
;
2754 assert( r2
->nField
==nKeyCol
);
2756 pKey
= vdbeSorterRowkey(pSorter
, &nKey
);
2757 sqlite3VdbeRecordUnpack(pKeyInfo
, nKey
, pKey
, r2
);
2758 for(i
=0; i
<nKeyCol
; i
++){
2759 if( r2
->aMem
[i
].flags
& MEM_Null
){
2765 *pRes
= sqlite3VdbeRecordCompare(pVal
->n
, pVal
->z
, r2
);