Add streaming version of sqlite3changeset_invert() to sessions module.

FossilOrigin-Name: 8ded6a46794c7bff1c8b790c662ba7e92f576380
This commit is contained in:
dan 2014-09-25 14:54:20 +00:00
parent 4757c65866
commit fa122adac1
5 changed files with 198 additions and 81 deletions

View File

@ -334,10 +334,11 @@ static int sessionSerializeValue(
if( aBuf ) aBuf[0] = '\0';
}
*pnWrite += nByte;
if( pnWrite ) *pnWrite += nByte;
return SQLITE_OK;
}
/*
** This macro is used to calculate hash key values for data structures. In
** order to use this macro, the entire data structure must be represented
@ -1330,6 +1331,29 @@ static int sessionBufferGrow(SessionBuffer *p, int nByte, int *pRc){
return (*pRc!=SQLITE_OK);
}
/*
** Append the value passed as the second argument to the buffer passed
** as the first.
**
** This function is a no-op if *pRc is non-zero when it is called.
** Otherwise, if an error occurs, *pRc is set to an SQLite error code
** before returning.
*/
static void sessionAppendValue(SessionBuffer *p, sqlite3_value *pVal, int *pRc){
int rc = *pRc;
if( rc==SQLITE_OK ){
int nByte = 0;
sessionSerializeValue(0, pVal, &nByte);
sessionBufferGrow(p, nByte, &rc);
if( rc==SQLITE_OK ){
rc = sessionSerializeValue(&p->aBuf[p->nBuf], pVal, 0);
p->nBuf += nByte;
}else{
*pRc = rc;
}
}
}
/*
** This function is a no-op if *pRc is other than SQLITE_OK when it is
** called. Otherwise, append a single byte to the buffer.
@ -2268,6 +2292,38 @@ static int sessionChangesetBufferTblhdr(SessionInput *pIn, int *pnByte){
return rc;
}
/*
** The input pointer currently points to the first byte of the first field
** of a record consisting of nCol columns. This function ensures the entire
** record is buffered.
*/
static int sessionChangesetBufferRecord(
SessionInput *pIn,
int nCol,
int *pnByte
){
int rc = SQLITE_OK;
int nByte = 0;
int i;
for(i=0; rc==SQLITE_OK && i<nCol; i++){
int eType;
rc = sessionInputBuffer(pIn, nByte + 10);
if( rc==SQLITE_OK ){
eType = pIn->aData[pIn->iNext + nByte++];
if( eType==SQLITE_TEXT || eType==SQLITE_BLOB ){
int n;
nByte += sessionVarintGet(&pIn->aData[pIn->iNext+nByte], &n);
nByte += n;
rc = sessionInputBuffer(pIn, nByte);
}else if( eType==SQLITE_INTEGER || eType==SQLITE_FLOAT ){
nByte += 8;
}
}
}
*pnByte = nByte;
return rc;
}
/*
** The input pointer currently points to the second byte of a table-header.
** Specifically, to the following:
@ -2582,44 +2638,37 @@ int sqlite3changeset_finalize(sqlite3_changeset_iter *p){
return rc;
}
/*
** Invert a changeset object.
*/
int sqlite3changeset_invert(
int nChangeset, /* Number of bytes in input */
const void *pChangeset, /* Input changeset */
static int sessionChangesetInvert(
SessionInput *pInput, /* Input changeset */
int (*xOutput)(void *pOut, const void *pData, int nData),
void *pOut,
int *pnInverted, /* OUT: Number of bytes in output changeset */
void **ppInverted /* OUT: Inverse of pChangeset */
){
int rc = SQLITE_OK; /* Return value */
u8 *aOut;
u8 *aIn;
int i;
SessionInput sInput;
SessionBuffer sOut; /* Output buffer */
int nCol = 0; /* Number of cols in current table */
u8 *abPK = 0; /* PK array for current table */
sqlite3_value **apVal = 0; /* Space for values for UPDATE inversion */
SessionBuffer sPK = {0, 0, 0}; /* PK array for current table */
/* Initialize the output buffer */
memset(&sOut, 0, sizeof(SessionBuffer));
/* Zero the output variables in case an error occurs. */
*ppInverted = 0;
*pnInverted = 0;
if( nChangeset==0 ) return SQLITE_OK;
if( ppInverted ){
*ppInverted = 0;
*pnInverted = 0;
}
/* Set up the input stream */
memset(&sInput, 0, sizeof(SessionInput));
sInput.nData = nChangeset;
sInput.aData = (u8*)pChangeset;
aOut = (u8 *)sqlite3_malloc(nChangeset);
if( !aOut ) return SQLITE_NOMEM;
aIn = (u8 *)pChangeset;
i = 0;
while( i<nChangeset ){
while( 1 ){
u8 eType;
if( (rc = sessionInputBuffer(&sInput, 2)) ) goto finished_invert;
eType = sInput.aData[sInput.iNext];
/* Test for EOF. */
if( (rc = sessionInputBuffer(pInput, 2)) ) goto finished_invert;
if( pInput->iNext>=pInput->nData ) break;
eType = pInput->aData[pInput->iNext];
switch( eType ){
case 'T': {
/* A 'table' record consists of:
@ -2630,19 +2679,19 @@ int sqlite3changeset_invert(
** * A nul-terminated table name.
*/
int nByte;
int nVarint;
int iNext = sInput.iNext;
sInput.iNext++;
if( (rc = sessionChangesetBufferTblhdr(&sInput, &nByte)) ){
int nVar;
pInput->iNext++;
if( (rc = sessionChangesetBufferTblhdr(pInput, &nByte)) ){
goto finished_invert;
}
nVarint = sessionVarintGet(&sInput.aData[iNext+1], &nCol);
nVar = sessionVarintGet(&pInput->aData[pInput->iNext], &nCol);
sPK.nBuf = 0;
sessionAppendBlob(&sPK, &sInput.aData[iNext+1+nVarint], nCol, &rc);
sessionAppendBlob(&sPK, &pInput->aData[pInput->iNext+nVar], nCol, &rc);
sessionAppendByte(&sOut, eType, &rc);
sessionAppendBlob(&sOut, &pInput->aData[pInput->iNext], nByte, &rc);
if( rc ) goto finished_invert;
sInput.iNext += nByte;
memcpy(&aOut[i], &sInput.aData[iNext], nByte+1);
i += nByte+1;
pInput->iNext += nByte;
sqlite3_free(apVal);
apVal = 0;
abPK = sPK.aBuf;
@ -2651,22 +2700,22 @@ int sqlite3changeset_invert(
case SQLITE_INSERT:
case SQLITE_DELETE: {
int iStart;
int nByte;
sInput.iNext += 2;
iStart = sInput.iNext;
sessionReadRecord(&sInput, nCol, 0, 0);
aOut[i] = (eType==SQLITE_DELETE ? SQLITE_INSERT : SQLITE_DELETE);
aOut[i+1] = aIn[i+1]; /* indirect-flag */
nByte = sInput.iNext - iStart;
memcpy(&aOut[i+2], &sInput.aData[iStart], nByte);
i += 2 + nByte;
int bIndirect = pInput->aData[pInput->iNext+1];
int eType2 = (eType==SQLITE_DELETE ? SQLITE_INSERT : SQLITE_DELETE);
pInput->iNext += 2;
assert( rc==SQLITE_OK );
rc = sessionChangesetBufferRecord(pInput, nCol, &nByte);
sessionAppendByte(&sOut, eType2, &rc);
sessionAppendByte(&sOut, bIndirect, &rc);
sessionAppendBlob(&sOut, &pInput->aData[pInput->iNext], nByte, &rc);
pInput->iNext += nByte;
if( rc ) goto finished_invert;
break;
}
case SQLITE_UPDATE: {
int iCol;
int nWrite = 0;
if( 0==apVal ){
apVal = (sqlite3_value **)sqlite3_malloc(sizeof(apVal[0])*nCol*2);
@ -2678,15 +2727,14 @@ int sqlite3changeset_invert(
}
/* Write the header for the new UPDATE change. Same as the original. */
aOut[i] = SQLITE_UPDATE;
aOut[i+1] = sInput.aData[sInput.iNext+1];
nWrite = 2;
sessionAppendByte(&sOut, eType, &rc);
sessionAppendByte(&sOut, pInput->aData[pInput->iNext+1], &rc);
/* Read the old.* and new.* records for the update change. */
sInput.iNext += 2;
rc = sessionReadRecord(&sInput, nCol, 0, &apVal[0]);
pInput->iNext += 2;
rc = sessionReadRecord(pInput, nCol, 0, &apVal[0]);
if( rc==SQLITE_OK ){
rc = sessionReadRecord(&sInput, nCol, 0, &apVal[nCol]);
rc = sessionReadRecord(pInput, nCol, 0, &apVal[nCol]);
}
/* Write the new old.* record. Consists of the PK columns from the
@ -2694,7 +2742,7 @@ int sqlite3changeset_invert(
** new.* record. */
for(iCol=0; rc==SQLITE_OK && iCol<nCol; iCol++){
sqlite3_value *pVal = apVal[iCol + (abPK[iCol] ? 0 : nCol)];
rc = sessionSerializeValue(&aOut[i+nWrite], pVal, &nWrite);
sessionAppendValue(&sOut, pVal, &rc);
}
/* Write the new new.* record. Consists of a copy of all values
@ -2702,7 +2750,7 @@ int sqlite3changeset_invert(
** are set to "undefined". */
for(iCol=0; rc==SQLITE_OK && iCol<nCol; iCol++){
sqlite3_value *pVal = (abPK[iCol] ? 0 : apVal[iCol]);
rc = sessionSerializeValue(&aOut[i+nWrite], pVal, &nWrite);
sessionAppendValue(&sOut, pVal, &rc);
}
for(iCol=0; iCol<nCol*2; iCol++){
@ -2713,8 +2761,6 @@ int sqlite3changeset_invert(
goto finished_invert;
}
i += nWrite;
assert( i==sInput.iNext );
break;
}
@ -2722,21 +2768,73 @@ int sqlite3changeset_invert(
rc = SQLITE_CORRUPT_BKPT;
goto finished_invert;
}
assert( rc==SQLITE_OK );
if( xOutput && sOut.nBuf>=SESSIONS_STR_CHUNK_SIZE ){
rc = xOutput(pOut, sOut.aBuf, sOut.nBuf);
sOut.nBuf = 0;
if( rc!=SQLITE_OK ) goto finished_invert;
}
}
assert( rc==SQLITE_OK );
*pnInverted = nChangeset;
*ppInverted = (void *)aOut;
if( pnInverted ){
*pnInverted = sOut.nBuf;
*ppInverted = sOut.aBuf;
sOut.aBuf = 0;
}else if( sOut.nBuf>0 ){
rc = xOutput(pOut, sOut.aBuf, sOut.nBuf);
}
finished_invert:
if( rc!=SQLITE_OK ){
sqlite3_free(aOut);
}
sqlite3_free(sOut.aBuf);
sqlite3_free(apVal);
sqlite3_free(sPK.aBuf);
return rc;
}
/*
** Invert a changeset object.
*/
int sqlite3changeset_invert(
int nChangeset, /* Number of bytes in input */
const void *pChangeset, /* Input changeset */
int *pnInverted, /* OUT: Number of bytes in output changeset */
void **ppInverted /* OUT: Inverse of pChangeset */
){
SessionInput sInput;
/* Set up the input stream */
memset(&sInput, 0, sizeof(SessionInput));
sInput.nData = nChangeset;
sInput.aData = (u8*)pChangeset;
return sessionChangesetInvert(&sInput, 0, 0, pnInverted, ppInverted);
}
/*
** Streaming version of sqlite3changeset_invert().
*/
int sqlite3changeset_invert_str(
int (*xInput)(void *pIn, void *pData, int *pnData),
void *pIn,
int (*xOutput)(void *pOut, const void *pData, int nData),
void *pOut
){
SessionInput sInput;
int rc;
/* Set up the input stream */
memset(&sInput, 0, sizeof(SessionInput));
sInput.xInput = xInput;
sInput.pIn = pIn;
rc = sessionChangesetInvert(&sInput, xOutput, pOut, 0, 0);
sqlite3_free(sInput.buf.aBuf);
return rc;
}
typedef struct SessionApplyCtx SessionApplyCtx;
struct SessionApplyCtx {
sqlite3 *db;

View File

@ -670,6 +670,16 @@ int sqlite3changeset_invert(
int *pnOut, void **ppOut /* OUT: Inverse of input */
);
/*
** Streaming version of sqlite3changeset_invert().
*/
int sqlite3changeset_invert_str(
int (*xInput)(void *pIn, void *pData, int *pnData),
void *pIn,
int (*xOutput)(void *pOut, const void *pData, int nData),
void *pOut
);
/*
** CAPI3REF: Concatenate Two Changeset Objects
**

View File

@ -676,24 +676,33 @@ static int test_sqlite3changeset_invert(
Tcl_Obj *CONST objv[]
){
int rc; /* Return code from changeset_invert() */
void *aChangeset; /* Input changeset */
int nChangeset; /* Size of buffer aChangeset in bytes */
void *aOut; /* Output changeset */
int nOut; /* Size of buffer aOut in bytes */
TestStreamInput sIn; /* Input stream */
TestSessionsBlob sOut; /* Output blob */
if( objc!=2 ){
Tcl_WrongNumArgs(interp, 1, objv, "CHANGESET");
return TCL_ERROR;
}
aChangeset = (void *)Tcl_GetByteArrayFromObj(objv[1], &nChangeset);
rc = sqlite3changeset_invert(nChangeset, aChangeset, &nOut, &aOut);
if( rc!=SQLITE_OK ){
return test_session_error(interp, rc);
memset(&sIn, 0, sizeof(sIn));
memset(&sOut, 0, sizeof(sOut));
sIn.nStream = test_tcl_integer(interp, SESSION_STREAM_TCL_VAR);
sIn.aData = Tcl_GetByteArrayFromObj(objv[1], &sIn.nData);
if( sIn.nStream ){
rc = sqlite3changeset_invert_str(
testStreamInput, (void*)&sIn, testSessionsOutput, (void*)&sOut
);
}else{
rc = sqlite3changeset_invert(sIn.nData, sIn.aData, &sOut.n, &sOut.p);
}
Tcl_SetObjResult(interp, Tcl_NewByteArrayObj((unsigned char *)aOut, nOut));
sqlite3_free(aOut);
return TCL_OK;
if( rc!=SQLITE_OK ){
rc = test_session_error(interp, rc);
}else{
Tcl_SetObjResult(interp,Tcl_NewByteArrayObj((unsigned char*)sOut.p,sOut.n));
}
sqlite3_free(sOut.p);
return rc;
}
/*

View File

@ -1,5 +1,5 @@
C Add\sstreaming\sversion\sof\ssqlite3changeset_apply().\sTests\sand\sfixes\sfor\sthe\ssame\sand\ssqlite3changeset_start_str().
D 2014-09-24T17:13:20.331
C Add\sstreaming\sversion\sof\ssqlite3changeset_invert()\sto\ssessions\smodule.
D 2014-09-25T14:54:20.019
F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f
F Makefile.in dd5f245aa8c741bc65845747203c8ce2f3fb6c83
F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23
@ -158,9 +158,9 @@ F ext/session/sessionA.test eb05c13e4ef1ca8046a3a6dbf2d5f6f5b04a11d4
F ext/session/sessionB.test 276267cd7fc37c2e2dd03f1e2ed9ada336a8bdb4
F ext/session/session_common.tcl 1539d8973b2aea0025c133eb0cc4c89fcef541a5
F ext/session/sessionfault.test e7965159a73d385c1a4af12d82c3a039ebdd71a6
F ext/session/sqlite3session.c 1c653844900de41e175f77f22fe1af7abb05e798
F ext/session/sqlite3session.h 7e7a31ad1992f6678a20654c9751dacd10384292
F ext/session/test_session.c 77f1e7a269daeb60f82441ff859c812d686ef79d
F ext/session/sqlite3session.c 9edf9273280c804c45e7508be9644cf96f278c63
F ext/session/sqlite3session.h 944d7b2c3e87b5598a2c34afe8dd032d51d09818
F ext/session/test_session.c 4449ef150e52baad844aa08c29569f3ec10902d8
F ext/userauth/sqlite3userauth.h 19cb6f0e31316d0ee4afdfb7a85ef9da3333a220
F ext/userauth/user-auth.txt e6641021a9210364665fe625d067617d03f27b04
F ext/userauth/userauth.c 5fa3bdb492f481bbc1709fc83c91ebd13460c69e
@ -1216,7 +1216,7 @@ F tool/vdbe_profile.tcl 67746953071a9f8f2f668b73fe899074e2c6d8c1
F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4
F tool/warnings.sh 0abfd78ceb09b7f7c27c688c8e3fe93268a13b32
F tool/win/sqlite.vsix deb315d026cc8400325c5863eef847784a219a2f
P 3c7d3d950bbf5f5ed3696ebc61c77ca48bafe2b5
R bba1026624291c9ce371985c2bc9cae0
P b917fc146876f764442de08d5ec36e5b4cf5ab52
R b1265a84c7bae5ddb87cac377b4def7e
U dan
Z 40a11f75418a19b3b02aba54325bf64c
Z 0bbb9bee98e102db876d83b2977969a7

View File

@ -1 +1 @@
b917fc146876f764442de08d5ec36e5b4cf5ab52
8ded6a46794c7bff1c8b790c662ba7e92f576380