Fix the ".lint fkey-indexes" shell command so that it works with WITHOUT ROWID
[sqlite.git] / ext / lsm1 / lsm-test / lsmtest5.c
blobf36184e79bf029e059f365aa79b1768754049f87
2 /*
3 ** This file is broken into three semi-autonomous parts:
4 **
5 ** 1. The database functions.
6 ** 2. The thread wrappers.
7 ** 3. The implementation of the mt1.* tests.
8 */
10 /*************************************************************************
11 ** DATABASE CONTENTS:
13 ** The database contains up to N key/value pairs, where N is some large
14 ** number (say 10,000,000). Keys are integer values between 0 and (N-1).
15 ** The value associated with each key is a pseudo-random blob of data.
17 ** Key/value pair keys are encoded as the two bytes "k." followed by a
18 ** 10-digit decimal number. i.e. key 45 -> "k.0000000045".
20 ** As well as the key/value pairs, the database also contains checksum
21 ** entries. The checksums form a hierarchy - for every F key/value
22 ** entries there is one level 1 checksum. And for each F level 1 checksums
23 ** there is one level 2 checksum. And so on.
25 ** Checksum keys are encoded as the two byte "c." followed by the
26 ** checksum level, followed by a 10 digit decimal number containing
27 ** the value of the first key that contributes to the checksum value.
28 ** For example, assuming F==10, the level 1 checksum that spans keys
29 ** 10 to 19 is "c.1.0000000010".
31 ** Clients may perform one of two operations on the database: a read
32 ** or a write.
33 **
34 ** READ OPERATIONS:
36 ** A read operation scans a range of F key/value pairs. It computes
37 ** the expected checksum and then compares the computed value to the
38 ** actual value stored in the level 1 checksum entry. It then scans
39 ** the group of F level 1 checksums, and compares the computed checksum
40 ** to the associated level 2 checksum value, and so on until the
41 ** highest level checksum value has been verified.
43 ** If a checksum ever fails to match the expected value, the test
44 ** has failed.
46 ** WRITE OPERATIONS:
48 ** A write operation involves writing (possibly clobbering) a single
49 ** key/value pair. The associated level 1 checksum is then recalculated
50 ** and updated. Then the level 2 checksum, and so on until the highest
51 ** level checksum has been modified.
53 ** All updates occur inside a single transaction.
55 ** INTERFACE:
57 ** The interface used by test cases to read and write the db consists
58 ** of type DbParameters and the following functions:
60 ** dbReadOperation()
61 ** dbWriteOperation()
64 #include "lsmtest.h"
/*
** Parameters describing the shape of the test database: the size of the
** key space and the fanout of the checksum hierarchy.
*/
typedef struct DbParameters DbParameters;
struct DbParameters {
  int nFanout;                    /* Checksum fanout (F) */
  int nKey;                       /* Size of key space (N) */
};

