pgbench: Use COPY for client-side data generation
This commit switches the client-side data generation from INSERT queries to COPY for the two tables pgbench_branches and pgbench_tellers. pgbench_accounts was already using COPY. COPY is a better interface for bulk loading or high-latency connections (this point can be countered with the option for server-side data generation, but client-side is still the default), and measurements have shown that using it for these two other tables can lead to improvements during initialization. I did not notice slowdowns at large scale numbers on a local setup either, as most of the work happens for the accounts table. Previously COPY was only used for the pgbench_accounts table because the amount of data was much larger than for the two other tables. The code is refactored so that all three tables use the same code path to execute the COPY queries, with a callback to build data rows. Author: Tristan Partin Discussion: https://postgr.es/m/CSTU5P82ONZ1.19XFUGHMXHBRY@c3po
This commit is contained in:
parent
29836df323
commit
e35cc3b3f2
@ -231,10 +231,11 @@ pgbench <optional> <replaceable>options</replaceable> </optional> <replaceable>d
|
|||||||
extensively through a <command>COPY</command>.
|
extensively through a <command>COPY</command>.
|
||||||
<command>pgbench</command> uses the FREEZE option with version 14 or later
|
<command>pgbench</command> uses the FREEZE option with version 14 or later
|
||||||
of <productname>PostgreSQL</productname> to speed up
|
of <productname>PostgreSQL</productname> to speed up
|
||||||
subsequent <command>VACUUM</command>, unless partitions are enabled.
|
subsequent <command>VACUUM</command>, except on the
|
||||||
Using <literal>g</literal> causes logging to print one message
|
<literal>pgbench_accounts</literal> table if partitions are
|
||||||
every 100,000 rows while generating data for the
|
enabled. Using <literal>g</literal> causes logging to
|
||||||
<structname>pgbench_accounts</structname> table.
|
print one message every 100,000 rows while generating data for all
|
||||||
|
tables.
|
||||||
</para>
|
</para>
|
||||||
<para>
|
<para>
|
||||||
With <literal>G</literal> (server-side data generation),
|
With <literal>G</literal> (server-side data generation),
|
||||||
|
@ -835,6 +835,8 @@ static void add_socket_to_set(socket_set *sa, int fd, int idx);
|
|||||||
static int wait_on_socket_set(socket_set *sa, int64 usecs);
|
static int wait_on_socket_set(socket_set *sa, int64 usecs);
|
||||||
static bool socket_has_input(socket_set *sa, int fd, int idx);
|
static bool socket_has_input(socket_set *sa, int fd, int idx);
|
||||||
|
|
||||||
|
/* callback used to build rows for COPY during data loading */
|
||||||
|
typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
|
||||||
|
|
||||||
/* callback functions for our flex lexer */
|
/* callback functions for our flex lexer */
|
||||||
static const PsqlScanCallbacks pgbench_callbacks = {
|
static const PsqlScanCallbacks pgbench_callbacks = {
|
||||||
@ -4859,17 +4861,45 @@ initTruncateTables(PGconn *con)
|
|||||||
"pgbench_tellers");
|
"pgbench_tellers");
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* Fill the standard tables with some data generated and sent from the client
|
|
||||||
*/
|
|
||||||
static void
|
static void
|
||||||
initGenerateDataClientSide(PGconn *con)
|
initBranch(PQExpBufferData *sql, int64 curr)
|
||||||
{
|
{
|
||||||
PQExpBufferData sql;
|
/* "filler" column uses NULL */
|
||||||
|
printfPQExpBuffer(sql,
|
||||||
|
INT64_FORMAT "\t0\t\\N\n",
|
||||||
|
curr + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
initTeller(PQExpBufferData *sql, int64 curr)
|
||||||
|
{
|
||||||
|
/* "filler" column uses NULL */
|
||||||
|
printfPQExpBuffer(sql,
|
||||||
|
INT64_FORMAT "\t" INT64_FORMAT "\t0\t\\N\n",
|
||||||
|
curr + 1, curr / ntellers + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
initAccount(PQExpBufferData *sql, int64 curr)
|
||||||
|
{
|
||||||
|
/* "filler" column defaults to blank padded empty string */
|
||||||
|
printfPQExpBuffer(sql,
|
||||||
|
INT64_FORMAT "\t" INT64_FORMAT "\t0\t\n",
|
||||||
|
curr + 1, curr / naccounts + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
initPopulateTable(PGconn *con, const char *table, int64 base,
|
||||||
|
initRowMethod init_row)
|
||||||
|
{
|
||||||
|
int n;
|
||||||
|
int k;
|
||||||
|
int chars = 0;
|
||||||
PGresult *res;
|
PGresult *res;
|
||||||
int i;
|
PQExpBufferData sql;
|
||||||
int64 k;
|
char copy_statement[256];
|
||||||
char *copy_statement;
|
const char *copy_statement_fmt = "copy %s from stdin";
|
||||||
|
int64 total = base * scale;
|
||||||
|
|
||||||
/* used to track elapsed time and estimate of the remaining time */
|
/* used to track elapsed time and estimate of the remaining time */
|
||||||
pg_time_usec_t start;
|
pg_time_usec_t start;
|
||||||
@ -4878,50 +4908,24 @@ initGenerateDataClientSide(PGconn *con)
|
|||||||
/* Stay on the same line if reporting to a terminal */
|
/* Stay on the same line if reporting to a terminal */
|
||||||
char eol = isatty(fileno(stderr)) ? '\r' : '\n';
|
char eol = isatty(fileno(stderr)) ? '\r' : '\n';
|
||||||
|
|
||||||
fprintf(stderr, "generating data (client-side)...\n");
|
|
||||||
|
|
||||||
/*
|
|
||||||
* we do all of this in one transaction to enable the backend's
|
|
||||||
* data-loading optimizations
|
|
||||||
*/
|
|
||||||
executeStatement(con, "begin");
|
|
||||||
|
|
||||||
/* truncate away any old data */
|
|
||||||
initTruncateTables(con);
|
|
||||||
|
|
||||||
initPQExpBuffer(&sql);
|
initPQExpBuffer(&sql);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* fill branches, tellers, accounts in that order in case foreign keys
|
* Use COPY with FREEZE on v14 and later for all the tables except
|
||||||
* already exist
|
* pgbench_accounts when it is partitioned.
|
||||||
*/
|
*/
|
||||||
for (i = 0; i < nbranches * scale; i++)
|
if (PQserverVersion(con) >= 140000)
|
||||||
{
|
{
|
||||||
/* "filler" column defaults to NULL */
|
if (strcmp(table, "pgbench_accounts") != 0 ||
|
||||||
printfPQExpBuffer(&sql,
|
partitions == 0)
|
||||||
"insert into pgbench_branches(bid,bbalance) values(%d,0)",
|
copy_statement_fmt = "copy %s from stdin with (freeze on)";
|
||||||
i + 1);
|
|
||||||
executeStatement(con, sql.data);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (i = 0; i < ntellers * scale; i++)
|
n = pg_snprintf(copy_statement, sizeof(copy_statement), copy_statement_fmt, table);
|
||||||
{
|
if (n >= sizeof(copy_statement))
|
||||||
/* "filler" column defaults to NULL */
|
pg_fatal("invalid buffer size: must be at least %d characters long", n);
|
||||||
printfPQExpBuffer(&sql,
|
else if (n == -1)
|
||||||
"insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
|
pg_fatal("invalid format string");
|
||||||
i + 1, i / ntellers + 1);
|
|
||||||
executeStatement(con, sql.data);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* accounts is big enough to be worth using COPY and tracking runtime
|
|
||||||
*/
|
|
||||||
|
|
||||||
/* use COPY with FREEZE on v14 and later without partitioning */
|
|
||||||
if (partitions == 0 && PQserverVersion(con) >= 140000)
|
|
||||||
copy_statement = "copy pgbench_accounts from stdin with (freeze on)";
|
|
||||||
else
|
|
||||||
copy_statement = "copy pgbench_accounts from stdin";
|
|
||||||
|
|
||||||
res = PQexec(con, copy_statement);
|
res = PQexec(con, copy_statement);
|
||||||
|
|
||||||
@ -4931,14 +4935,11 @@ initGenerateDataClientSide(PGconn *con)
|
|||||||
|
|
||||||
start = pg_time_now();
|
start = pg_time_now();
|
||||||
|
|
||||||
for (k = 0; k < (int64) naccounts * scale; k++)
|
for (k = 0; k < total; k++)
|
||||||
{
|
{
|
||||||
int64 j = k + 1;
|
int64 j = k + 1;
|
||||||
|
|
||||||
/* "filler" column defaults to blank padded empty string */
|
init_row(&sql, k);
|
||||||
printfPQExpBuffer(&sql,
|
|
||||||
INT64_FORMAT "\t" INT64_FORMAT "\t0\t\n",
|
|
||||||
j, k / naccounts + 1);
|
|
||||||
if (PQputline(con, sql.data))
|
if (PQputline(con, sql.data))
|
||||||
pg_fatal("PQputline failed");
|
pg_fatal("PQputline failed");
|
||||||
|
|
||||||
@ -4952,25 +4953,26 @@ initGenerateDataClientSide(PGconn *con)
|
|||||||
if ((!use_quiet) && (j % 100000 == 0))
|
if ((!use_quiet) && (j % 100000 == 0))
|
||||||
{
|
{
|
||||||
double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
|
double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
|
||||||
double remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
|
double remaining_sec = ((double) total - j) * elapsed_sec / j;
|
||||||
|
|
||||||
fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)%c",
|
chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)%c",
|
||||||
j, (int64) naccounts * scale,
|
j, total,
|
||||||
(int) (((int64) j * 100) / (naccounts * (int64) scale)),
|
(int) ((j * 100) / total),
|
||||||
elapsed_sec, remaining_sec, eol);
|
table, elapsed_sec, remaining_sec, eol);
|
||||||
}
|
}
|
||||||
/* let's not call the timing for each row, but only each 100 rows */
|
/* let's not call the timing for each row, but only each 100 rows */
|
||||||
else if (use_quiet && (j % 100 == 0))
|
else if (use_quiet && (j % 100 == 0))
|
||||||
{
|
{
|
||||||
double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
|
double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
|
||||||
double remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
|
double remaining_sec = ((double) total - j) * elapsed_sec / j;
|
||||||
|
|
||||||
/* have we reached the next interval (or end)? */
|
/* have we reached the next interval (or end)? */
|
||||||
if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
|
if ((j == total) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
|
||||||
{
|
{
|
||||||
fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)%c",
|
chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)%c",
|
||||||
j, (int64) naccounts * scale,
|
j, total,
|
||||||
(int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec, eol);
|
(int) ((j * 100) / total),
|
||||||
|
table, elapsed_sec, remaining_sec, eol);
|
||||||
|
|
||||||
/* skip to the next interval */
|
/* skip to the next interval */
|
||||||
log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
|
log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
|
||||||
@ -4978,8 +4980,8 @@ initGenerateDataClientSide(PGconn *con)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (eol != '\n')
|
if (chars != 0 && eol != '\n')
|
||||||
fputc('\n', stderr); /* Need to move to next line */
|
fprintf(stderr, "%*c\r", chars - 1, ' '); /* Clear the current line */
|
||||||
|
|
||||||
if (PQputline(con, "\\.\n"))
|
if (PQputline(con, "\\.\n"))
|
||||||
pg_fatal("very last PQputline failed");
|
pg_fatal("very last PQputline failed");
|
||||||
@ -4987,6 +4989,35 @@ initGenerateDataClientSide(PGconn *con)
|
|||||||
pg_fatal("PQendcopy failed");
|
pg_fatal("PQendcopy failed");
|
||||||
|
|
||||||
termPQExpBuffer(&sql);
|
termPQExpBuffer(&sql);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Fill the standard tables with some data generated and sent from the client.
|
||||||
|
*
|
||||||
|
* The filler column is NULL in pgbench_branches and pgbench_tellers, and is
|
||||||
|
* a blank-padded string in pgbench_accounts.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
initGenerateDataClientSide(PGconn *con)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "generating data (client-side)...\n");
|
||||||
|
|
||||||
|
/*
|
||||||
|
* we do all of this in one transaction to enable the backend's
|
||||||
|
* data-loading optimizations
|
||||||
|
*/
|
||||||
|
executeStatement(con, "begin");
|
||||||
|
|
||||||
|
/* truncate away any old data */
|
||||||
|
initTruncateTables(con);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* fill branches, tellers, accounts in that order in case foreign keys
|
||||||
|
* already exist
|
||||||
|
*/
|
||||||
|
initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
|
||||||
|
initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
|
||||||
|
initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
|
||||||
|
|
||||||
executeStatement(con, "commit");
|
executeStatement(con, "commit");
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user