Add a new algorithm for handling INSERT which reduces fragmentation on

a VACUUM.  Ticket #2075.  More testing needed. (CVS 3643)

FossilOrigin-Name: 9f56a878cbbc715262b3a48ee696148dbd7bf1d2
This commit is contained in:
drh 2007-02-13 15:01:11 +00:00
parent 1f9a746277
commit 9d9cf22971
3 changed files with 460 additions and 59 deletions

View File

@ -1,5 +1,5 @@
C Changes\sto\sthe\sscript\sthat\sgenerates\sdownload.html\sso\sthat\sit\srecognizes\nFTS2\smodules.\s(CVS\s3642)
D 2007-02-13T14:11:46
C Add\sa\snew\salgorithm\sfor\shandling\sINSERT\swhich\sreduces\sfragmentation\son\na\sVACUUM.\s\sTicket\s#2075.\s\sMore\stesting\sneeded.\s(CVS\s3643)
D 2007-02-13T15:01:11
F Makefile.in 7fa74bf4359aa899da5586e394d17735f221315f
F Makefile.linux-gcc 2d8574d1ba75f129aba2019f0b959db380a90935
F README 9c4e2d6706bdcc3efdd773ce752a8cdab4f90028
@ -69,7 +69,7 @@ F src/expr.c dfd25ae8f8f2ebf3d8dea605a5cea959946aabb7
F src/func.c b7e1e220a6795ecae7649815145ea5f8644dfa5f
F src/hash.c 449f3d6620193aa557f5d86cbc5cc6b87702b185
F src/hash.h 1b3f7e2609141fd571f62199fc38687d262e9564
F src/insert.c e9526ced19978a55687b55faea969b6ff2a53fb4
F src/insert.c c7cb4894ec15ae1c66a33f3e7df9761794604241
F src/legacy.c 2631df6a861f830d6b1c0fe92b9fdd745b2c0cd6
F src/loadext.c bbfdbf452c71b6f2723375478a365788498ec3cd
F src/main.c 33c32014da3a1471e8869d2eba32b2c4314c39ce
@ -432,7 +432,7 @@ F www/tclsqlite.tcl bb0d1357328a42b1993d78573e587c6dcbc964b9
F www/vdbe.tcl 87a31ace769f20d3627a64fa1fade7fed47b90d0
F www/version3.tcl 890248cf7b70e60c383b0e84d77d5132b3ead42b
F www/whentouse.tcl 97e2b5cd296f7d8057e11f44427dea8a4c2db513
P 286c4eb30dfe1e8ef110cd2a214d71612a78b8cc
R ca69d7cc4c4b021b775504aabaf660cf
P 06c22de25472eba439d7f05879eb13756c4e34a8
R 8ff634a3ba4b64ceb6f5689bb048183d
U drh
Z c4493abcd34e0be6390ec09bf0882b33
Z 882c862138cfb7c09b76716a7634fa06

View File

@ -1 +1 @@
06c22de25472eba439d7f05879eb13756c4e34a8
9f56a878cbbc715262b3a48ee696148dbd7bf1d2

View File

