Set query ID in parallel workers for vacuum, BRIN and btree
authorMichael Paquier <michael@paquier.xyz>
Sun, 29 Sep 2024 23:43:28 +0000 (08:43 +0900)
committerMichael Paquier <michael@paquier.xyz>
Sun, 29 Sep 2024 23:43:28 +0000 (08:43 +0900)
All these code paths use their own entry point when starting parallel
workers, but failed to set a query ID, even if they set a text query.
Hence, this data would be missed in pg_stat_activity for the worker
processes.  The main entry point for parallel query processing,
ParallelQueryMain(), is already doing that by saving its query ID in a
dummy PlannedStmt, but not the others.  The code is changed so as the
query ID of these queries is set in their shared state, and reported
back once the parallel workers start.

Some tests are added to show how the failures can happen for btree and
BRIN with a parallel build enforced, which are able to trigger a failure
in an assertion added by 24f520594809 in the recovery TAP test
027_stream_regress.pl where pg_stat_statements is always loaded.  In
this case, the executor path was taken because the index expression
needs to be flattened when building its IndexInfo.

Alexander Lakhin has noticed the problem in btree, and I have noticed
that the issue was more spread.  This is arguably a bug, but nobody has
complained about that until now, so no backpatch is done out of caution.
If folks would like to see a backpatch, well, let me know.

Reported-by: Alexander Lakhin
Reviewed-by: Sami Imseih
Discussion: https://postgr.es/m/cf3547c1-498a-6a61-7b01-819f902a251f@gmail.com

src/backend/access/brin/brin.c
src/backend/access/nbtree/nbtsort.c
src/backend/commands/vacuumparallel.c
src/test/regress/expected/brin.out
src/test/regress/expected/btree_index.out
src/test/regress/sql/brin.sql
src/test/regress/sql/btree_index.sql

index 60853a0f6adb3259b5372bfa1d8536083da5e93d..c0b978119ac8692f5d3f3d4142882c5cd44cb023 100644 (file)
@@ -67,6 +67,9 @@ typedef struct BrinShared
    BlockNumber pagesPerRange;
    int         scantuplesortstates;
 
+   /* Query ID, for report in worker processes */
+   uint64      queryid;
+
    /*
     * workersdonecv is used to monitor the progress of workers.  All parallel
     * participants must indicate that they are done before leader can use
@@ -2448,6 +2451,7 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
    brinshared->isconcurrent = isconcurrent;
    brinshared->scantuplesortstates = scantuplesortstates;
    brinshared->pagesPerRange = buildstate->bs_pagesPerRange;
+   brinshared->queryid = pgstat_get_my_query_id();
    ConditionVariableInit(&brinshared->workersdonecv);
    SpinLockInit(&brinshared->mutex);
 
@@ -2891,6 +2895,9 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
        indexLockmode = RowExclusiveLock;
    }
 
+   /* Track query ID */
+   pgstat_report_query_id(brinshared->queryid, false);
+
    /* Open relations within worker */
    heapRel = table_open(brinshared->heaprelid, heapLockmode);
    indexRel = index_open(brinshared->indexrelid, indexLockmode);
index f5d7b3b0c3014b1bc1a2e07300fb7cce91a8db3c..5cca0d4f520b5fd5bc943d59cd30f05f749bf219 100644 (file)
@@ -105,6 +105,9 @@ typedef struct BTShared
    bool        isconcurrent;
    int         scantuplesortstates;
 
+   /* Query ID, for report in worker processes */
+   uint64      queryid;
+
    /*
     * workersdonecv is used to monitor the progress of workers.  All parallel
     * participants must indicate that they are done before leader can use
@@ -1505,6 +1508,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
    btshared->nulls_not_distinct = btspool->nulls_not_distinct;
    btshared->isconcurrent = isconcurrent;
    btshared->scantuplesortstates = scantuplesortstates;
+   btshared->queryid = pgstat_get_my_query_id();
    ConditionVariableInit(&btshared->workersdonecv);
    SpinLockInit(&btshared->mutex);
    /* Initialize mutable state */
@@ -1787,6 +1791,9 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
        indexLockmode = RowExclusiveLock;
    }
 
+   /* Track query ID */
+   pgstat_report_query_id(btshared->queryid, false);
+
    /* Open relations within worker */
    heapRel = table_open(btshared->heaprelid, heapLockmode);
    indexRel = index_open(btshared->indexrelid, indexLockmode);
