Add the SQLITE_MAX_WORKER_THREADS compile time option. And the SQLITE_CONFIG_WORKER_THREADS sqlite3_config() switch.

FossilOrigin-Name: 2774710df8cd2bfaca49888c69f1b01c0ddadf9a
This commit is contained in:
dan 2014-03-31 19:57:34 +00:00
parent 853c4a7621
commit b3f56fdb69
8 changed files with 90 additions and 42 deletions

View File

@ -1,5 +1,5 @@
C Fix\sa\sbroken\sassert()\sin\svdbesort.c.
D 2014-03-29T10:01:58.802
C Add\sthe\sSQLITE_MAX_WORKER_THREADS\scompile\stime\soption.\sAnd\sthe\sSQLITE_CONFIG_WORKER_THREADS\ssqlite3_config()\sswitch.
D 2014-03-31T19:57:34.075
F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f
F Makefile.in ad0921c4b2780d01868cf69b419a4f102308d125
F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23
@ -177,7 +177,7 @@ F src/expr.c da2b3cb41081af6b56e95e7c9e95949564ce2e21
F src/fault.c 160a0c015b6c2629d3899ed2daf63d75754a32bb
F src/fkey.c 5269ef07b100763134f71b889327c333bd0989cf
F src/func.c 2945bb2c4cdc0ac43733046285a4434310be1811
F src/global.c 1d7bb7ea8254ae6a68ed9bfaf65fcb3d1690b486
F src/global.c 57d9dd92f4e2469cf4046847f456d45bdda0f202
F src/hash.c d139319967164f139c8d1bb8a11b14db9c4ba3cd
F src/hash.h 8890a25af81fb85a9ad7790d32eedab4b994da22
F src/hwtime.h d32741c8f4df852c7d959236615444e2b1063b08
@ -186,7 +186,7 @@ F src/journal.c b4124532212b6952f42eb2c12fa3c25701d8ba8d
F src/legacy.c 0df0b1550b9cc1f58229644735e317ac89131f12
F src/lempar.c cdf0a000315332fc9b50b62f3b5e22e080a0952b
F src/loadext.c 867c7b330b740c6c917af9956b13b81d0a048303
F src/main.c 691b25754bef596108fe60ff1bcbe8445369c9db
F src/main.c d3655832585baef4c2356529a5c6ca5ca3bd7c1f
F src/malloc.c 0203ebce9152c6a0e5de520140b8ba65187350be
F src/mem0.c 6a55ebe57c46ca1a7d98da93aaa07f99f1059645
F src/mem1.c c0c990fcaddff810ea277b4fb5d9138603dd5d4b
@ -219,10 +219,10 @@ F src/resolve.c 273d5f47c4e2c05b2d3d2bffeda939551ab59e66
F src/rowset.c 64655f1a627c9c212d9ab497899e7424a34222e0
F src/select.c 20055cf917222e660c4222fea306bd13a0623caa
F src/shell.c f48b63f8e582e7998ecefd051d697f91fb1453df
F src/sqlite.h.in a2ef671f92747a5a1c8a47bad5c585a8dd9eca80
F src/sqlite.h.in 0249af5d9d3bbeab0dc1f58e1f9fee878807732a
F src/sqlite3.rc 11094cc6a157a028b301a9f06b3d03089ea37c3e
F src/sqlite3ext.h 886f5a34de171002ad46fae8c36a7d8051c190fc
F src/sqliteInt.h 3f5190a4e07ca227035334da8d66ebe227071528
F src/sqliteInt.h 7f42c2792b951db22fa189bbed828a5e3b38789c
F src/sqliteLimit.h 164b0e6749d31e0daa1a4589a169d31c0dec7b3d
F src/status.c 7ac05a5c7017d0b9f0b4bcd701228b784f987158
F src/table.c 2cd62736f845d82200acfa1287e33feb3c15d62e
@ -272,7 +272,7 @@ F src/test_thread.c 1e133a40b50e9c035b00174035b846e7eef481cb
F src/test_vfs.c e72f555ef7a59080f898fcf1a233deb9eb704ea9
F src/test_vfstrace.c 3a0ab304682fecbceb689e7d9b904211fde11d78
F src/test_wsd.c 41cadfd9d97fe8e3e4e44f61a4a8ccd6f7ca8fe9
F src/threads.c b96d62f88c06d4fa980a4a92685d1b130c4c84d3
F src/threads.c 6992f70cab8d5d8451a6b5641a9256d1749af87b
F src/tokenize.c 6da2de6e12218ccb0aea5184b56727d011f4bee7
F src/trigger.c 66f3470b03b52b395e839155786966e3e037fddb
F src/update.c 5b3e74a03b3811e586b4f2b4cbd7c49f01c93115
@ -286,7 +286,7 @@ F src/vdbeapi.c 0ed6053f947edd0b30f64ce5aeb811872a3450a4
F src/vdbeaux.c 1153175fb57a8454e1c8cf79b59b7bf92b26779d
F src/vdbeblob.c 15377abfb59251bccedd5a9c7d014a895f0c04aa
F src/vdbemem.c 6fc77594c60f6155404f3f8d71bf36d1fdeb4447
F src/vdbesort.c 2881297f4acdba5908078c5d7f00635288a1ca08
F src/vdbesort.c b4d6133bada297e118492420346f83cd76c6da31
F src/vdbetrace.c 6f52bc0c51e144b7efdcfb2a8f771167a8816767
F src/vtab.c 21b932841e51ebd7d075e2d0ad1415dce8d2d5fd
F src/wal.c 76e7fc6de229bea8b30bb2539110f03a494dc3a8
@ -1160,7 +1160,7 @@ F tool/vdbe_profile.tcl 67746953071a9f8f2f668b73fe899074e2c6d8c1
F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4
F tool/warnings.sh d1a6de74685f360ab718efda6265994b99bbea01
F tool/win/sqlite.vsix 030f3eeaf2cb811a3692ab9c14d021a75ce41fff
P a683c05f6250389e84b980b16559e162ba1a27c2
R d8e63408790a442d33eec2a57272c54d
P 18d1b402f2dbe78f1a1113bb356b710e348365ef
R 8b3347b8372cc17a3226330524ab6da6
U dan
Z 6c5e71d99f167a1ded5f09353b3e0513
Z 11cb5db8cbfdddf6047ddaf9c26850df

