diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 2b8bc2f58d..2bd04bd177 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -24,6 +24,7 @@ #include "catalog/pg_enum.h" #include "catalog/storage.h" #include "commands/async.h" +#include "commands/progress.h" #include "commands/vacuum.h" #include "executor/execParallel.h" #include "libpq/libpq.h" @@ -1199,6 +1200,23 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) break; } + case 'P': /* Parallel progress reporting */ + { + /* + * Only incremental progress reporting is currently supported. + * However, it's possible to add more fields to the message to + * allow for handling of other backend progress APIs. + */ + int index = pq_getmsgint(msg, 4); + int64 incr = pq_getmsgint64(msg); + + pq_getmsgend(msg); + + pgstat_progress_incr_param(index, incr); + + break; + } + case 'X': /* Terminate, indicating clean exit */ { shm_mq_detach(pcxt->worker[i].error_mqh); diff --git a/src/backend/utils/activity/backend_progress.c b/src/backend/utils/activity/backend_progress.c index fb48eafef9..67447ef03a 100644 --- a/src/backend/utils/activity/backend_progress.c +++ b/src/backend/utils/activity/backend_progress.c @@ -10,6 +10,8 @@ */ #include "postgres.h" +#include "access/parallel.h" +#include "libpq/pqformat.h" #include "port/atomics.h" /* for memory barriers */ #include "utils/backend_progress.h" #include "utils/backend_status.h" @@ -79,6 +81,36 @@ pgstat_progress_incr_param(int index, int64 incr) PGSTAT_END_WRITE_ACTIVITY(beentry); } +/*----------- + * pgstat_progress_parallel_incr_param() - + * + * A variant of pgstat_progress_incr_param to allow a worker to poke at + * a leader to do an incremental progress update. + *----------- + */ +void +pgstat_progress_parallel_incr_param(int index, int64 incr) +{ + /* + * Parallel workers notify a leader through a 'P' protocol message to + * update progress, passing the progress index and incremented value. + * Leaders can just call pgstat_progress_incr_param directly. + */ + if (IsParallelWorker()) + { + static StringInfoData progress_message; + + initStringInfo(&progress_message); + + pq_beginmessage(&progress_message, 'P'); + pq_sendint32(&progress_message, index); + pq_sendint64(&progress_message, incr); + pq_endmessage(&progress_message); + } + else + pgstat_progress_incr_param(index, incr); +} + /*----------- * pgstat_progress_update_multi_param() - * diff --git a/src/include/utils/backend_progress.h b/src/include/utils/backend_progress.h index a84752ade9..70dea55fc0 100644 --- a/src/include/utils/backend_progress.h +++ b/src/include/utils/backend_progress.h @@ -37,6 +37,7 @@ extern void pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid); extern void pgstat_progress_update_param(int index, int64 val); extern void pgstat_progress_incr_param(int index, int64 incr); +extern void pgstat_progress_parallel_incr_param(int index, int64 incr); extern void pgstat_progress_update_multi_param(int nparam, const int *index, const int64 *val); extern void pgstat_progress_end_command(void);