
Commit 10f630e

feat: cluster-managed replication slots for High Availability (cloudnative-pg#740)
Introduce the concept of cluster-managed replication slots, starting with high availability clusters. This feature automatically manages physical replication slots for each hot standby replica in the High Availability cluster, both on the primary and on the standbys.

We are introducing these two terms:

- "Primary HA slot": a physical replication slot whose lifecycle is entirely managed by the current primary of the cluster and whose purpose is to map to a specific standby in streaming replication. Such a slot lives on the primary only.

- "Standby HA slot": a physical replication slot for a standby whose lifecycle is entirely managed by another standby in the cluster, based on the content of the `pg_replication_slots` view on the primary, and updated at regular intervals using `pg_replication_slot_advance()`.

This feature is controlled through the following options:

- `.spec.replicationSlots.highAvailability.enabled`: if true, the feature is enabled (`false` is the default)

- `.spec.replicationSlots.highAvailability.slotPrefix`: the prefix that identifies replication slots managed by the operator for this feature (default: `_cnpg_`)

- `.spec.replicationSlots.updateInterval`: how often the standby synchronizes the position of the local copy of the replication slots with the position on the current primary, expressed in seconds (default: 30)

IMPORTANT: this capability requires PostgreSQL 11 or higher, as it relies on the `pg_replication_slot_advance()` administration function to directly manipulate the position of a replication slot. In PostgreSQL 11, enabling replication slots when initially disabled, or conversely disabling them when initially enabled, requires a rolling update of the cluster (due to the presence of the `recovery.conf` file, which is only read at startup).

Closes cloudnative-pg#8

ABOUT REPLICATION SLOTS IN POSTGRESQL: replication slots are a native PostgreSQL feature, introduced in 9.4, that provides an automated way to ensure that the primary does not remove WAL segments until all the attached streaming replication clients have received them, and that the primary does not remove rows which could cause a recovery conflict even when the standby is (temporarily) disconnected. A replication slot exists solely on the instance that created it, and PostgreSQL does not replicate it to the standby servers. As a result, after a failover or a switchover, the new primary does not contain the replication slot from the old primary. This can create problems for the streaming replication clients that were connected to the old primary and have lost their slot.

More information at: https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS

Signed-off-by: Marco Nenciarini <marco.nenciarini@enterprisedb.com>
Signed-off-by: Niccolò Fei <niccolo.fei@enterprisedb.com>
Signed-off-by: Jitendra Wadle <jitendra.wadle@enterprisedb.com>
Signed-off-by: Jonathan Gonzalez V <jonathan.gonzalez@enterprisedb.com>
Signed-off-by: Gabriele Bartolini <gabriele.bartolini@enterprisedb.com>
Signed-off-by: Jaime Silvela <jaime.silvela@enterprisedb.com>
Signed-off-by: Armando Ruocco <armando.ruocco@enterprisedb.com>
Signed-off-by: Tao Li <tao.li@enterprisedb.com>
Signed-off-by: Leonardo Cecchi <leonardo.cecchi@enterprisedb.com>
Signed-off-by: Philippe Scorsolini <philippe.scorsolini@enterprisedb.com>
Co-authored-by: Niccolò Fei <niccolo.fei@enterprisedb.com>
Co-authored-by: Jitendra Wadle <jitendra.wadle@enterprisedb.com>
Co-authored-by: Jonathan Gonzalez V <jonathan.gonzalez@enterprisedb.com>
Co-authored-by: Jaime Silvela <jaime.silvela@enterprisedb.com>
Co-authored-by: Armando Ruocco <armando.ruocco@enterprisedb.com>
Co-authored-by: Gabriele Bartolini <gabriele.bartolini@enterprisedb.com>
Co-authored-by: Tao Li <tao.li@enterprisedb.com>
Co-authored-by: Leonardo Cecchi <leonardo.cecchi@enterprisedb.com>
Co-authored-by: Philippe Scorsolini <philippe.scorsolini@enterprisedb.com>
1 parent 932e770 commit 10f630e
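
To make the new options concrete, here is a minimal Go sketch (illustrative only; it uses the API types added by this commit, and the import path is assumed to match the repository's module layout) that enables HA replication slots on a Cluster spec and derives the primary HA slot name for one of its instances:

package main

import (
	"fmt"

	apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
)

func main() {
	cluster := apiv1.Cluster{
		Spec: apiv1.ClusterSpec{
			ReplicationSlots: &apiv1.ReplicationSlotsConfiguration{
				HighAvailability: &apiv1.ReplicationSlotsHAConfiguration{
					Enabled:    true,
					SlotPrefix: "_cnpg_", // same as the default applied when omitted
				},
				UpdateInterval: 30, // seconds between standby slot synchronizations
			},
		},
	}

	// Primary HA slots are named prefix + instance name, lower-cased, with any
	// character outside [a-z0-9_] replaced by an underscore.
	fmt.Println(cluster.GetSlotNameFromInstanceName("cluster-example-1"))
	// Output: _cnpg_cluster_example_1
}

The derived name matches what the new unit tests in api/v1/cluster_types_test.go below expect.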

49 files changed, +2378 -77 lines changed

.wordlist-en-custom.txt (+4)

@@ -243,6 +243,8 @@ RedHat
 RedHat's
 ReplicaClusterConfiguration
 ReplicaSet
+ReplicationSlotsConfiguration
+ReplicationSlotsHAConfiguration
 ReplicationTLSSecret
 ResizingPVC
 ResourceRequirements
@@ -528,6 +530,7 @@ gzip
 hba
 hdr
 healthz
+highAvailability
 historyTags
 horikyota
 hostPort
@@ -749,6 +752,7 @@ recoverytarget
 recv
 redhat
 relatime
+replicationSlots
 replicationTLSSecret
 repmgr
 reportNonRedacted

api/v1/cluster_types.go (+104, -1)

@@ -19,6 +19,9 @@ package v1
 import (
 	"context"
 	"fmt"
+	"regexp"
+	"strings"
+	"time"
 
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -157,6 +160,9 @@ type ClusterSpec struct {
 	// +optional
 	PostgresConfiguration PostgresConfiguration `json:"postgresql,omitempty"`
 
+	// Replication slots management configuration
+	ReplicationSlots *ReplicationSlotsConfiguration `json:"replicationSlots,omitempty"`
+
 	// Instructions to bootstrap this cluster
 	// +optional
 	Bootstrap *BootstrapConfiguration `json:"bootstrap,omitempty"`
@@ -423,6 +429,9 @@ type ClusterStatus struct {
 
 	// Conditions for cluster object
 	Conditions []metav1.Condition `json:"conditions,omitempty"`
+
+	// List of instance names in the cluster
+	InstanceNames []string `json:"instanceNames,omitempty"`
 }
 
 // InstanceReportedState describes the last reported state of an instance during a reconciliation loop
@@ -523,6 +532,86 @@ type ReplicaClusterConfiguration struct {
 	Source string `json:"source"`
 }
 
+// DefaultReplicationSlotsUpdateInterval is the default in seconds for the replication slots update interval
+const DefaultReplicationSlotsUpdateInterval = 30
+
+// DefaultReplicationSlotsHASlotPrefix is the default prefix for names of replication slots used for HA.
+const DefaultReplicationSlotsHASlotPrefix = "_cnpg_"
+
+// ReplicationSlotsConfiguration encapsulates the configuration
+// of replication slots
+type ReplicationSlotsConfiguration struct {
+	// Replication slots for high availability configuration
+	HighAvailability *ReplicationSlotsHAConfiguration `json:"highAvailability,omitempty"`
+
+	// Standby will update the status of the local replication slots
+	// every `updateInterval` seconds (default 30).
+	//+kubebuilder:default:=30
+	//+kubebuilder:validation:Minimum=1
+	UpdateInterval int `json:"updateInterval,omitempty"`
+}
+
+// GetUpdateInterval returns the update interval, defaulting to DefaultReplicationSlotsUpdateInterval if empty
+func (r *ReplicationSlotsConfiguration) GetUpdateInterval() time.Duration {
+	if r == nil || r.UpdateInterval <= 0 {
+		return DefaultReplicationSlotsUpdateInterval
+	}
+	return time.Duration(r.UpdateInterval) * time.Second
+}
+
+// ReplicationSlotsHAConfiguration encapsulates the configuration
+// of the replication slots that are automatically managed by
+// the operator to control the streaming replication connections
+// with the standby instances for high availability (HA) purposes.
+// Replication slots are a PostgreSQL feature that makes sure
+// that PostgreSQL automatically keeps WAL files in the primary
+// when a streaming client (in this specific case a replica that
+// is part of the HA cluster) gets disconnected.
type ReplicationSlotsHAConfiguration struct {
+	// If enabled, the operator will automatically manage replication slots
+	// on the primary instance and use them in streaming replication
+	// connections with all the standby instances that are part of the HA
+	// cluster. If disabled (default), the operator will not take advantage
+	// of replication slots in streaming connections with the replicas.
+	// This feature also controls replication slots in replica cluster,
+	// from the designated primary to its cascading replicas. This can only
+	// be set at creation time.
+	// +optional
+	Enabled bool `json:"enabled"`
+
+	// Prefix for replication slots managed by the operator for HA.
+	// It may only contain lower case letters, numbers, and the underscore character.
+	// This can only be set at creation time. By default set to `_cnpg_`.
+	//+kubebuilder:default:=_cnpg_
+	//+kubebuilder:validation:Pattern=^[0-9a-z_]*$
+	SlotPrefix string `json:"slotPrefix,omitempty"`
+}
+
+// GetSlotPrefix returns the HA slot prefix, defaulting to DefaultReplicationSlotsHASlotPrefix if empty
+func (r *ReplicationSlotsHAConfiguration) GetSlotPrefix() string {
+	if r == nil || r.SlotPrefix == "" {
+		return DefaultReplicationSlotsHASlotPrefix
+	}
+	return r.SlotPrefix
+}
+
+// GetSlotNameFromInstanceName returns the slot name, given the instance name.
+// It returns an empty string if High Availability Replication Slots are disabled
+func (r *ReplicationSlotsHAConfiguration) GetSlotNameFromInstanceName(instanceName string) string {
+	if r == nil || !r.Enabled {
+		return ""
+	}
+
+	slotName := fmt.Sprintf(
+		"%s%s",
+		r.GetSlotPrefix(),
+		instanceName,
+	)
+	sanitizedName := slotNameNegativeRegex.ReplaceAllString(strings.ToLower(slotName), "_")
+
+	return sanitizedName
+}
+
 // KubernetesUpgradeStrategy tells the operator if the user want to
 // allocate more space while upgrading a k8s node which is hosting
 // the PostgreSQL Pods or just wait for the node to come up
@@ -999,7 +1088,7 @@ type StorageConfiguration struct {
 //
 // In future synchronous replica election restriction by name will be supported.
 type SyncReplicaElectionConstraints struct {
-	// This flag enabled the constraints for sync replicas
+	// This flag enables the constraints for sync replicas
 	Enabled bool `json:"enabled"`
 
 	// A list of node labels values to extract and compare to evaluate if the pods reside in the same topology or not
@@ -1894,6 +1983,20 @@ func (cluster Cluster) IsReplica() bool {
 	return cluster.Spec.ReplicaCluster != nil && cluster.Spec.ReplicaCluster.Enabled
 }
 
+var slotNameNegativeRegex = regexp.MustCompile("[^a-z0-9_]+")
+
+// GetSlotNameFromInstanceName returns the slot name, given the instance name.
+// It returns an empty string if High Availability Replication Slots are disabled
+func (cluster Cluster) GetSlotNameFromInstanceName(instanceName string) string {
+	if cluster.Spec.ReplicationSlots == nil ||
+		cluster.Spec.ReplicationSlots.HighAvailability == nil ||
+		!cluster.Spec.ReplicationSlots.HighAvailability.Enabled {
+		return ""
+	}
+
+	return cluster.Spec.ReplicationSlots.HighAvailability.GetSlotNameFromInstanceName(instanceName)
+}
+
 // GetBarmanEndpointCAForReplicaCluster checks if this is a replica cluster which needs barman endpoint CA
 func (cluster Cluster) GetBarmanEndpointCAForReplicaCluster() *SecretKeySelector {
 	if !cluster.IsReplica() {
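
The UpdateInterval field above drives the standby-side behaviour described in the commit message: every updateInterval seconds a standby realigns its local copies of the HA slots with the positions published by the primary, using pg_replication_slot_advance(). The following is a minimal sketch of that idea with database/sql; it is not the operator's implementation, the connection strings are placeholders, and creation of missing local slots (pg_create_physical_replication_slot) is omitted:

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq"
)

// synchronizeSlots reads the HA slot positions from the primary's
// pg_replication_slots view and advances the standby's local copies.
func synchronizeSlots(primary, standby *sql.DB, prefix string) error {
	rows, err := primary.Query(
		"SELECT slot_name, restart_lsn FROM pg_replication_slots "+
			"WHERE slot_name LIKE $1 || '%' AND restart_lsn IS NOT NULL",
		prefix)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var slotName, restartLSN string
		if err := rows.Scan(&slotName, &restartLSN); err != nil {
			return err
		}
		// Move the local slot forward to the position reported by the primary
		if _, err := standby.Exec(
			"SELECT pg_replication_slot_advance($1, $2)", slotName, restartLSN,
		); err != nil {
			return err
		}
	}
	return rows.Err()
}

func main() {
	// Placeholder connection strings for illustration only
	primary, err := sql.Open("postgres", "host=primary-host user=postgres sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	standby, err := sql.Open("postgres", "host=localhost user=postgres sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	if err := synchronizeSlots(primary, standby, "_cnpg_"); err != nil {
		log.Fatal(err)
	}
}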

api/v1/cluster_types_test.go (+48)

@@ -795,3 +795,51 @@ var _ = Describe("Barman credentials", func() {
 		}.ArePopulated()).To(BeTrue())
 	})
 })
+
+var _ = Describe("Replication slots names for instances", func() {
+	It("returns an empty name when no replication slots are configured", func() {
+		cluster := Cluster{}
+		Expect(cluster.GetSlotNameFromInstanceName("cluster-example-1")).To(BeEmpty())
+
+		cluster = Cluster{
+			Spec: ClusterSpec{
+				ReplicationSlots: &ReplicationSlotsConfiguration{
+					HighAvailability: nil,
+					UpdateInterval:   0,
+				},
+			},
+		}
+		Expect(cluster.GetSlotNameFromInstanceName("cluster-example-1")).To(BeEmpty())
+	})
+
+	It("returns the name of the slot for an instance when they are configured", func() {
+		cluster := Cluster{
+			Spec: ClusterSpec{
+				ReplicationSlots: &ReplicationSlotsConfiguration{
+					HighAvailability: &ReplicationSlotsHAConfiguration{
+						Enabled: true,
+					},
+					UpdateInterval: 0,
+				},
+			},
+		}
+		Expect(cluster.GetSlotNameFromInstanceName("cluster-example-1")).To(Equal(
+			"_cnpg_cluster_example_1"))
+	})
+
+	It("sanitizes the name of replication slots", func() {
+		cluster := Cluster{
+			Spec: ClusterSpec{
+				ReplicationSlots: &ReplicationSlotsConfiguration{
+					HighAvailability: &ReplicationSlotsHAConfiguration{
+						Enabled:    true,
+						SlotPrefix: "%232'test_",
+					},
+					UpdateInterval: 0,
+				},
+			},
+		}
+		Expect(cluster.GetSlotNameFromInstanceName("cluster-example-1")).To(Equal(
+			"_232_test_cluster_example_1"))
+	})
+})

api/v1/cluster_webhook.go (+65)

@@ -292,6 +292,7 @@ func (r *Cluster) Validate() (allErrs field.ErrorList) {
 		r.validateBackupConfiguration,
 		r.validateConfiguration,
 		r.validateLDAP,
+		r.validateReplicationSlots,
 	}
 
 	for _, validate := range validations {
@@ -337,6 +338,7 @@ func (r *Cluster) ValidateChanges(old *Cluster) (allErrs field.ErrorList) {
 	allErrs = append(allErrs, r.validateWalStorageChange(old)...)
 	allErrs = append(allErrs, r.validateReplicaModeChange(old)...)
 	allErrs = append(allErrs, r.validateUnixPermissionIdentifierChange(old)...)
+	allErrs = append(allErrs, r.validateReplicationSlotsChange(old)...)
 	return allErrs
 }
 
@@ -1535,6 +1537,69 @@ func (r *Cluster) validateRecoveryAndBackupTarget() field.ErrorList {
 	return allErrors
 }
 
+func (r *Cluster) validateReplicationSlots() field.ErrorList {
+	replicationSlots := r.Spec.ReplicationSlots
+	if replicationSlots == nil ||
+		replicationSlots.HighAvailability == nil ||
+		!replicationSlots.HighAvailability.Enabled {
+		return nil
+	}
+
+	psqlVersion, err := r.GetPostgresqlVersion()
+	if err != nil {
+		// The validation error will be already raised by the
+		// validateImageName function
+		return nil
+	}
+
+	if psqlVersion >= 110000 {
+		return nil
+	}
+
+	return field.ErrorList{
+		field.Invalid(
+			field.NewPath("spec", "replicationSlots", "highAvailability", "enabled"),
+			replicationSlots.HighAvailability.Enabled,
+			"Cannot enable replication slot high availability. It requires PostgreSQL 11 or above"),
+	}
+}
+
+func (r *Cluster) validateReplicationSlotsChange(old *Cluster) field.ErrorList {
+	newReplicationSlots := r.Spec.ReplicationSlots
+	oldReplicationSlots := old.Spec.ReplicationSlots
+
+	if oldReplicationSlots == nil || oldReplicationSlots.HighAvailability == nil ||
+		!oldReplicationSlots.HighAvailability.Enabled {
+		return nil
+	}
+
+	var errs field.ErrorList
+
+	// when disabling we should check that the prefix it's not removed, and it doesn't change to
+	// properly execute the cleanup logic
+	if newReplicationSlots == nil || newReplicationSlots.HighAvailability == nil {
+		path := field.NewPath("spec", "replicationSlots")
+		if newReplicationSlots != nil {
+			path = path.Child("highAvailability")
+		}
+		errs = append(errs,
+			field.Invalid(
+				path,
+				nil,
+				fmt.Sprintf("Cannot remove %v section while highAvailability is enabled", path)),
+		)
+	} else if oldReplicationSlots.HighAvailability.SlotPrefix != newReplicationSlots.HighAvailability.SlotPrefix {
+		errs = append(errs,
+			field.Invalid(
+				field.NewPath("spec", "replicationSlots", "highAvailability", "slotPrefix"),
+				newReplicationSlots.HighAvailability.SlotPrefix,
+				"Cannot change replication slot prefix while highAvailability is enabled"),
+		)
+	}
+
+	return errs
+}
+
 // validateAzureCredentials checks and validates the azure credentials
 func (azure *AzureCredentials) validateAzureCredentials(path *field.Path) field.ErrorList {
 	allErrors := field.ErrorList{}
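
Taken together, the two new checks enforce that HA replication slots require PostgreSQL 11 or later, and that neither the replicationSlots/highAvailability section nor the slot prefix can change while the feature is enabled. A hypothetical in-package test fragment (Ginkgo/Gomega style, matching the existing webhook tests; it relies on the generated DeepCopy helper and is not part of this commit) illustrating the prefix rule:

It("rejects changing the HA slot prefix while the feature is enabled", func() {
	oldCluster := &Cluster{
		Spec: ClusterSpec{
			ReplicationSlots: &ReplicationSlotsConfiguration{
				HighAvailability: &ReplicationSlotsHAConfiguration{
					Enabled:    true,
					SlotPrefix: "_cnpg_",
				},
			},
		},
	}
	newCluster := oldCluster.DeepCopy()
	newCluster.Spec.ReplicationSlots.HighAvailability.SlotPrefix = "_changed_"

	errs := newCluster.validateReplicationSlotsChange(oldCluster)
	Expect(errs).To(HaveLen(1))
	Expect(errs[0].Field).To(Equal("spec.replicationSlots.highAvailability.slotPrefix"))
})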