/*
** Size in bytes of the buffers used to hold formatted keys and checksum
** values (including the nul-terminator).
*/
#define DB_KEY_BYTES          (2+5+10+1)
75 ** Argument aBuf[] must point to a buffer at least DB_KEY_BYTES in size.
76 ** This function populates the buffer with a nul-terminated key string
77 ** corresponding to key iKey.
79 static void dbFormatKey(
80 DbParameters *pParam,
81 int iLevel,
82 int iKey, /* Key value */
83 char *aBuf /* Write key string here */
85 if( iLevel==0 ){
86 snprintf(aBuf, DB_KEY_BYTES, "k.%.10d", iKey);
87 }else{
88 int f = 1;
89 int i;
90 for(i=0; i<iLevel; i++) f = f * pParam->nFanout;
91 snprintf(aBuf, DB_KEY_BYTES, "c.%d.%.10d", iLevel, f*(iKey/f));
96 ** Argument aBuf[] must point to a buffer at least DB_KEY_BYTES in size.
97 ** This function populates the buffer with the string representation of
98 ** checksum value iVal.
100 static void dbFormatCksumValue(u32 iVal, char *aBuf){
101 snprintf(aBuf, DB_KEY_BYTES, "%.10u", iVal);
105 ** Return the highest level of checksum in the database described
106 ** by *pParam.
108 static int dbMaxLevel(DbParameters *pParam){
109 int iMax;
110 int n = 1;
111 for(iMax=0; n<pParam->nKey; iMax++){
112 n = n * pParam->nFanout;
114 return iMax;
117 static void dbCksum(
118 void *pCtx, /* IN/OUT: Pointer to u32 containing cksum */
119 void *pKey, int nKey, /* Database key. Unused. */
120 void *pVal, int nVal /* Database value. Checksum this. */
122 u8 *aVal = (u8 *)pVal;
123 u32 *pCksum = (u32 *)pCtx;
124 u32 cksum = *pCksum;
125 int i;
127 unused_parameter(pKey);
128 unused_parameter(nKey);
130 for(i=0; i<nVal; i++){
131 cksum += (cksum<<3) + (int)aVal[i];
134 *pCksum = cksum;
138 ** Compute the value of the checksum stored on level iLevel that contains
139 ** data from key iKey by scanning the pParam->nFanout entries at level
140 ** iLevel-1.
142 static u32 dbComputeCksum(
143 DbParameters *pParam, /* Database parameters */
144 TestDb *pDb, /* Database connection handle */
145 int iLevel, /* Level of checksum to compute */
146 int iKey, /* Compute checksum for this key */
147 int *pRc /* IN/OUT: Error code */
149 u32 cksum = 0;
150 if( *pRc==0 ){
151 int nFirst;
152 int nLast;
153 int iFirst = 0;
154 int iLast = 0;
155 int i;
156 int f = 1;
157 char zFirst[DB_KEY_BYTES];
158 char zLast[DB_KEY_BYTES];
160 assert( iLevel>=1 );
161 for(i=0; i<iLevel; i++) f = f * pParam->nFanout;
163 iFirst = f*(iKey/f);
164 iLast = iFirst + f - 1;
165 dbFormatKey(pParam, iLevel-1, iFirst, zFirst);
166 dbFormatKey(pParam, iLevel-1, iLast, zLast);
167 nFirst = strlen(zFirst);
168 nLast = strlen(zLast);
170 *pRc = tdb_scan(pDb, (u32*)&cksum, 0, zFirst, nFirst, zLast, nLast,dbCksum);
173 return cksum;
176 static void dbReadOperation(
177 DbParameters *pParam, /* Database parameters */
178 TestDb *pDb, /* Database connection handle */
179 void (*xDelay)(void *),
180 void *pDelayCtx,
181 int iKey, /* Key to read */
182 int *pRc /* IN/OUT: Error code */
184 const int iMax = dbMaxLevel(pParam);
185 int i;
187 if( tdb_transaction_support(pDb) ) testBegin(pDb, 1, pRc);
188 for(i=1; *pRc==0 && i<=iMax; i++){
189 char zCksum[DB_KEY_BYTES];
190 char zKey[DB_KEY_BYTES];
191 u32 iCksum = 0;
193 iCksum = dbComputeCksum(pParam, pDb, i, iKey, pRc);
194 if( iCksum ){
195 if( xDelay && i==1 ) xDelay(pDelayCtx);
196 dbFormatCksumValue(iCksum, zCksum);
197 dbFormatKey(pParam, i, iKey, zKey);
198 testFetchStr(pDb, zKey, zCksum, pRc);
201 if( tdb_transaction_support(pDb) ) testCommit(pDb, 0, pRc);
204 static int dbWriteOperation(
205 DbParameters *pParam, /* Database parameters */
206 TestDb *pDb, /* Database connection handle */
207 int iKey, /* Key to write to */
208 const char *zValue, /* Nul-terminated value to write */
209 int *pRc /* IN/OUT: Error code */
211 const int iMax = dbMaxLevel(pParam);
212 char zKey[DB_KEY_BYTES];
213 int i;
214 int rc;
216 assert( iKey>=0 && iKey<pParam->nKey );
217 dbFormatKey(pParam, 0, iKey, zKey);
219 /* Open a write transaction. This may fail - SQLITE4_BUSY */
220 if( *pRc==0 && tdb_transaction_support(pDb) ){
221 rc = tdb_begin(pDb, 2);
222 if( rc==5 ) return 0;
223 *pRc = rc;
226 testWriteStr(pDb, zKey, zValue, pRc);
227 for(i=1; i<=iMax; i++){
228 char zCksum[DB_KEY_BYTES];
229 u32 iCksum = 0;
231 iCksum = dbComputeCksum(pParam, pDb, i, iKey, pRc);
232 dbFormatCksumValue(iCksum, zCksum);
233 dbFormatKey(pParam, i, iKey, zKey);
234 testWriteStr(pDb, zKey, zCksum, pRc);
236 if( tdb_transaction_support(pDb) ) testCommit(pDb, 0, pRc);
237 return 1;
240 /*************************************************************************
241 ** The following block contains testXXX() functions that implement a
242 ** wrapper around the system's native multi-thread support. There are no
243 ** synchronization primitives - just functions to launch and join
244 ** threads. Wrapper functions are:
246 ** testThreadSupport()
248 ** testThreadInit()
249 ** testThreadShutdown()
250 ** testThreadLaunch()
251 ** testThreadWait()
253 ** testThreadSetHalt()
254 ** testThreadGetHalt()
255 ** testThreadSetResult()
256 ** testThreadGetResult()
258 ** testThreadEnterMutex()
259 ** testThreadLeaveMutex()
/* Opaque handle for a set of test threads. The structure itself is only
** defined when thread support is compiled in. */
typedef struct ThreadSet ThreadSet;
262 #ifdef LSM_MUTEX_PTHREADS
264 #include <pthread.h>
265 #include <unistd.h>
267 typedef struct Thread Thread;
268 struct Thread {
269 int rc;
270 char *zMsg;
271 pthread_t id;
272 void (*xMain)(ThreadSet *, int, void *);
273 void *pCtx;
274 ThreadSet *pThreadSet;
277 struct ThreadSet {
278 int bHalt; /* Halt flag */
279 int nThread; /* Number of threads */
280 Thread *aThread; /* Array of Thread structures */
281 pthread_mutex_t mutex; /* Mutex used for cheating */
/*
** Return true if this build supports threads, or false otherwise. If
** this function returns false, no other testThreadXXX() functions should
** be called.
*/
static int testThreadSupport(void){ return 1; }
292 ** Allocate and return a thread-set handle with enough space allocated
293 ** to handle up to nMax threads. Each call to this function should be
294 ** matched by a call to testThreadShutdown() to delete the object.
296 static ThreadSet *testThreadInit(int nMax){
297 int nByte; /* Total space to allocate */
298 ThreadSet *p; /* Return value */
300 nByte = sizeof(ThreadSet) + sizeof(struct Thread) * nMax;
301 p = (ThreadSet *)testMalloc(nByte);
302 p->nThread = nMax;
303 p->aThread = (Thread *)&p[1];
304 pthread_mutex_init(&p->mutex, 0);
306 return p;
310 ** Delete a thread-set object and release all resources held by it.
312 static void testThreadShutdown(ThreadSet *p){
313 int i;
314 for(i=0; i<p->nThread; i++){
315 testFree(p->aThread[i].zMsg);
317 pthread_mutex_destroy(&p->mutex);
318 testFree(p);
321 static void *ttMain(void *pArg){
322 Thread *pThread = (Thread *)pArg;
323 int iThread;
324 iThread = (pThread - pThread->pThreadSet->aThread);
325 pThread->xMain(pThread->pThreadSet, iThread, pThread->pCtx);
326 return 0;
330 ** Launch a new thread.
332 static int testThreadLaunch(
333 ThreadSet *p,
334 int iThread,
335 void (*xMain)(ThreadSet *, int, void *),
336 void *pCtx
338 int rc;
339 Thread *pThread;
341 assert( iThread>=0 && iThread<p->nThread );
343 pThread = &p->aThread[iThread];
344 assert( pThread->pThreadSet==0 );
345 pThread->xMain = xMain;
346 pThread->pCtx = pCtx;
347 pThread->pThreadSet = p;
348 rc = pthread_create(&pThread->id, 0, ttMain, (void *)pThread);
350 return rc;
354 ** Set the thread-set "halt" flag.
356 static void testThreadSetHalt(ThreadSet *pThreadSet){
357 pThreadSet->bHalt = 1;
361 ** Return the current value of the thread-set "halt" flag.
363 static int testThreadGetHalt(ThreadSet *pThreadSet){
364 return pThreadSet->bHalt;
367 static void testThreadSleep(ThreadSet *pThreadSet, int nMs){
368 int nRem = nMs;
369 while( nRem>0 && testThreadGetHalt(pThreadSet)==0 ){
370 usleep(50000);
371 nRem -= 50;
376 ** Wait for all threads launched to finish before returning. If nMs
377 ** is greater than zero, set the "halt" flag to tell all threads
378 ** to halt after waiting nMs milliseconds.
380 static void testThreadWait(ThreadSet *pThreadSet, int nMs){
381 int i;
383 testThreadSleep(pThreadSet, nMs);
384 testThreadSetHalt(pThreadSet);
385 for(i=0; i<pThreadSet->nThread; i++){
386 Thread *pThread = &pThreadSet->aThread[i];
387 if( pThread->xMain ){
388 pthread_join(pThread->id, 0);
394 ** Set the result for thread iThread.
396 static void testThreadSetResult(
397 ThreadSet *pThreadSet, /* Thread-set handle */
398 int iThread, /* Set result for this thread */
399 int rc, /* Result error code */
400 char *zFmt, /* Result string format */
401 ... /* Result string formatting args... */
403 va_list ap;
405 testFree(pThreadSet->aThread[iThread].zMsg);
406 pThreadSet->aThread[iThread].rc = rc;
407 pThreadSet->aThread[iThread].zMsg = 0;
408 if( zFmt ){
409 va_start(ap, zFmt);
410 pThreadSet->aThread[iThread].zMsg = testMallocVPrintf(zFmt, ap);
411 va_end(ap);
416 ** Retrieve the result for thread iThread.
418 static int testThreadGetResult(
419 ThreadSet *pThreadSet, /* Thread-set handle */
420 int iThread, /* Get result for this thread */
421 const char **pzRes /* OUT: Pointer to result string */
423 if( pzRes ) *pzRes = pThreadSet->aThread[iThread].zMsg;
424 return pThreadSet->aThread[iThread].rc;
/*
** Enter and leave the test case mutex. These wrappers are currently
** compiled out.
*/
#if 0
static void testThreadEnterMutex(ThreadSet *p){
  pthread_mutex_lock(&p->mutex);
}
static void testThreadLeaveMutex(ThreadSet *p){
  pthread_mutex_unlock(&p->mutex);
}
#endif
438 #endif
440 #if !defined(LSM_MUTEX_PTHREADS)
441 static int testThreadSupport(){ return 0; }
443 #define testThreadInit(a) 0
444 #define testThreadShutdown(a)
445 #define testThreadLaunch(a,b,c,d) 0
446 #define testThreadWait(a,b)
447 #define testThreadSetHalt(a)
448 #define testThreadGetHalt(a) 0
449 #define testThreadGetResult(a,b,c) 0
450 #define testThreadSleep(a,b) 0
452 static void testThreadSetResult(ThreadSet *a, int b, int c, char *d, ...){
453 unused_parameter(a);
454 unused_parameter(b);
455 unused_parameter(c);
456 unused_parameter(d);
458 #endif
459 /* End of threads wrapper.
460 *************************************************************************/
462 /*************************************************************************
463 ** Below this point is the third part of this file - the implementation
464 ** of the mt1.* tests.
466 typedef struct Mt1Test Mt1Test;
467 struct Mt1Test {
468 DbParameters param; /* Description of database to read/write */
469 int nReadwrite; /* Number of read/write threads */
470 int nFastReader; /* Number of fast reader threads */
471 int nSlowReader; /* Number of slow reader threads */
472 int nMs; /* How long to run for */
473 const char *zSystem; /* Database system to test */
476 typedef struct Mt1DelayCtx Mt1DelayCtx;
477 struct Mt1DelayCtx {
478 ThreadSet *pSet; /* Threadset to sleep within */
479 int nMs; /* Sleep in ms */
482 static void xMt1Delay(void *pCtx){
483 Mt1DelayCtx *p = (Mt1DelayCtx *)pCtx;
484 testThreadSleep(p->pSet, p->nMs);
/* Thread roles used by mt1Main() */
#define MT1_THREAD_RDWR 0
#define MT1_THREAD_SLOW 1
#define MT1_THREAD_FAST 2
491 static void xMt1Work(lsm_db *pDb, void *pCtx){
492 #if 0
493 char *z = 0;
494 lsm_info(pDb, LSM_INFO_DB_STRUCTURE, &z);
495 printf("%s\n", z);
496 fflush(stdout);
497 #endif
501 ** This is the main() proc for all threads in test case "mt1".
503 static void mt1Main(ThreadSet *pThreadSet, int iThread, void *pCtx){
504 Mt1Test *p = (Mt1Test *)pCtx; /* Test parameters */
505 Mt1DelayCtx delay;
506 int nRead = 0; /* Number of calls to dbReadOperation() */
507 int nWrite = 0; /* Number of completed database writes */
508 int rc = 0; /* Error code */
509 int iPrng; /* Prng argument variable */
510 TestDb *pDb; /* Database handle */
511 int eType;
513 delay.pSet = pThreadSet;
514 delay.nMs = 0;
515 if( iThread<p->nReadwrite ){
516 eType = MT1_THREAD_RDWR;
517 }else if( iThread<(p->nReadwrite+p->nFastReader) ){
518 eType = MT1_THREAD_FAST;
519 }else{
520 eType = MT1_THREAD_SLOW;
521 delay.nMs = (p->nMs / 20);
524 /* Open a new database connection. Initialize the pseudo-random number
525 ** argument based on the thread number. */
526 iPrng = testPrngValue(iThread);
527 pDb = testOpen(p->zSystem, 0, &rc);
529 if( rc==0 ){
530 tdb_lsm_config_work_hook(pDb, xMt1Work, 0);
533 /* Loop until either an error occurs or some other thread sets the
534 ** halt flag. */
535 while( rc==0 && testThreadGetHalt(pThreadSet)==0 ){
536 int iKey;
538 /* Perform a read operation on an arbitrarily selected key. */
539 iKey = (testPrngValue(iPrng++) % p->param.nKey);
540 dbReadOperation(&p->param, pDb, xMt1Delay, (void *)&delay, iKey, &rc);
541 if( rc ) continue;
542 nRead++;
544 /* Attempt to write an arbitrary key value pair (and update the associated
545 ** checksum entries). dbWriteOperation() returns 1 if the write is
546 ** successful, or 0 if it failed with an LSM_BUSY error. */
547 if( eType==MT1_THREAD_RDWR ){
548 char aValue[50];
549 char aRnd[25];
551 iKey = (testPrngValue(iPrng++) % p->param.nKey);
552 testPrngString(iPrng, aRnd, sizeof(aRnd));
553 iPrng += sizeof(aRnd);
554 snprintf(aValue, sizeof(aValue), "%d.%s", iThread, aRnd);
555 nWrite += dbWriteOperation(&p->param, pDb, iKey, aValue, &rc);
558 testClose(&pDb);
560 /* If an error has occured, set the thread error code and the threadset
561 ** halt flag to tell the other test threads to halt. Otherwise, set the
562 ** thread error code to 0 and post a message with the number of read
563 ** and write operations completed. */
564 if( rc ){
565 testThreadSetResult(pThreadSet, iThread, rc, 0);
566 testThreadSetHalt(pThreadSet);
567 }else{
568 testThreadSetResult(pThreadSet, iThread, 0, "r/w: %d/%d", nRead, nWrite);
572 static void do_test_mt1(
573 const char *zSystem, /* Database system name */
574 const char *zPattern, /* Run test cases that match this pattern */
575 int *pRc /* IN/OUT: Error code */
577 Mt1Test aTest[] = {
578 /* param, nReadwrite, nFastReader, nSlowReader, nMs, zSystem */
579 { {10, 1000}, 4, 0, 0, 10000, 0 },
580 { {10, 1000}, 4, 4, 2, 100000, 0 },
581 { {10, 100000}, 4, 0, 0, 10000, 0 },
582 { {10, 100000}, 4, 4, 2, 100000, 0 },
584 int i;
586 for(i=0; *pRc==0 && i<ArraySize(aTest); i++){
587 Mt1Test *p = &aTest[i];
588 int bRun = testCaseBegin(pRc, zPattern,
589 "mt1.%s.db=%d,%d.ms=%d.rdwr=%d.fast=%d.slow=%d",
590 zSystem, p->param.nFanout, p->param.nKey,
591 p->nMs, p->nReadwrite, p->nFastReader, p->nSlowReader
593 if( bRun ){
594 TestDb *pDb;
595 ThreadSet *pSet;
596 int iThread;
597 int nThread;
599 p->zSystem = zSystem;
600 pDb = testOpen(zSystem, 1, pRc);
602 nThread = p->nReadwrite + p->nFastReader + p->nSlowReader;
603 pSet = testThreadInit(nThread);
604 for(iThread=0; *pRc==0 && iThread<nThread; iThread++){
605 testThreadLaunch(pSet, iThread, mt1Main, (void *)p);
608 testThreadWait(pSet, p->nMs);
609 for(iThread=0; *pRc==0 && iThread<nThread; iThread++){
610 *pRc = testThreadGetResult(pSet, iThread, 0);
612 testCaseFinish(*pRc);
614 for(iThread=0; *pRc==0 && iThread<nThread; iThread++){
615 const char *zMsg = 0;
616 *pRc = testThreadGetResult(pSet, iThread, &zMsg);
617 printf(" Info: thread %d (%d): %s\n", iThread, *pRc, zMsg);
620 testThreadShutdown(pSet);
621 testClose(&pDb);
/*
** Entry point for the multi-threaded tests. This is a no-op if the
** build does not support threads. Otherwise the mt1.* test cases that
** match zPattern are run against database system zSystem.
*/
void test_mt(
  const char *zSystem,            /* Database system name */
  const char *zPattern,           /* Run test cases that match this pattern */
  int *pRc                        /* IN/OUT: Error code */
){
  if( testThreadSupport()==0 ) return;
  do_test_mt1(zSystem, zPattern, pRc);
}