index 22c057fe61be1d1c1e094bf14dab7814e27da4ee..4fd6574e12981c883dbba06f1870a7266af512e4 100644 (file)
 typedef struct PVShared
 {
    /*
-    * Target table relid and log level (for messages about parallel workers
-    * launched during VACUUM VERBOSE).  These fields are not modified during
-    * the parallel vacuum.
+    * Target table relid, log level (for messages about parallel workers
+    * launched during VACUUM VERBOSE) and query ID.  These fields are not
+    * modified during the parallel vacuum.
     */
    Oid         relid;
    int         elevel;
+   uint64      queryid;
 
    /*
     * Fields for both index vacuum and cleanup.
@@ -369,6 +370,7 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
    MemSet(shared, 0, est_shared_len);
    shared->relid = RelationGetRelid(rel);
    shared->elevel = elevel;
+   shared->queryid = pgstat_get_my_query_id();
    shared->maintenance_work_mem_worker =
        (nindexes_mwm > 0) ?
        maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
@@ -1014,6 +1016,9 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
    debug_query_string = sharedquery;
    pgstat_report_activity(STATE_RUNNING, debug_query_string);
 
+   /* Track query ID */
+   pgstat_report_query_id(shared->queryid, false);
+
    /*
     * Open table.  The lock mode is the same as the leader process.  It's
     * okay because the lock mode does not conflict among the parallel
index d6779d8c7d21a873deb410f9f15f2e19708751ea..e1db2280cf92e0c21d7af58784ae070352b73209 100644 (file)
@@ -567,6 +567,16 @@ SELECT * FROM brintest_3 WHERE b < '0';
 
 DROP TABLE brintest_3;
 RESET enable_seqscan;
+-- test parallel build with immutable function.
+CREATE TABLE brintest_expr (n int);
+CREATE FUNCTION brintest_func() RETURNS int LANGUAGE sql IMMUTABLE RETURN 0;
+BEGIN;
+SET LOCAL min_parallel_table_scan_size = 0;
+SET LOCAL max_parallel_maintenance_workers = 4;
+CREATE INDEX brintest_expr_idx ON brintest_expr USING brin (brintest_func());
+COMMIT;
+DROP TABLE brintest_expr;
+DROP FUNCTION brintest_func();
 -- test an unlogged table, mostly to get coverage of brinbuildempty
 CREATE UNLOGGED TABLE brintest_unlogged (n numrange);
 CREATE INDEX brinidx_unlogged ON brintest_unlogged USING brin (n);
index b350efe128c7ba131d7655ec8387c3bc28210f8d..d3f4c7e08c379434bca2b99f67c45cf33bb90068 100644 (file)
@@ -476,6 +476,16 @@ INSERT INTO delete_test_table SELECT i, 1, 2, 3 FROM generate_series(1,1000) i;
 -- Test unsupported btree opclass parameters
 create index on btree_tall_tbl (id int4_ops(foo=1));
 ERROR:  operator class int4_ops has no options
+-- test parallel build with immutable function.
+CREATE TABLE btree_test_expr (n int);
+CREATE FUNCTION btree_test_func() RETURNS int LANGUAGE sql IMMUTABLE RETURN 0;
+BEGIN;
+SET LOCAL min_parallel_table_scan_size = 0;
+SET LOCAL max_parallel_maintenance_workers = 4;
+CREATE INDEX btree_test_expr_idx ON btree_test_expr USING btree (btree_test_func());
+COMMIT;
+DROP TABLE btree_test_expr;
+DROP FUNCTION btree_test_func();
 -- Test case of ALTER INDEX with abuse of column names for indexes.
 -- This grammar is not officially supported, but the parser allows it.
 CREATE INDEX btree_tall_idx2 ON btree_tall_tbl (id);
index 695cfad4bea3a3c90f54025a581b0b947cb4b59f..7ea97f47c8d0ebe8486a0f44740a6034e3f28613 100644 (file)
@@ -510,6 +510,17 @@ SELECT * FROM brintest_3 WHERE b < '0';
 DROP TABLE brintest_3;
 RESET enable_seqscan;
 
+-- test parallel build with immutable function.
+CREATE TABLE brintest_expr (n int);
+CREATE FUNCTION brintest_func() RETURNS int LANGUAGE sql IMMUTABLE RETURN 0;
+BEGIN;
+SET LOCAL min_parallel_table_scan_size = 0;
+SET LOCAL max_parallel_maintenance_workers = 4;
+CREATE INDEX brintest_expr_idx ON brintest_expr USING brin (brintest_func());
+COMMIT;
+DROP TABLE brintest_expr;
+DROP FUNCTION brintest_func();
+
 -- test an unlogged table, mostly to get coverage of brinbuildempty
 CREATE UNLOGGED TABLE brintest_unlogged (n numrange);
 CREATE INDEX brinidx_unlogged ON brintest_unlogged USING brin (n);
index 0d2a33f37053cef22a42909d28b1d310086a10f1..2c3b1352926ecadd171a405812de3d30024e4e47 100644 (file)
@@ -272,6 +272,17 @@ INSERT INTO delete_test_table SELECT i, 1, 2, 3 FROM generate_series(1,1000) i;
 -- Test unsupported btree opclass parameters
 create index on btree_tall_tbl (id int4_ops(foo=1));
 
+-- test parallel build with immutable function.
+CREATE TABLE btree_test_expr (n int);
+CREATE FUNCTION btree_test_func() RETURNS int LANGUAGE sql IMMUTABLE RETURN 0;
+BEGIN;
+SET LOCAL min_parallel_table_scan_size = 0;
+SET LOCAL max_parallel_maintenance_workers = 4;
+CREATE INDEX btree_test_expr_idx ON btree_test_expr USING btree (btree_test_func());
+COMMIT;
+DROP TABLE btree_test_expr;
+DROP FUNCTION btree_test_func();
+
 -- Test case of ALTER INDEX with abuse of column names for indexes.
 -- This grammar is not officially supported, but the parser allows it.
 CREATE INDEX btree_tall_idx2 ON btree_tall_tbl (id);