@ -12,7 +12,7 @@
** This file contains C code routines that are called by the parser
** to handle INSERT statements in SQLite.
**
** $Id: insert.c,v 1.172 2006/08/29 18:46:14 drh Exp $
** $Id: insert.c,v 1.173 2007/02/13 15:01:11 drh Exp $
*/
#include "sqliteInt.h"
@ -118,6 +118,117 @@ static int selectReadsTable(Select *p, Schema *pSchema, int iTab){
return 0;
}
#ifndef SQLITE_OMIT_AUTOINCREMENT
/*
** Write out code to initialize the autoincrement logic. This code
** looks up the current autoincrement value in the sqlite_sequence
** table and stores that value in a memory cell. Code generated by
** autoIncStep() will keep that memory cell holding the largest
** rowid value. Code generated by autoIncEnd() will write the new
** largest value of the counter back into the sqlite_sequence table.
**
** This routine returns the index of the mem[] cell that contains
** the maximum rowid counter.
**
** Two memory cells are allocated. The next memory cell after the
** one returned holds the rowid in sqlite_sequence where we will
** write back the revised maximum rowid.
*/
static int autoIncBegin(
Parse *pParse, /* Parsing context */
int iDb, /* Index of the database holding pTab */
Table *pTab /* The table we are writing to */
){
int memId = 0;
if( pTab->autoInc ){
Vdbe *v = pParse->pVdbe;
Db *pDb = &pParse->db->aDb[iDb];
int iCur = pParse->nTab;
int addr;
assert( v );
addr = sqlite3VdbeCurrentAddr(v);
memId = pParse->nMem+1;
pParse->nMem += 2;
sqlite3OpenTable(pParse, iCur, iDb, pDb->pSchema->pSeqTab, OP_OpenRead);
sqlite3VdbeAddOp(v, OP_Rewind, iCur, addr+13);
sqlite3VdbeAddOp(v, OP_Column, iCur, 0);
sqlite3VdbeOp3(v, OP_String8, 0, 0, pTab->zName, 0);
sqlite3VdbeAddOp(v, OP_Ne, 0x100, addr+12);
sqlite3VdbeAddOp(v, OP_Rowid, iCur, 0);
sqlite3VdbeAddOp(v, OP_MemStore, memId-1, 1);
sqlite3VdbeAddOp(v, OP_Column, iCur, 1);
sqlite3VdbeAddOp(v, OP_MemStore, memId, 1);
sqlite3VdbeAddOp(v, OP_Goto, 0, addr+13);
sqlite3VdbeAddOp(v, OP_Next, iCur, addr+4);
sqlite3VdbeAddOp(v, OP_Close, iCur, 0);
}
return memId;
}
/*
** Update the maximum rowid for an autoincrement calculation.
**
** This routine should be called when the top of the stack holds a
** new rowid that is about to be inserted. If that new rowid is
** larger than the maximum rowid in the memId memory cell, then the
** memory cell is updated. The stack is unchanged.
*/
static void autoIncStep(Parse *pParse, int memId){
if( memId>0 ){
sqlite3VdbeAddOp(pParse->pVdbe, OP_MemMax, memId, 0);
}
}
/*
** After doing one or more inserts, the maximum rowid is stored
** in mem[memId]. Generate code to write this value back into the
** the sqlite_sequence table.
*/
static void autoIncEnd(
Parse *pParse, /* The parsing context */
int iDb, /* Index of the database holding pTab */
Table *pTab, /* Table we are inserting into */
int memId /* Memory cell holding the maximum rowid */
){
if( pTab->autoInc ){
int iCur = pParse->nTab;
Vdbe *v = pParse->pVdbe;
Db *pDb = &pParse->db->aDb[iDb];
int addr;
assert( v );
addr = sqlite3VdbeCurrentAddr(v);
sqlite3OpenTable(pParse, iCur, iDb, pDb->pSchema->pSeqTab, OP_OpenWrite);
sqlite3VdbeAddOp(v, OP_MemLoad, memId-1, 0);
sqlite3VdbeAddOp(v, OP_NotNull, -1, addr+7);
sqlite3VdbeAddOp(v, OP_Pop, 1, 0);
sqlite3VdbeAddOp(v, OP_NewRowid, iCur, 0);
sqlite3VdbeOp3(v, OP_String8, 0, 0, pTab->zName, 0);
sqlite3VdbeAddOp(v, OP_MemLoad, memId, 0);
sqlite3VdbeAddOp(v, OP_MakeRecord, 2, 0);
sqlite3VdbeAddOp(v, OP_Insert, iCur, 0);
sqlite3VdbeAddOp(v, OP_Close, iCur, 0);
}
}
#else
/*
** If SQLITE_OMIT_AUTOINCREMENT is defined, then the three routines
** above are all no-ops
*/
# define autoIncBegin(A,B,C) (0)
# define autoIncStep(A,B)
# define autoIncEnd(A,B,C,D)
#endif /* SQLITE_OMIT_AUTOINCREMENT */
/* Forward declaration */
static int xferOptimization(
Parse *pParse, /* Parser context */
Table *pDest, /* The table we are inserting into */
Select *pSelect, /* A SELECT statement to use as the data source */
int onError, /* How to handle constraint errors */
int iDbDest /* The database of pDest */
);
/*
** This routine is call to handle SQL of the following forms:
**
@ -133,7 +244,7 @@ static int selectReadsTable(Select *p, Schema *pSchema, int iTab){
** NULL and pSelect is a pointer to the select statement used to generate
** data for the insert.
**
** The code generated follows one of three templates. For a simple
** The code generated follows one of four templates. For a simple
** select with data coming from a VALUES clause, the code executes
** once straight down through. The template looks like this:
**
@ -142,16 +253,37 @@ static int selectReadsTable(Select *p, Schema *pSchema, int iTab){
** write the resulting record into <table>
** cleanup
**
** If the statement is of the form
** The three remaining templates assume the statement is of the form
**
** INSERT INTO <table> SELECT ...
**
** And the SELECT clause does not read from <table> at any time, then
** the generated code follows this template:
** If the SELECT clause is of the restricted form "SELECT * FROM <table2>" -
** in other words if the SELECT pulls all columns from a single table
** and there is no WHERE or LIMIT or GROUP BY or ORDER BY clauses, and
** if <table2> and <table1> are distinct tables but have identical
** schemas, including all the same indices, then a special optimization
** is invoked that copies raw records from <table2> over to <table1>.
** See the xferOptimization() function for the implementation of this
** template. This is the second template.
**
** open a write cursor to <table>
** open read cursor on <table2>
** transfer all records in <table2> over to <table>
** close cursors
** foreach index on <table>
** open a write cursor on the <table> index
** open a read cursor on the corresponding <table2> index
** transfer all records from the read to the write cursors
** close cursors
** end foreach
**
** The third template is for when the second template does not apply
** and the SELECT clause does not read from <table> at any time.
** The generated code follows this template:
**
** goto B
** A: setup for the SELECT
** loop over the tables in the SELECT
** loop over the rows in the SELECT
** gosub C
** end loop
** cleanup after the SELECT
@ -162,7 +294,7 @@ static int selectReadsTable(Select *p, Schema *pSchema, int iTab){
** return
** D: cleanup
**
** The third template is used if the insert statement takes its
** The fourth template is used if the insert statement takes its
** values from a SELECT but the data is being inserted into a table
** that is also read as part of the SELECT. In the third form,
** we have to use a intermediate table to store the results of
@ -221,10 +353,6 @@ void sqlite3Insert(
int triggers_exist = 0; /* True if there are FOR EACH ROW triggers */
#endif
#ifndef SQLITE_OMIT_AUTOINCREMENT
int counterRowid = 0; /* Memory cell holding rowid of autoinc counter */
#endif
if( pParse->nErr || sqlite3MallocFailed() ){
goto insert_cleanup;
}
@ -291,31 +419,27 @@ void sqlite3Insert(
newIdx = pParse->nTab++;
}
#ifndef SQLITE_OMIT_AUTOINCREMENT
#ifndef SQLITE_OMIT_XFER_OPT
/* If the statement is of the form
**
** INSERT INTO <table1> SELECT * FROM <table2>;
**
** Then special optimizations can be applied that make the transfer
** very fast and which reduce fragmentation of indices.
*/
if( pColumn==0 && xferOptimization(pParse, pTab, pSelect, onError, iDb) ){
assert( !triggers_exist );
assert( pList==0 );
goto insert_cleanup;
}
#endif /* SQLITE_OMIT_XFER_OPT */
/* If this is an AUTOINCREMENT table, look up the sequence number in the
** sqlite_sequence table and store it in memory cell counterMem. Also
** remember the rowid of the sqlite_sequence table entry in memory cell
** counterRowid.
*/
if( pTab->autoInc ){
int iCur = pParse->nTab;
int addr = sqlite3VdbeCurrentAddr(v);
counterRowid = pParse->nMem++;
counterMem = pParse->nMem++;
sqlite3OpenTable(pParse, iCur, iDb, pDb->pSchema->pSeqTab, OP_OpenRead);
sqlite3VdbeAddOp(v, OP_Rewind, iCur, addr+13);
sqlite3VdbeAddOp(v, OP_Column, iCur, 0);
sqlite3VdbeOp3(v, OP_String8, 0, 0, pTab->zName, 0);
sqlite3VdbeAddOp(v, OP_Ne, 0x100, addr+12);
sqlite3VdbeAddOp(v, OP_Rowid, iCur, 0);
sqlite3VdbeAddOp(v, OP_MemStore, counterRowid, 1);
sqlite3VdbeAddOp(v, OP_Column, iCur, 1);
sqlite3VdbeAddOp(v, OP_MemStore, counterMem, 1);
sqlite3VdbeAddOp(v, OP_Goto, 0, addr+13);
sqlite3VdbeAddOp(v, OP_Next, iCur, addr+4);
sqlite3VdbeAddOp(v, OP_Close, iCur, 0);
}
#endif /* SQLITE_OMIT_AUTOINCREMENT */
counterMem = autoIncBegin(pParse, iDb, pTab);
/* Figure out how many columns of data are supplied. If the data
** is coming from a SELECT statement, then this step also generates
@ -591,11 +715,7 @@ void sqlite3Insert(
}else{
sqlite3VdbeAddOp(v, OP_NewRowid, base, counterMem);
}
#ifndef SQLITE_OMIT_AUTOINCREMENT
if( pTab->autoInc ){
sqlite3VdbeAddOp(v, OP_MemMax, counterMem, 0);
}
#endif /* SQLITE_OMIT_AUTOINCREMENT */
autoIncStep(pParse, counterMem);
/* Push onto the stack, data for all columns of the new entry, beginning
** with the first column.
@ -688,26 +808,11 @@ void sqlite3Insert(
}
}
#ifndef SQLITE_OMIT_AUTOINCREMENT
/* Update the sqlite_sequence table by storing the content of the
** counter value in memory counterMem back into the sqlite_sequence
** table.
*/
if( pTab->autoInc ){
int iCur = pParse->nTab;
int addr = sqlite3VdbeCurrentAddr(v);
sqlite3OpenTable(pParse, iCur, iDb, pDb->pSchema->pSeqTab, OP_OpenWrite);
sqlite3VdbeAddOp(v, OP_MemLoad, counterRowid, 0);
sqlite3VdbeAddOp(v, OP_NotNull, -1, addr+7);
sqlite3VdbeAddOp(v, OP_Pop, 1, 0);
sqlite3VdbeAddOp(v, OP_NewRowid, iCur, 0);
sqlite3VdbeOp3(v, OP_String8, 0, 0, pTab->zName, 0);
sqlite3VdbeAddOp(v, OP_MemLoad, counterMem, 0);
sqlite3VdbeAddOp(v, OP_MakeRecord, 2, 0);
sqlite3VdbeAddOp(v, OP_Insert, iCur, 0);
sqlite3VdbeAddOp(v, OP_Close, iCur, 0);
}
#endif
autoIncEnd(pParse, iDb, pTab, counterMem);
/*
** Return the number of rows inserted. If this routine is
@ -1140,3 +1245,299 @@ void sqlite3OpenTableAndIndices(
pParse->nTab = base+i;
}
}
#ifndef SQLITE_OMIT_XFER_OPT
/*
** Check to collation names to see if they are compatible.
*/
static int xferCompatibleCollation(const char *z1, const char *z2){
if( z1==0 ){
return z2==0;
}
if( z2==0 ){
return 0;
}
return sqlite3StrICmp(z1, z2)==0;
}
/*
** Check to see if index pSrc is compatible as a source of data
** for index pDest in an insert transfer optimization. The rules
** for a compatible index:
**
** * The index is over the same set of columns
** * The same DESC and ASC markings occurs on all columns
** * The same onError processing (OE_Abort, OE_Ignore, etc)
** * The same collating sequence on each column
*/
static int xferCompatibleIndex(Index *pDest, Index *pSrc){
int i;
assert( pDest && pSrc );
assert( pDest->pTable!=pSrc->pTable );
if( pDest->nColumn!=pSrc->nColumn ){
return 0; /* Different number of columns */
}
if( pDest->onError!=pSrc->onError ){
return 0; /* Different conflict resolution strategies */
}
for(i=0; i<pSrc->nColumn; i++){
if( pSrc->aiColumn[i]!=pDest->aiColumn[i] ){
return 0; /* Different columns indexed */
}
if( pSrc->aSortOrder[i]!=pDest->aSortOrder[i] ){
return 0; /* Different sort orders */
}
if( pSrc->azColl[i]!=pDest->azColl[i] ){
return 0; /* Different sort orders */
}
}
/* If no test above fails then the indices must be compatible */
return 1;
}
/*
** Attempt the transfer optimization on INSERTs of the form
**
** INSERT INTO tab1 SELECT * FROM tab2;
**
** This optimization is only attempted if
**
** (1) tab1 and tab2 have identical schemas including all the
** same indices
**
** (2) tab1 and tab2 are different tables
**
** (3) There must be no triggers on tab1
**
** (4) The result set of the SELECT statement is "*"
**
** (5) The SELECT statement has no WHERE, HAVING, ORDER BY, GROUP BY,
** or LIMIT clause.
**
** (6) The SELECT statement is a simple (not a compound) select that
** contains only tab2 in its FROM clause
**
** This method for implementing the INSERT transfers raw records from
** tab2 over to tab1. The columns are not decoded. Raw records from
** the indices of tab2 are transfered to tab1 as well. In so doing,
** the resulting tab1 has much less fragmentation.
**
** This routine returns TRUE if the optimization is attempted. If any
** of the conditions above fail so that the optimization should not
** be attempted, then this routine returns FALSE.
*/
static int xferOptimization(
Parse *pParse, /* Parser context */
Table *pDest, /* The table we are inserting into */
Select *pSelect, /* A SELECT statement to use as the data source */
int onError, /* How to handle constraint errors */
int iDbDest /* The database of pDest */
){
ExprList *pEList; /* The result set of the SELECT */
Table *pSrc; /* The table in the FROM clause of SELECT */
Index *pSrcIdx, *pDestIdx; /* Source and destination indices */
struct SrcList_item *pItem; /* An element of pSelect->pSrc */
int i; /* Loop counter */
int iDbSrc; /* The database of pSrc */
int iSrc, iDest; /* Cursors from source and destination */
int addr1, addr2; /* Loop addresses */
int emptyDestTest; /* Address of test for empty pDest */
int emptySrcTest; /* Address of test for empty pSrc */
int memRowid; /* A memcell containing a rowid from pSrc */
Vdbe *v; /* The VDBE we are building */
KeyInfo *pKey; /* Key information for an index */
int counterMem; /* Memory register used by AUTOINC */
if( pSelect==0 ){
return 0; /* Must be of the form INSERT INTO ... SELECT ... */
}
if( pDest->pTrigger ){
return 0; /* tab1 must not have triggers */
}
#ifndef SQLITE_OMIT_VIRTUALTABLE
if( pDest->isVirtual ){
return 0; /* tab1 must not be a virtual table */
}
#endif
if( onError==OE_Default ){
onError = OE_Abort;
}
if( onError!=OE_Abort && onError!=OE_Rollback ){
return 0; /* Cannot do OR REPLACE or OR IGNORE or OR FAIL */
}
if( pSelect->pSrc==0 ){
return 0; /* SELECT must have a FROM clause */
}
if( pSelect->pSrc->nSrc!=1 ){
return 0; /* FROM clause must have exactly one term */
}
if( pSelect->pSrc->a[0].pSelect ){
return 0; /* FROM clause cannot contain a subquery */
}
if( pSelect->pWhere ){
return 0; /* SELECT may not have a WHERE clause */
}
if( pSelect->pOrderBy ){
return 0; /* SELECT may not have an ORDER BY clause */
}
if( pSelect->pHaving ){
return 0; /* SELECT may not have a HAVING clause */
}
if( pSelect->pGroupBy ){
return 0; /* SELECT may not have a GROUP BY clause */
}
if( pSelect->pLimit ){
return 0; /* SELECT may not have a LIMIT clause */
}
if( pSelect->pOffset ){
return 0; /* SELECT may not have an OFFSET clause */
}
if( pSelect->pPrior ){
return 0; /* SELECT may not be a compound query */
}
if( pSelect->isDistinct ){
return 0; /* SELECT may not be DISTINCT */
}
pEList = pSelect->pEList;
assert( pEList!=0 );
if( pEList->nExpr!=1 ){
return 0; /* The result set must have exactly one column */
}
assert( pEList->a[0].pExpr );
if( pEList->a[0].pExpr->op!=TK_ALL ){
return 0; /* The result set must be the special operator "*" */
}
/* At this point we have established that the statement is of the
** correct syntactic form to participate in this optimization. Now
** we have to check the semantics.
*/
pItem = pSelect->pSrc->a;
pSrc = sqlite3LocateTable(pParse, pItem->zName, pItem->zDatabase);
if( pSrc==0 ){
return 0; /* FROM clause does not contain a real table */
}
if( pSrc==pDest ){
return 0; /* tab1 and tab2 may not be the same table */
}
#ifndef SQLITE_OMIT_VIRTUALTABLE
if( pSrc->isVirtual ){
return 0; /* tab2 must not be a virtual table */
}
#endif
if( pSrc->pSelect ){
return 0; /* tab2 may not be a view */
}
if( pDest->nCol!=pSrc->nCol ){
return 0; /* Number of columns must be the same in tab1 and tab2 */
}
if( pDest->iPKey!=pSrc->iPKey ){
return 0; /* Both tables must have the same INTEGER PRIMARY KEY */
}
for(i=0; i<pDest->nCol; i++){
if( pDest->aCol[i].affinity!=pSrc->aCol[i].affinity ){
return 0; /* Affinity must be the same on all columns */
}
if( !xferCompatibleCollation(pDest->aCol[i].zColl, pSrc->aCol[i].zColl) ){
return 0; /* Collating sequence must be the same on all columns */
}
if( pDest->aCol[i].notNull && !pSrc->aCol[i].notNull ){
return 0; /* tab2 must be NOT NULL if tab1 is */
}
}
for(pDestIdx=pDest->pIndex; pDestIdx; pDestIdx=pDestIdx->pNext){
for(pSrcIdx=pSrc->pIndex; pSrcIdx; pSrcIdx=pSrcIdx->pNext){
if( xferCompatibleIndex(pDestIdx, pSrcIdx) ) break;
}
if( pSrcIdx==0 ){
return 0; /* pDestIdx has no corresponding index in pSrc */
}
}
/* If we get this far, it means either:
**
** * We can always do the transfer if the table contains an
** an integer primary key
**
** * We can conditionally do the transfer if the destination
** table is empty.
*/
iDbSrc = sqlite3SchemaToIndex(pParse->db, pSrc->pSchema);
v = sqlite3GetVdbe(pParse);
iSrc = pParse->nTab++;
iDest = pParse->nTab++;
counterMem = autoIncBegin(pParse, iDbDest, pDest);
sqlite3OpenTable(pParse, iDest, iDbDest, pDest, OP_OpenWrite);
if( pDest->iPKey<0 ){
/* The tables do not have an INTEGER PRIMARY KEY so that
** transfer optimization is only allowed if the destination
** table is initially empty
*/
addr1 = sqlite3VdbeAddOp(v, OP_Rewind, iDest, 0);
emptyDestTest = sqlite3VdbeAddOp(v, OP_Goto, 0, 0);
sqlite3VdbeJumpHere(v, addr1);
}else{
emptyDestTest = 0;
}
sqlite3OpenTable(pParse, iSrc, iDbSrc, pSrc, OP_OpenRead);
emptySrcTest = sqlite3VdbeAddOp(v, OP_Rewind, iSrc, 0);
memRowid = pParse->nMem++;
sqlite3VdbeAddOp(v, OP_Rowid, iSrc, 0);
sqlite3VdbeAddOp(v, OP_MemStore, memRowid, 1);
addr1 = sqlite3VdbeAddOp(v, OP_Rowid, iSrc, 0);
sqlite3VdbeAddOp(v, OP_Dup, 0, 0);
addr2 = sqlite3VdbeAddOp(v, OP_NotExists, iDest, 0);
sqlite3VdbeOp3(v, OP_Halt, SQLITE_CONSTRAINT, onError,
"PRIMARY KEY must be unique", P3_STATIC);
sqlite3VdbeJumpHere(v, addr2);
autoIncStep(pParse, counterMem);
sqlite3VdbeAddOp(v, OP_RowData, iSrc, 0);
sqlite3VdbeOp3(v, OP_Insert, iDest, OPFLAG_NCHANGE|OPFLAG_LASTROWID,
pDest->zName, 0);
sqlite3VdbeAddOp(v, OP_Next, iSrc, addr1);
autoIncEnd(pParse, iDbDest, pDest, counterMem);
for(pDestIdx=pDest->pIndex; pDestIdx; pDestIdx=pDestIdx->pNext){
for(pSrcIdx=pSrc->pIndex; pSrcIdx; pSrcIdx=pSrcIdx->pNext){
if( xferCompatibleIndex(pDestIdx, pSrcIdx) ) break;
}
assert( pSrcIdx );
sqlite3VdbeAddOp(v, OP_Close, iSrc, 0);
sqlite3VdbeAddOp(v, OP_Close, iDest, 0);
sqlite3VdbeAddOp(v, OP_Integer, iDbSrc, 0);
pKey = sqlite3IndexKeyinfo(pParse, pSrcIdx);
VdbeComment((v, "# %s", pSrcIdx->zName));
sqlite3VdbeOp3(v, OP_OpenRead, iSrc, pSrcIdx->tnum,
(char*)pKey, P3_KEYINFO_HANDOFF);
sqlite3VdbeAddOp(v, OP_Integer, iDbDest, 0);
pKey = sqlite3IndexKeyinfo(pParse, pDestIdx);
VdbeComment((v, "# %s", pDestIdx->zName));
sqlite3VdbeOp3(v, OP_OpenWrite, iDest, pDestIdx->tnum,
(char*)pKey, P3_KEYINFO_HANDOFF);
addr1 = sqlite3VdbeAddOp(v, OP_Rewind, iSrc, 0);
sqlite3VdbeAddOp(v, OP_RowKey, iSrc, 0);
if( pDestIdx->onError!=OE_None ){
sqlite3VdbeAddOp(v, OP_MemLoad, memRowid, 0);
addr2 = sqlite3VdbeAddOp(v, OP_IsUnique, iDest, 0);
sqlite3VdbeOp3(v, OP_Halt, SQLITE_CONSTRAINT, onError,
"UNIQUE constraint failed", P3_STATIC);
sqlite3VdbeJumpHere(v, addr2);
}
sqlite3VdbeAddOp(v, OP_IdxInsert, iDest, 0);
sqlite3VdbeAddOp(v, OP_Next, iSrc, addr1+1);
sqlite3VdbeJumpHere(v, addr1);
}
sqlite3VdbeJumpHere(v, emptySrcTest);
sqlite3VdbeAddOp(v, OP_Close, iSrc, 0);
sqlite3VdbeAddOp(v, OP_Close, iDest, 0);
if( emptyDestTest ){
sqlite3VdbeAddOp(v, OP_Halt, SQLITE_OK, 0);
sqlite3VdbeJumpHere(v, emptyDestTest);
sqlite3VdbeAddOp(v, OP_Close, iDest, 0);
return 0;
}else{
return 1;
}
}
#endif /* SQLITE_OMIT_XFER_OPT */