static HTAB *last_start_times = NULL;
ListCell *lc;
bool started_tx = false;
+ bool should_exit = false;
Assert(!IsTransactionState());
last_start_times = NULL;
}
- /*
- * Even when the two_phase mode is requested by the user, it remains as
- * 'pending' until all tablesyncs have reached READY state.
- *
- * When this happens, we restart the apply worker and (if the conditions
- * are still ok) then the two_phase tri-state will become 'enabled' at
- * that time.
- *
- * Note: If the subscription has no tables then leave the state as
- * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
- * work.
- */
- if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
- AllTablesyncsReady())
- {
- ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
- MySubscription->name)));
-
- proc_exit(0);
- }
-
/*
* Process all tables that are being synchronized.
*/
if (started_tx)
{
+ /*
+ * Even when the two_phase mode is requested by the user, it remains
+ * as 'pending' until all tablesyncs have reached READY state.
+ *
+ * When this happens, we restart the apply worker and (if the
+ * conditions are still ok) then the two_phase tri-state will become
+ * 'enabled' at that time.
+ *
+ * Note: If the subscription has no tables then leave the state as
+ * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+ * work.
+ */
+ if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
+ {
+ CommandCounterIncrement(); /* make updates visible */
+ if (AllTablesyncsReady())
+ {
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
+ MySubscription->name)));
+ should_exit = true;
+ }
+ }
+
CommitTransactionCommand();
pgstat_report_stat(true);
}
+
+ if (should_exit)
+ proc_exit(0);
}
/*
TupleTableSlot *tslot;
Oid attrsRow[] = {INT2VECTOROID};
StringInfoData pub_names;
+
initStringInfo(&pub_names);
foreach(lc, MySubscription->publications)
{