}
/*
- * Modify slot with user data provided as C strings.
+ * Replace selected columns with user data provided as C strings.
* This is somewhat similar to heap_modify_tuple but also calls the type
- * input function on the user data as the input is the text representation
- * of the types.
+ * input functions on the user data.
+ * "slot" is filled with a copy of the tuple in "srcslot", with
+ * columns selected by the "replaces" array replaced with data values
+ * from "values".
+ * Caution: unreplaced pass-by-ref columns in "slot" will point into the
+ * storage for "srcslot". This is OK for current usage, but someday we may
+ * need to materialize "slot" at the end to make it independent of "srcslot".
*/
static void
-slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
+slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot,
+ LogicalRepRelMapEntry *rel,
char **values, bool *replaces)
{
int natts = slot->tts_tupleDescriptor->natts;
SlotErrCallbackArg errarg;
ErrorContextCallback errcallback;
- slot_getallattrs(slot);
+ /* We'll fill "slot" with a virtual tuple, so we must start with ... */
ExecClearTuple(slot);
- /* Push callback + info on the error context stack */
+ /*
+ * Copy all the column data from srcslot, so that we'll have valid values
+ * for unreplaced columns.
+ */
+ Assert(natts == srcslot->tts_tupleDescriptor->natts);
+ slot_getallattrs(srcslot);
+ memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
+ memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
+
+ /* For error reporting, push callback + info on the error context stack */
errarg.rel = rel;
errarg.local_attnum = -1;
errarg.remote_attnum = -1;
/* Pop the error context stack */
error_context_stack = errcallback.previous;
+ /* And finally, declare that "slot" contains a valid virtual tuple */
ExecStoreVirtualTuple(slot);
}
{
/* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
- ExecCopySlot(remoteslot, localslot);
- slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
+ slot_modify_cstrings(remoteslot, localslot, rel,
+ newtup.values, newtup.changed);
MemoryContextSwitchTo(oldctx);
EvalPlanQualSetSlot(&epqstate, remoteslot);
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 17;
+use Test::More tests => 20;
# Initialize publisher node
my $node_publisher = get_new_node('publisher');
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_rep (a int primary key)");
$node_publisher->safe_psql('postgres',
- "CREATE TABLE tab_mixed (a int primary key, b text)");
+ "CREATE TABLE tab_mixed (a int primary key, b text, c numeric)");
$node_publisher->safe_psql('postgres',
- "INSERT INTO tab_mixed (a, b) VALUES (1, 'foo')");
+ "INSERT INTO tab_mixed (a, b, c) VALUES (1, 'foo', 1.1)");
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_include (a int, b text, CONSTRAINT covering PRIMARY KEY(a) INCLUDE(b))"
);
# different column count and order than on publisher
$node_subscriber->safe_psql('postgres',
- "CREATE TABLE tab_mixed (c text, b text, a int primary key)");
+ "CREATE TABLE tab_mixed (d text default 'local', c numeric, b text, a int primary key)"
+);
# replication of the table with included index
$node_subscriber->safe_psql('postgres',
$node_publisher->safe_psql('postgres', "UPDATE tab_rep SET a = -a");
$node_publisher->safe_psql('postgres',
- "INSERT INTO tab_mixed VALUES (2, 'bar')");
+ "INSERT INTO tab_mixed VALUES (2, 'bar', 2.2)");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_include SELECT generate_series(1,50)");
"SELECT count(*), min(a), max(a) FROM tab_rep");
is($result, qq(20|-20|-1), 'check replicated changes on subscriber');
-$result =
- $node_subscriber->safe_psql('postgres', "SELECT c, b, a FROM tab_mixed");
-is( $result, qq(|foo|1
-|bar|2), 'check replicated changes with different column order');
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_mixed");
+is( $result, qq(local|1.1|foo|1
+local|2.2|bar|2), 'check replicated changes with different column order');
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_include");
"ALTER TABLE tab_ins REPLICA IDENTITY FULL");
$node_subscriber->safe_psql('postgres',
"ALTER TABLE tab_ins REPLICA IDENTITY FULL");
+# tab_mixed can use DEFAULT, since it has a primary key
# and do the updates
$node_publisher->safe_psql('postgres', "UPDATE tab_full SET a = a * a");
$node_publisher->safe_psql('postgres',
"UPDATE tab_full2 SET x = 'bb' WHERE x = 'b'");
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab_mixed SET b = 'baz' WHERE a = 1");
$node_publisher->wait_for_catchup('tap_sub');
bb),
'update works with REPLICA IDENTITY FULL and text datums');
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab_mixed ORDER BY a");
+is( $result, qq(local|1.1|baz|1
+local|2.2|bar|2),
+ 'update works with different column order and subscriber local values');
+
+# check behavior with dropped columns
+
+$node_publisher->safe_psql('postgres', "ALTER TABLE tab_mixed DROP COLUMN b");
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab_mixed SET c = 11.11 WHERE a = 1");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab_mixed ORDER BY a");
+is( $result, qq(local|11.11|baz|1
+local|2.2|bar|2),
+ 'update works with dropped publisher column');
+
+$node_subscriber->safe_psql('postgres',
+ "ALTER TABLE tab_mixed DROP COLUMN d");
+
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab_mixed SET c = 22.22 WHERE a = 2");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab_mixed ORDER BY a");
+is( $result, qq(11.11|baz|1
+22.22|bar|2),
+ 'update works with dropped subscriber column');
+
# check that change of connection string and/or publication list causes
# restart of subscription workers. Not all of these are registered as tests
# as we need to poll for a change but the test suite will fail none the less