2505 | 505 | 0505
(2 rows)
+DELETE FROM result_tbl;
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT a, b, 'AAA' || c FROM async_pt WHERE b === 505;
+ QUERY PLAN
+---------------------------------------------------------------------------------
+ Insert on public.result_tbl
+ -> Append
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, ('AAA'::text || async_pt_1.c)
+ Filter: (async_pt_1.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, ('AAA'::text || async_pt_2.c)
+ Filter: (async_pt_2.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2
+(10 rows)
+
+INSERT INTO result_tbl SELECT a, b, 'AAA' || c FROM async_pt WHERE b === 505;
+SELECT * FROM result_tbl ORDER BY a;
+ a | b | c
+------+-----+---------
+ 1505 | 505 | AAA0505
+ 2505 | 505 | AAA0505
+(2 rows)
+
DELETE FROM result_tbl;
-- Check case where multiple partitions use the same connection
CREATE TABLE base_tbl3 (a int, b int, c text);
3900 | 900 | 0900 | 3900 | 900 | 0900
(30 rows)
+DELETE FROM join_tbl;
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO join_tbl SELECT t1.a, t1.b, 'AAA' || t1.c, t2.a, t2.b, 'AAA' || t2.c FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
+ QUERY PLAN
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Insert on public.join_tbl
+ -> Append
+ -> Async Foreign Scan
+ Output: t1_1.a, t1_1.b, ('AAA'::text || t1_1.c), t2_1.a, t2_1.b, ('AAA'::text || t2_1.c)
+ Relations: (public.async_p1 t1_1) INNER JOIN (public.async_p1 t2_1)
+ Remote SQL: SELECT r5.a, r5.b, r5.c, r8.a, r8.b, r8.c FROM (public.base_tbl1 r5 INNER JOIN public.base_tbl1 r8 ON (((r5.a = r8.a)) AND ((r5.b = r8.b)) AND (((r5.b % 100) = 0))))
+ -> Async Foreign Scan
+ Output: t1_2.a, t1_2.b, ('AAA'::text || t1_2.c), t2_2.a, t2_2.b, ('AAA'::text || t2_2.c)
+ Relations: (public.async_p2 t1_2) INNER JOIN (public.async_p2 t2_2)
+ Remote SQL: SELECT r6.a, r6.b, r6.c, r9.a, r9.b, r9.c FROM (public.base_tbl2 r6 INNER JOIN public.base_tbl2 r9 ON (((r6.a = r9.a)) AND ((r6.b = r9.b)) AND (((r6.b % 100) = 0))))
+ -> Hash Join
+ Output: t1_3.a, t1_3.b, ('AAA'::text || t1_3.c), t2_3.a, t2_3.b, ('AAA'::text || t2_3.c)
+ Hash Cond: ((t2_3.a = t1_3.a) AND (t2_3.b = t1_3.b))
+ -> Seq Scan on public.async_p3 t2_3
+ Output: t2_3.a, t2_3.b, t2_3.c
+ -> Hash
+ Output: t1_3.a, t1_3.b, t1_3.c
+ -> Seq Scan on public.async_p3 t1_3
+ Output: t1_3.a, t1_3.b, t1_3.c
+ Filter: ((t1_3.b % 100) = 0)
+(20 rows)
+
+INSERT INTO join_tbl SELECT t1.a, t1.b, 'AAA' || t1.c, t2.a, t2.b, 'AAA' || t2.c FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
+SELECT * FROM join_tbl ORDER BY a1;
+ a1 | b1 | c1 | a2 | b2 | c2
+------+-----+---------+------+-----+---------
+ 1000 | 0 | AAA0000 | 1000 | 0 | AAA0000
+ 1100 | 100 | AAA0100 | 1100 | 100 | AAA0100
+ 1200 | 200 | AAA0200 | 1200 | 200 | AAA0200
+ 1300 | 300 | AAA0300 | 1300 | 300 | AAA0300
+ 1400 | 400 | AAA0400 | 1400 | 400 | AAA0400
+ 1500 | 500 | AAA0500 | 1500 | 500 | AAA0500
+ 1600 | 600 | AAA0600 | 1600 | 600 | AAA0600
+ 1700 | 700 | AAA0700 | 1700 | 700 | AAA0700
+ 1800 | 800 | AAA0800 | 1800 | 800 | AAA0800
+ 1900 | 900 | AAA0900 | 1900 | 900 | AAA0900
+ 2000 | 0 | AAA0000 | 2000 | 0 | AAA0000
+ 2100 | 100 | AAA0100 | 2100 | 100 | AAA0100
+ 2200 | 200 | AAA0200 | 2200 | 200 | AAA0200
+ 2300 | 300 | AAA0300 | 2300 | 300 | AAA0300
+ 2400 | 400 | AAA0400 | 2400 | 400 | AAA0400
+ 2500 | 500 | AAA0500 | 2500 | 500 | AAA0500
+ 2600 | 600 | AAA0600 | 2600 | 600 | AAA0600
+ 2700 | 700 | AAA0700 | 2700 | 700 | AAA0700
+ 2800 | 800 | AAA0800 | 2800 | 800 | AAA0800
+ 2900 | 900 | AAA0900 | 2900 | 900 | AAA0900
+ 3000 | 0 | AAA0000 | 3000 | 0 | AAA0000
+ 3100 | 100 | AAA0100 | 3100 | 100 | AAA0100
+ 3200 | 200 | AAA0200 | 3200 | 200 | AAA0200
+ 3300 | 300 | AAA0300 | 3300 | 300 | AAA0300
+ 3400 | 400 | AAA0400 | 3400 | 400 | AAA0400
+ 3500 | 500 | AAA0500 | 3500 | 500 | AAA0500
+ 3600 | 600 | AAA0600 | 3600 | 600 | AAA0600
+ 3700 | 700 | AAA0700 | 3700 | 700 | AAA0700
+ 3800 | 800 | AAA0800 | 3800 | 800 | AAA0800
+ 3900 | 900 | AAA0900 | 3900 | 900 | AAA0900
+(30 rows)
+
DELETE FROM join_tbl;
RESET enable_partitionwise_join;
-- Test rescan of an async Append node with do_exec_prune=false
DROP INDEX base_tbl1_idx;
DROP INDEX base_tbl2_idx;
DROP INDEX async_p3_idx;
+-- UNION queries
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl
+(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10)
+UNION
+(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10);
+ QUERY PLAN
+-----------------------------------------------------------------------------------------------------------------
+ Insert on public.result_tbl
+ -> HashAggregate
+ Output: async_p1.a, async_p1.b, (('AAA'::text || async_p1.c))
+ Group Key: async_p1.a, async_p1.b, (('AAA'::text || async_p1.c))
+ -> Append
+ -> Async Foreign Scan on public.async_p1
+ Output: async_p1.a, async_p1.b, ('AAA'::text || async_p1.c)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY a ASC NULLS LAST LIMIT 10::bigint
+ -> Async Foreign Scan on public.async_p2
+ Output: async_p2.a, async_p2.b, ('AAA'::text || async_p2.c)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((b < 10))
+(11 rows)
+
+INSERT INTO result_tbl
+(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10)
+UNION
+(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10);
+SELECT * FROM result_tbl ORDER BY a;
+ a | b | c
+------+----+---------
+ 1000 | 0 | AAA0000
+ 1005 | 5 | AAA0005
+ 1010 | 10 | AAA0010
+ 1015 | 15 | AAA0015
+ 1020 | 20 | AAA0020
+ 1025 | 25 | AAA0025
+ 1030 | 30 | AAA0030
+ 1035 | 35 | AAA0035
+ 1040 | 40 | AAA0040
+ 1045 | 45 | AAA0045
+ 2000 | 0 | AAA0000
+ 2005 | 5 | AAA0005
+(12 rows)
+
+DELETE FROM result_tbl;
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl
+(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10)
+UNION ALL
+(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10);
+ QUERY PLAN
+-----------------------------------------------------------------------------------------------------------
+ Insert on public.result_tbl
+ -> Append
+ -> Async Foreign Scan on public.async_p1
+ Output: async_p1.a, async_p1.b, ('AAA'::text || async_p1.c)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY a ASC NULLS LAST LIMIT 10::bigint
+ -> Async Foreign Scan on public.async_p2
+ Output: async_p2.a, async_p2.b, ('AAA'::text || async_p2.c)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((b < 10))
+(8 rows)
+
+INSERT INTO result_tbl
+(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10)
+UNION ALL
+(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10);
+SELECT * FROM result_tbl ORDER BY a;
+ a | b | c
+------+----+---------
+ 1000 | 0 | AAA0000
+ 1005 | 5 | AAA0005
+ 1010 | 10 | AAA0010
+ 1015 | 15 | AAA0015
+ 1020 | 20 | AAA0020
+ 1025 | 25 | AAA0025
+ 1030 | 30 | AAA0030
+ 1035 | 35 | AAA0035
+ 1040 | 40 | AAA0040
+ 1045 | 45 | AAA0045
+ 2000 | 0 | AAA0000
+ 2005 | 5 | AAA0005
+(12 rows)
+
+DELETE FROM result_tbl;
-- Test that pending requests are processed properly
SET enable_mergejoin TO false;
SET enable_hashjoin TO false;
SELECT * FROM result_tbl ORDER BY a;
DELETE FROM result_tbl;
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT a, b, 'AAA' || c FROM async_pt WHERE b === 505;
+INSERT INTO result_tbl SELECT a, b, 'AAA' || c FROM async_pt WHERE b === 505;
+
+SELECT * FROM result_tbl ORDER BY a;
+DELETE FROM result_tbl;
+
-- Check case where multiple partitions use the same connection
CREATE TABLE base_tbl3 (a int, b int, c text);
CREATE FOREIGN TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000)
SELECT * FROM join_tbl ORDER BY a1;
DELETE FROM join_tbl;
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO join_tbl SELECT t1.a, t1.b, 'AAA' || t1.c, t2.a, t2.b, 'AAA' || t2.c FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
+INSERT INTO join_tbl SELECT t1.a, t1.b, 'AAA' || t1.c, t2.a, t2.b, 'AAA' || t2.c FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
+
+SELECT * FROM join_tbl ORDER BY a1;
+DELETE FROM join_tbl;
+
RESET enable_partitionwise_join;
-- Test rescan of an async Append node with do_exec_prune=false
DROP INDEX base_tbl2_idx;
DROP INDEX async_p3_idx;
+-- UNION queries
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl
+(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10)
+UNION
+(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10);
+INSERT INTO result_tbl
+(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10)
+UNION
+(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10);
+
+SELECT * FROM result_tbl ORDER BY a;
+DELETE FROM result_tbl;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl
+(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10)
+UNION ALL
+(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10);
+INSERT INTO result_tbl
+(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10)
+UNION ALL
+(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10);
+
+SELECT * FROM result_tbl ORDER BY a;
+DELETE FROM result_tbl;
+
-- Test that pending requests are processed properly
SET enable_mergejoin TO false;
SET enable_hashjoin TO false;
static Plan *create_gating_plan(PlannerInfo *root, Path *path, Plan *plan,
List *gating_quals);
static Plan *create_join_plan(PlannerInfo *root, JoinPath *best_path);
-static bool is_async_capable_path(Path *path);
+static bool mark_async_capable_plan(Plan *plan, Path *path);
static Plan *create_append_plan(PlannerInfo *root, AppendPath *best_path,
int flags);
static Plan *create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
}
/*
- * is_async_capable_path
- * Check whether a given Path node is async-capable.
+ * mark_async_capable_plan
+ * Check whether a given Path node is async-capable, and if so, mark the
+ * Plan node created from it as such and return true, otherwise return
+ * false.
*/
static bool
-is_async_capable_path(Path *path)
+mark_async_capable_plan(Plan *plan, Path *path)
{
switch (nodeTag(path))
{
+ case T_SubqueryScanPath:
+ {
+ SubqueryScan *scan_plan = (SubqueryScan *) plan;
+
+ /*
+ * If a SubqueryScan node atop of an async-capable plan node
+ * is deletable, consider it as async-capable.
+ */
+ if (trivial_subqueryscan(scan_plan) &&
+ mark_async_capable_plan(scan_plan->subplan,
+ ((SubqueryScanPath *) path)->subpath))
+ break;
+ return false;
+ }
case T_ForeignPath:
{
FdwRoutine *fdwroutine = path->parent->fdwroutine;
Assert(fdwroutine != NULL);
if (fdwroutine->IsForeignPathAsyncCapable != NULL &&
fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path))
- return true;
+ break;
+ return false;
}
- break;
+ case T_ProjectionPath:
+
+ /*
+ * If the generated plan node doesn't include a Result node,
+ * consider it as async-capable if the subpath is async-capable.
+ */
+ if (!IsA(plan, Result) &&
+ mark_async_capable_plan(plan,
+ ((ProjectionPath *) path)->subpath))
+ return true;
+ return false;
default:
- break;
+ return false;
}
- return false;
+
+ plan->async_capable = true;
+
+ return true;
}
/*
}
}
- subplans = lappend(subplans, subplan);
-
- /* Check to see if subplan can be executed asynchronously */
- if (consider_async && is_async_capable_path(subpath))
+ /* If needed, check to see if subplan can be executed asynchronously */
+ if (consider_async && mark_async_capable_plan(subplan, subpath))
{
- subplan->async_capable = true;
+ Assert(subplan->async_capable);
++nasyncplans;
}
+
+ subplans = lappend(subplans, subplan);
}
/*
plan->righttree = NULL;
node->scan.scanrelid = scanrelid;
node->subplan = subplan;
+ node->scanstatus = SUBQUERY_SCAN_UNKNOWN;
return node;
}