View File

@ -1 +1 @@
18d1b402f2dbe78f1a1113bb356b710e348365ef
2774710df8cd2bfaca49888c69f1b01c0ddadf9a

View File

@ -167,6 +167,7 @@ SQLITE_WSD struct Sqlite3Config sqlite3Config = {
0, /* nPage */
0, /* mxParserStack */
0, /* sharedCacheEnabled */
SQLITE_MAX_WORKER_THREADS, /* nWorker */
/* All the rest should always be initialized to zero */
0, /* isInit */
0, /* inProgress */

View File

@ -515,6 +515,13 @@ int sqlite3_config(int op, ...){
}
#endif
case SQLITE_CONFIG_WORKER_THREADS: {
int n = va_arg(ap, int);
if( n>SQLITE_MAX_WORKER_THREADS ) n = SQLITE_MAX_WORKER_THREADS;
if( n>=0 ) sqlite3GlobalConfig.nWorker = n;
break;
}
default: {
rc = SQLITE_ERROR;
break;

View File

@ -1715,6 +1715,16 @@ struct sqlite3_mem_methods {
** SQLITE_CONFIG_WIN32_HEAPSIZE takes a 32-bit unsigned integer value
** that specifies the maximum size of the created heap.
** </dl>
**
** [[SQLITE_CONFIG_WORKER_THREADS]]
** <dt>SQLITE_CONFIG_WORKER_THREADS
** <dd>^SQLITE_CONFIG_WORKER_THREADS takes a single argument of type int.
** It is used to set the number of background worker threads that may be
** launched when sorting large amounts of data. A value of 0 means launch
** no background threads at all. The maximum number of background threads
** allowed is configured at build-time by the SQLITE_MAX_WORKER_THREADS
** pre-processor option.
** </dl>
*/
#define SQLITE_CONFIG_SINGLETHREAD 1 /* nil */
#define SQLITE_CONFIG_MULTITHREAD 2 /* nil */
@ -1739,6 +1749,7 @@ struct sqlite3_mem_methods {
#define SQLITE_CONFIG_SQLLOG 21 /* xSqllog, void* */
#define SQLITE_CONFIG_MMAP_SIZE 22 /* sqlite3_int64, sqlite3_int64 */
#define SQLITE_CONFIG_WIN32_HEAPSIZE 23 /* int nByte */
#define SQLITE_CONFIG_WORKER_THREADS 24 /* int nWorker */
/*
** CAPI3REF: Database Connection Configuration Options

View File

@ -422,6 +422,19 @@
# define SQLITE_TEMP_STORE_xc 1 /* Exclude from ctime.c */
#endif
/*
** If no value has been provided for SQLITE_MAX_WORKER_THREADS, or if
** SQLITE_TEMP_STORE is set to 3 (never use temporary files), set it
** to zero.
*/
#if SQLITE_TEMP_STORE==3
# undef SQLITE_MAX_WORKER_THREADS
#endif
#ifndef SQLITE_MAX_WORKER_THREADS
# define SQLITE_MAX_WORKER_THREADS 0
#endif
/*
** GCC does not define the offsetof() macro so we'll have to do it
** ourselves.
@ -2685,6 +2698,7 @@ struct Sqlite3Config {
int nPage; /* Number of pages in pPage[] */
int mxParserStack; /* maximum depth of the parser stack */
int sharedCacheEnabled; /* true if shared-cache mode enabled */
int nWorker; /* Number of worker threads to use */
/* The above might be initialized to non-zero. The following need to always
** initially be zero, however. */
int isInit; /* True after initialization has finished */

View File

@ -27,6 +27,8 @@
*/
#include "sqliteInt.h"
#if SQLITE_MAX_WORKER_THREADS>0
/********************************* Unix Pthreads ****************************/
#if SQLITE_OS_UNIX && defined(SQLITE_MUTEX_PTHREADS) && SQLITE_THREADSAFE>0
@ -215,3 +217,4 @@ int sqlite3ThreadJoin(SQLiteThread *p, void **ppOut){
#endif /* !defined(SQLITE_THREADS_IMPLEMENTED) */
/****************************** End Single-Threaded *************************/
#endif /* SQLITE_MAX_WORKER_THREADS>0 */

View File

@ -26,14 +26,6 @@ typedef struct SorterMerger SorterMerger;
typedef struct FileWriter FileWriter;
/*
** Maximum number of threads to use. Setting this value to 1 forces all
** operations to be single-threaded.
*/
#ifndef SQLITE_MAX_SORTER_THREAD
# define SQLITE_MAX_SORTER_THREAD 4
#endif
/*
** Candidate values for SorterThread.eWork
*/
@ -48,9 +40,10 @@ typedef struct FileWriter FileWriter;
** is configured and passed to vdbeSorterThreadMain() - either directly by
** the main thread or via a background thread.
**
** Exactly SQLITE_MAX_SORTER_THREAD instances of this structure are allocated
** Exactly SorterThread.nThread instances of this structure are allocated
** as part of each VdbeSorter object. Instances are never allocated any other
** way.
** way. SorterThread.nThread is set to the number of worker threads allowed
** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread).
**
** When a background thread is launched to perform work, SorterThread.bDone
** is set to 0 and the SorterThread.pThread variable set to point to the
@ -59,7 +52,7 @@ typedef struct FileWriter FileWriter;
** exits. SorterThread.pThread and bDone are always cleared after the
** background thread has been joined.
**
** One object (specifically, VdbeSorter.aThread[SQLITE_MAX_SORTER_THREAD-1])
** One object (specifically, VdbeSorter.aThread[SorterThread.nThread-1])
** is reserved for the foreground thread.
**
** The nature of the work performed is determined by SorterThread.eWork,
@ -187,7 +180,8 @@ struct VdbeSorter {
u8 *aMemory; /* Block of memory to alloc records from */
int iMemory; /* Offset of first free byte in aMemory */
int nMemory; /* Size of aMemory allocation in bytes */
SorterThread aThread[SQLITE_MAX_SORTER_THREAD];
int nThread; /* Size of aThread[] array */
SorterThread aThread[1];
};
/*
@ -572,7 +566,7 @@ static int vdbeSorterDoCompare(
iRes = i1;
}else{
int res;
assert( pThread->pUnpacked!=0 ); /* allocated in vdbeSorterMerge() */
assert( pThread->pUnpacked!=0 ); /* allocated in vdbeSorterThreadMain() */
vdbeSorterCompare(
pThread, 0, p1->aKey, p1->nKey, p2->aKey, p2->nKey, &res
);
@ -597,21 +591,26 @@ int sqlite3VdbeSorterInit(sqlite3 *db, VdbeCursor *pCsr){
VdbeSorter *pSorter; /* The new sorter */
KeyInfo *pKeyInfo; /* Copy of pCsr->pKeyInfo with db==0 */
int szKeyInfo; /* Size of pCsr->pKeyInfo in bytes */
int sz; /* Size of pSorter in bytes */
int rc = SQLITE_OK;
int nWorker = (sqlite3GlobalConfig.bCoreMutex?sqlite3GlobalConfig.nWorker:0);
assert( pCsr->pKeyInfo && pCsr->pBt==0 );
szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*);
pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sizeof(VdbeSorter)+szKeyInfo);
sz = sizeof(VdbeSorter) + nWorker * sizeof(SorterThread);
pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo);
pCsr->pSorter = pSorter;
if( pSorter==0 ){
rc = SQLITE_NOMEM;
}else{
pKeyInfo = (KeyInfo*)&pSorter[1];
pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
pKeyInfo->db = 0;
pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
pSorter->nThread = nWorker + 1;
for(i=0; i<pSorter->nThread; i++){
SorterThread *pThread = &pSorter->aThread[i];
pThread->pKeyInfo = pKeyInfo;
pThread->pVfs = db->pVfs;
@ -674,10 +673,11 @@ static void vdbeSorterThreadCleanup(sqlite3 *db, SorterThread *pThread){
/*
** Join all threads.
*/
#if SQLITE_MAX_WORKER_THREADS>0
static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
int rc = rcin;
int i;
for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
for(i=0; i<pSorter->nThread; i++){
SorterThread *pThread = &pSorter->aThread[i];
if( pThread->pThread ){
void *pRet;
@ -690,6 +690,9 @@ static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
}
return rc;
}
#else
# define vdbeSorterJoinAll(x,rcin) (rcin)
#endif
/*
** Allocate a new SorterMerger object with space for nIter iterators.
@ -739,7 +742,7 @@ static void vdbeSorterMergerFree(SorterMerger *pMerger){
void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
int i;
vdbeSorterJoinAll(pSorter, SQLITE_OK);
for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
for(i=0; i<pSorter->nThread; i++){
SorterThread *pThread = &pSorter->aThread[i];
vdbeSorterThreadCleanup(db, pThread);
}
@ -1246,8 +1249,9 @@ static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){
SorterThread *pThread; /* Thread context used to create new PMA */
pSorter->bUsePMA = 1;
for(i=0; ALWAYS( i<SQLITE_MAX_SORTER_THREAD ); i++){
for(i=0; ALWAYS( i<pSorter->nThread ); i++){
pThread = &pSorter->aThread[i];
#if SQLITE_MAX_WORKER_THREADS>0
if( pThread->bDone ){
void *pRet;
assert( pThread->pThread );
@ -1258,11 +1262,12 @@ static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){
rc = SQLITE_PTR_TO_INT(pRet);
}
}
#endif
if( pThread->pThread==0 ) break;
}
if( rc==SQLITE_OK ){
int bUseFg = (bFg || i==(SQLITE_MAX_SORTER_THREAD-1));
int bUseFg = (bFg || i==(pSorter->nThread-1));
assert( pThread->pThread==0 && pThread->bDone==0 );
pThread->eWork = SORTER_THREAD_TO_PMA;
@ -1277,6 +1282,7 @@ static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){
pSorter->aMemory = aMem;
}
#if SQLITE_MAX_WORKER_THREADS>0
if( bUseFg==0 ){
/* Launch a background thread for this operation */
void *pCtx = (void*)pThread;
@ -1290,7 +1296,9 @@ static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){
}
}
rc = sqlite3ThreadCreate(&pThread->pThread, vdbeSorterThreadMain, pCtx);
}else{
}else
#endif
{
/* Use the foreground thread for this operation */
u8 *aMem;
rc = vdbeSorterRunThread(pThread);
@ -1370,7 +1378,9 @@ int sqlite3VdbeSorterWrite(
aNew = sqlite3Realloc(pSorter->aMemory, nNew);
if( !aNew ) return SQLITE_NOMEM;
pSorter->pRecord = aNew + ((u8*)pSorter->pRecord - pSorter->aMemory);
pSorter->pRecord = (SorterRecord*)(
aNew + ((u8*)pSorter->pRecord - pSorter->aMemory)
);
pSorter->aMemory = aNew;
pSorter->nMemory = nNew;
}
@ -1399,7 +1409,7 @@ int sqlite3VdbeSorterWrite(
static int vdbeSorterCountPMA(VdbeSorter *pSorter){
int nPMA = 0;
int i;
for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
for(i=0; i<pSorter->nThread; i++){
nPMA += pSorter->aThread[i].nPMA;
}
return nPMA;
@ -1446,19 +1456,21 @@ int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
/* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge
** some of them together so that this is no longer the case. */
assert( SORTER_MAX_MERGE_COUNT>=SQLITE_MAX_SORTER_THREAD );
if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){
int i;
for(i=0; rc==SQLITE_OK && i<SQLITE_MAX_SORTER_THREAD; i++){
for(i=0; rc==SQLITE_OK && i<pSorter->nThread; i++){
SorterThread *pThread = &pSorter->aThread[i];
if( pThread->pTemp1 ){
pThread->nConsolidate = SORTER_MAX_MERGE_COUNT/SQLITE_MAX_SORTER_THREAD;
pThread->nConsolidate = SORTER_MAX_MERGE_COUNT/pSorter->nThread;
pThread->eWork = SORTER_THREAD_CONS;
if( i<(SQLITE_MAX_SORTER_THREAD-1) ){
#if SQLITE_MAX_WORKER_THREADS>0
if( i<(pSorter->nThread-1) ){
void *pCtx = (void*)pThread;
rc = sqlite3ThreadCreate(&pThread->pThread,vdbeSorterThreadMain,pCtx);
}else{
}else
#endif
{
rc = vdbeSorterRunThread(pThread);
}
}
@ -1475,7 +1487,7 @@ int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
int nIter = 0; /* Number of iterators used */
int i;
SorterMerger *pMerger;
for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
for(i=0; i<pSorter->nThread; i++){
nIter += pSorter->aThread[i].nPMA;
}
@ -1485,7 +1497,7 @@ int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
}else{
int iIter = 0;
int iThread = 0;
for(iThread=0; iThread<SQLITE_MAX_SORTER_THREAD; iThread++){
for(iThread=0; iThread<pSorter->nThread; iThread++){
int iPMA;
i64 iReadOff = 0;
SorterThread *pThread = &pSorter->aThread[iThread];