3 ** This file is broken into three semi-autonomous parts:
5 ** 1. The database functions.
6 ** 2. The thread wrappers.
7 ** 3. The implementation of the mt1.* tests.
10 /*************************************************************************
13 ** The database contains up to N key/value pairs, where N is some large
14 ** number (say 10,000,000). Keys are integer values between 0 and (N-1).
15 ** The value associated with each key is a pseudo-random blob of data.
17 ** Key/value pair keys are encoded as the two bytes "k." followed by a
18 ** 10-digit decimal number. i.e. key 45 -> "k.0000000045".
20 ** As well as the key/value pairs, the database also contains checksum
21 ** entries. The checksums form a hierarchy - for every F key/value
22 ** entries there is one level 1 checksum. And for each F level 1 checksums
23 ** there is one level 2 checksum. And so on.
25 ** Checksum keys are encoded as the two byte "c." followed by the
26 ** checksum level, followed by a 10 digit decimal number containing
27 ** the value of the first key that contributes to the checksum value.
28 ** For example, assuming F==10, the level 1 checksum that spans keys
29 ** 10 to 19 is "c.1.0000000010".
31 ** Clients may perform one of two operations on the database: a read
36 ** A read operation scans a range of F key/value pairs. It computes
37 ** the expected checksum and then compares the computed value to the
38 ** actual value stored in the level 1 checksum entry. It then scans
39 ** the group of F level 1 checksums, and compares the computed checksum
40 ** to the associated level 2 checksum value, and so on until the
41 ** highest level checksum value has been verified.
43 ** If a checksum ever fails to match the expected value, the test
48 ** A write operation involves writing (possibly clobbering) a single
49 ** key/value pair. The associated level 1 checksum is then recalculated
50 ** updated. Then the level 2 checksum, and so on until the highest
51 ** level checksum has been modified.
53 ** All updates occur inside a single transaction.
57 ** The interface used by test cases to read and write the db consists
58 ** of type DbParameters and the following functions:
66 typedef struct DbParameters DbParameters
;
68 int nFanout
; /* Checksum fanout (F) */
69 int nKey
; /* Size of key space (N) */
72 #define DB_KEY_BYTES (2+5+10+1)
75 ** Argument aBuf[] must point to a buffer at least DB_KEY_BYTES in size.
76 ** This function populates the buffer with a nul-terminated key string
77 ** corresponding to key iKey.
79 static void dbFormatKey(
82 int iKey
, /* Key value */
83 char *aBuf
/* Write key string here */
86 snprintf(aBuf
, DB_KEY_BYTES
, "k.%.10d", iKey
);
90 for(i
=0; i
<iLevel
; i
++) f
= f
* pParam
->nFanout
;
91 snprintf(aBuf
, DB_KEY_BYTES
, "c.%d.%.10d", iLevel
, f
*(iKey
/f
));
96 ** Argument aBuf[] must point to a buffer at least DB_KEY_BYTES in size.
97 ** This function populates the buffer with the string representation of
98 ** checksum value iVal.
100 static void dbFormatCksumValue(u32 iVal
, char *aBuf
){
101 snprintf(aBuf
, DB_KEY_BYTES
, "%.10u", iVal
);
105 ** Return the highest level of checksum in the database described
108 static int dbMaxLevel(DbParameters
*pParam
){
111 for(iMax
=0; n
<pParam
->nKey
; iMax
++){
112 n
= n
* pParam
->nFanout
;
118 void *pCtx
, /* IN/OUT: Pointer to u32 containing cksum */
119 void *pKey
, int nKey
, /* Database key. Unused. */
120 void *pVal
, int nVal
/* Database value. Checksum this. */
122 u8
*aVal
= (u8
*)pVal
;
123 u32
*pCksum
= (u32
*)pCtx
;
127 unused_parameter(pKey
);
128 unused_parameter(nKey
);
130 for(i
=0; i
<nVal
; i
++){
131 cksum
+= (cksum
<<3) + (int)aVal
[i
];
138 ** Compute the value of the checksum stored on level iLevel that contains
139 ** data from key iKey by scanning the pParam->nFanout entries at level
142 static u32
dbComputeCksum(
143 DbParameters
*pParam
, /* Database parameters */
144 TestDb
*pDb
, /* Database connection handle */
145 int iLevel
, /* Level of checksum to compute */
146 int iKey
, /* Compute checksum for this key */
147 int *pRc
/* IN/OUT: Error code */
157 char zFirst
[DB_KEY_BYTES
];
158 char zLast
[DB_KEY_BYTES
];
161 for(i
=0; i
<iLevel
; i
++) f
= f
* pParam
->nFanout
;
164 iLast
= iFirst
+ f
- 1;
165 dbFormatKey(pParam
, iLevel
-1, iFirst
, zFirst
);
166 dbFormatKey(pParam
, iLevel
-1, iLast
, zLast
);
167 nFirst
= strlen(zFirst
);
168 nLast
= strlen(zLast
);
170 *pRc
= tdb_scan(pDb
, (u32
*)&cksum
, 0, zFirst
, nFirst
, zLast
, nLast
,dbCksum
);
176 static void dbReadOperation(
177 DbParameters
*pParam
, /* Database parameters */
178 TestDb
*pDb
, /* Database connection handle */
179 void (*xDelay
)(void *),
181 int iKey
, /* Key to read */
182 int *pRc
/* IN/OUT: Error code */
184 const int iMax
= dbMaxLevel(pParam
);
187 if( tdb_transaction_support(pDb
) ) testBegin(pDb
, 1, pRc
);
188 for(i
=1; *pRc
==0 && i
<=iMax
; i
++){
189 char zCksum
[DB_KEY_BYTES
];
190 char zKey
[DB_KEY_BYTES
];
193 iCksum
= dbComputeCksum(pParam
, pDb
, i
, iKey
, pRc
);
195 if( xDelay
&& i
==1 ) xDelay(pDelayCtx
);
196 dbFormatCksumValue(iCksum
, zCksum
);
197 dbFormatKey(pParam
, i
, iKey
, zKey
);
198 testFetchStr(pDb
, zKey
, zCksum
, pRc
);
201 if( tdb_transaction_support(pDb
) ) testCommit(pDb
, 0, pRc
);
204 static int dbWriteOperation(
205 DbParameters
*pParam
, /* Database parameters */
206 TestDb
*pDb
, /* Database connection handle */
207 int iKey
, /* Key to write to */
208 const char *zValue
, /* Nul-terminated value to write */
209 int *pRc
/* IN/OUT: Error code */
211 const int iMax
= dbMaxLevel(pParam
);
212 char zKey
[DB_KEY_BYTES
];
216 assert( iKey
>=0 && iKey
<pParam
->nKey
);
217 dbFormatKey(pParam
, 0, iKey
, zKey
);
219 /* Open a write transaction. This may fail - SQLITE4_BUSY */
220 if( *pRc
==0 && tdb_transaction_support(pDb
) ){
221 rc
= tdb_begin(pDb
, 2);
222 if( rc
==5 ) return 0;
226 testWriteStr(pDb
, zKey
, zValue
, pRc
);
227 for(i
=1; i
<=iMax
; i
++){
228 char zCksum
[DB_KEY_BYTES
];
231 iCksum
= dbComputeCksum(pParam
, pDb
, i
, iKey
, pRc
);
232 dbFormatCksumValue(iCksum
, zCksum
);
233 dbFormatKey(pParam
, i
, iKey
, zKey
);
234 testWriteStr(pDb
, zKey
, zCksum
, pRc
);
236 if( tdb_transaction_support(pDb
) ) testCommit(pDb
, 0, pRc
);
240 /*************************************************************************
241 ** The following block contains testXXX() functions that implement a
242 ** wrapper around the systems native multi-thread support. There are no
243 ** synchronization primitives - just functions to launch and join
244 ** threads. Wrapper functions are:
246 ** testThreadSupport()
249 ** testThreadShutdown()
250 ** testThreadLaunch()
253 ** testThreadSetHalt()
254 ** testThreadGetHalt()
255 ** testThreadSetResult()
256 ** testThreadGetResult()
258 ** testThreadEnterMutex()
259 ** testThreadLeaveMutex()
261 typedef struct ThreadSet ThreadSet
;
262 #ifdef LSM_MUTEX_PTHREADS
267 typedef struct Thread Thread
;
272 void (*xMain
)(ThreadSet
*, int, void *);
274 ThreadSet
*pThreadSet
;
278 int bHalt
; /* Halt flag */
279 int nThread
; /* Number of threads */
280 Thread
*aThread
; /* Array of Thread structures */
281 pthread_mutex_t mutex
; /* Mutex used for cheating */
285 ** Return true if this build supports threads, or false otherwise. If
286 ** this function returns false, no other testThreadXXX() functions should
289 static int testThreadSupport(){ return 1; }
292 ** Allocate and return a thread-set handle with enough space allocated
293 ** to handle up to nMax threads. Each call to this function should be
294 ** matched by a call to testThreadShutdown() to delete the object.
296 static ThreadSet
*testThreadInit(int nMax
){
297 int nByte
; /* Total space to allocate */
298 ThreadSet
*p
; /* Return value */
300 nByte
= sizeof(ThreadSet
) + sizeof(struct Thread
) * nMax
;
301 p
= (ThreadSet
*)testMalloc(nByte
);
303 p
->aThread
= (Thread
*)&p
[1];
304 pthread_mutex_init(&p
->mutex
, 0);
310 ** Delete a thread-set object and release all resources held by it.
312 static void testThreadShutdown(ThreadSet
*p
){
314 for(i
=0; i
<p
->nThread
; i
++){
315 testFree(p
->aThread
[i
].zMsg
);
317 pthread_mutex_destroy(&p
->mutex
);
321 static void *ttMain(void *pArg
){
322 Thread
*pThread
= (Thread
*)pArg
;
324 iThread
= (pThread
- pThread
->pThreadSet
->aThread
);
325 pThread
->xMain(pThread
->pThreadSet
, iThread
, pThread
->pCtx
);
330 ** Launch a new thread.
332 static int testThreadLaunch(
335 void (*xMain
)(ThreadSet
*, int, void *),
341 assert( iThread
>=0 && iThread
<p
->nThread
);
343 pThread
= &p
->aThread
[iThread
];
344 assert( pThread
->pThreadSet
==0 );
345 pThread
->xMain
= xMain
;
346 pThread
->pCtx
= pCtx
;
347 pThread
->pThreadSet
= p
;
348 rc
= pthread_create(&pThread
->id
, 0, ttMain
, (void *)pThread
);
354 ** Set the thread-set "halt" flag.
356 static void testThreadSetHalt(ThreadSet
*pThreadSet
){
357 pThreadSet
->bHalt
= 1;
361 ** Return the current value of the thread-set "halt" flag.
363 static int testThreadGetHalt(ThreadSet
*pThreadSet
){
364 return pThreadSet
->bHalt
;
367 static void testThreadSleep(ThreadSet
*pThreadSet
, int nMs
){
369 while( nRem
>0 && testThreadGetHalt(pThreadSet
)==0 ){
376 ** Wait for all threads launched to finish before returning. If nMs
377 ** is greater than zero, set the "halt" flag to tell all threads
378 ** to halt after waiting nMs milliseconds.
380 static void testThreadWait(ThreadSet
*pThreadSet
, int nMs
){
383 testThreadSleep(pThreadSet
, nMs
);
384 testThreadSetHalt(pThreadSet
);
385 for(i
=0; i
<pThreadSet
->nThread
; i
++){
386 Thread
*pThread
= &pThreadSet
->aThread
[i
];
387 if( pThread
->xMain
){
388 pthread_join(pThread
->id
, 0);
394 ** Set the result for thread iThread.
396 static void testThreadSetResult(
397 ThreadSet
*pThreadSet
, /* Thread-set handle */
398 int iThread
, /* Set result for this thread */
399 int rc
, /* Result error code */
400 char *zFmt
, /* Result string format */
401 ... /* Result string formatting args... */
405 testFree(pThreadSet
->aThread
[iThread
].zMsg
);
406 pThreadSet
->aThread
[iThread
].rc
= rc
;
407 pThreadSet
->aThread
[iThread
].zMsg
= 0;
410 pThreadSet
->aThread
[iThread
].zMsg
= testMallocVPrintf(zFmt
, ap
);
416 ** Retrieve the result for thread iThread.
418 static int testThreadGetResult(
419 ThreadSet
*pThreadSet
, /* Thread-set handle */
420 int iThread
, /* Get result for this thread */
421 const char **pzRes
/* OUT: Pointer to result string */
423 if( pzRes
) *pzRes
= pThreadSet
->aThread
[iThread
].zMsg
;
424 return pThreadSet
->aThread
[iThread
].rc
;
428 ** Enter and leave the test case mutex.
431 static void testThreadEnterMutex(ThreadSet
*p
){
432 pthread_mutex_lock(&p
->mutex
);
434 static void testThreadLeaveMutex(ThreadSet
*p
){
435 pthread_mutex_unlock(&p
->mutex
);
440 #if !defined(LSM_MUTEX_PTHREADS)
441 static int testThreadSupport(){ return 0; }
443 #define testThreadInit(a) 0
444 #define testThreadShutdown(a)
445 #define testThreadLaunch(a,b,c,d) 0
446 #define testThreadWait(a,b)
447 #define testThreadSetHalt(a)
448 #define testThreadGetHalt(a) 0
449 #define testThreadGetResult(a,b,c) 0
450 #define testThreadSleep(a,b) 0
452 static void testThreadSetResult(ThreadSet
*a
, int b
, int c
, char *d
, ...){
459 /* End of threads wrapper.
460 *************************************************************************/
462 /*************************************************************************
463 ** Below this point is the third part of this file - the implementation
464 ** of the mt1.* tests.
466 typedef struct Mt1Test Mt1Test
;
468 DbParameters param
; /* Description of database to read/write */
469 int nReadwrite
; /* Number of read/write threads */
470 int nFastReader
; /* Number of fast reader threads */
471 int nSlowReader
; /* Number of slow reader threads */
472 int nMs
; /* How long to run for */
473 const char *zSystem
; /* Database system to test */
476 typedef struct Mt1DelayCtx Mt1DelayCtx
;
478 ThreadSet
*pSet
; /* Threadset to sleep within */
479 int nMs
; /* Sleep in ms */
482 static void xMt1Delay(void *pCtx
){
483 Mt1DelayCtx
*p
= (Mt1DelayCtx
*)pCtx
;
484 testThreadSleep(p
->pSet
, p
->nMs
);
487 #define MT1_THREAD_RDWR 0
488 #define MT1_THREAD_SLOW 1
489 #define MT1_THREAD_FAST 2
491 static void xMt1Work(lsm_db
*pDb
, void *pCtx
){
494 lsm_info(pDb
, LSM_INFO_DB_STRUCTURE
, &z
);
501 ** This is the main() proc for all threads in test case "mt1".
503 static void mt1Main(ThreadSet
*pThreadSet
, int iThread
, void *pCtx
){
504 Mt1Test
*p
= (Mt1Test
*)pCtx
; /* Test parameters */
506 int nRead
= 0; /* Number of calls to dbReadOperation() */
507 int nWrite
= 0; /* Number of completed database writes */
508 int rc
= 0; /* Error code */
509 int iPrng
; /* Prng argument variable */
510 TestDb
*pDb
; /* Database handle */
513 delay
.pSet
= pThreadSet
;
515 if( iThread
<p
->nReadwrite
){
516 eType
= MT1_THREAD_RDWR
;
517 }else if( iThread
<(p
->nReadwrite
+p
->nFastReader
) ){
518 eType
= MT1_THREAD_FAST
;
520 eType
= MT1_THREAD_SLOW
;
521 delay
.nMs
= (p
->nMs
/ 20);
524 /* Open a new database connection. Initialize the pseudo-random number
525 ** argument based on the thread number. */
526 iPrng
= testPrngValue(iThread
);
527 pDb
= testOpen(p
->zSystem
, 0, &rc
);
530 tdb_lsm_config_work_hook(pDb
, xMt1Work
, 0);
533 /* Loop until either an error occurs or some other thread sets the
535 while( rc
==0 && testThreadGetHalt(pThreadSet
)==0 ){
538 /* Perform a read operation on an arbitrarily selected key. */
539 iKey
= (testPrngValue(iPrng
++) % p
->param
.nKey
);
540 dbReadOperation(&p
->param
, pDb
, xMt1Delay
, (void *)&delay
, iKey
, &rc
);
544 /* Attempt to write an arbitrary key value pair (and update the associated
545 ** checksum entries). dbWriteOperation() returns 1 if the write is
546 ** successful, or 0 if it failed with an LSM_BUSY error. */
547 if( eType
==MT1_THREAD_RDWR
){
551 iKey
= (testPrngValue(iPrng
++) % p
->param
.nKey
);
552 testPrngString(iPrng
, aRnd
, sizeof(aRnd
));
553 iPrng
+= sizeof(aRnd
);
554 snprintf(aValue
, sizeof(aValue
), "%d.%s", iThread
, aRnd
);
555 nWrite
+= dbWriteOperation(&p
->param
, pDb
, iKey
, aValue
, &rc
);
560 /* If an error has occured, set the thread error code and the threadset
561 ** halt flag to tell the other test threads to halt. Otherwise, set the
562 ** thread error code to 0 and post a message with the number of read
563 ** and write operations completed. */
565 testThreadSetResult(pThreadSet
, iThread
, rc
, 0);
566 testThreadSetHalt(pThreadSet
);
568 testThreadSetResult(pThreadSet
, iThread
, 0, "r/w: %d/%d", nRead
, nWrite
);
572 static void do_test_mt1(
573 const char *zSystem
, /* Database system name */
574 const char *zPattern
, /* Run test cases that match this pattern */
575 int *pRc
/* IN/OUT: Error code */
578 /* param, nReadwrite, nFastReader, nSlowReader, nMs, zSystem */
579 { {10, 1000}, 4, 0, 0, 10000, 0 },
580 { {10, 1000}, 4, 4, 2, 100000, 0 },
581 { {10, 100000}, 4, 0, 0, 10000, 0 },
582 { {10, 100000}, 4, 4, 2, 100000, 0 },
586 for(i
=0; *pRc
==0 && i
<ArraySize(aTest
); i
++){
587 Mt1Test
*p
= &aTest
[i
];
588 int bRun
= testCaseBegin(pRc
, zPattern
,
589 "mt1.%s.db=%d,%d.ms=%d.rdwr=%d.fast=%d.slow=%d",
590 zSystem
, p
->param
.nFanout
, p
->param
.nKey
,
591 p
->nMs
, p
->nReadwrite
, p
->nFastReader
, p
->nSlowReader
599 p
->zSystem
= zSystem
;
600 pDb
= testOpen(zSystem
, 1, pRc
);
602 nThread
= p
->nReadwrite
+ p
->nFastReader
+ p
->nSlowReader
;
603 pSet
= testThreadInit(nThread
);
604 for(iThread
=0; *pRc
==0 && iThread
<nThread
; iThread
++){
605 testThreadLaunch(pSet
, iThread
, mt1Main
, (void *)p
);
608 testThreadWait(pSet
, p
->nMs
);
609 for(iThread
=0; *pRc
==0 && iThread
<nThread
; iThread
++){
610 *pRc
= testThreadGetResult(pSet
, iThread
, 0);
612 testCaseFinish(*pRc
);
614 for(iThread
=0; *pRc
==0 && iThread
<nThread
; iThread
++){
615 const char *zMsg
= 0;
616 *pRc
= testThreadGetResult(pSet
, iThread
, &zMsg
);
617 printf(" Info: thread %d (%d): %s\n", iThread
, *pRc
, zMsg
);
620 testThreadShutdown(pSet
);
627 const char *zSystem
, /* Database system name */
628 const char *zPattern
, /* Run test cases that match this pattern */
629 int *pRc
/* IN/OUT: Error code */
631 if( testThreadSupport()==0 ) return;
632 do_test_mt1(zSystem
, zPattern
, pRc
);