@@ -2493,70 +2493,114 @@ func AssertPvcHasLabels(
2493
2493
})
2494
2494
}
2495
2495
2496
- // AssertRepSlotsOnPod checks if all the required replication slot exists in a given pod,
2496
+ // AssertReplicationSlotsOnPod checks that all the required replication slots exist in a given pod,
2497
2497
// and that obsolete slots are correctly deleted (post management operations).
2498
- // In case we are targeting the primary, it will also check if the slot is active.
2499
- func AssertRepSlotsOnPod (
2498
+ // In the primary, it will also check if the slots are active.
2499
+ func AssertReplicationSlotsOnPod (
2500
2500
namespace ,
2501
2501
clusterName string ,
2502
2502
pod corev1.Pod ,
2503
2503
) {
2504
- expectedSlots , err := testsUtils .GetExpectedRepSlotsOnPod (namespace , clusterName , pod .GetName (), env )
2504
+ // Replication slot high availability requires PostgreSQL 11 or above
2505
+ if env .PostgresVersion == 10 {
2506
+ Skip ("Ignoring replication slots verification for postgres 10" )
2507
+ return
2508
+ }
2509
+
2510
+ expectedSlots , err := testsUtils .GetExpectedReplicationSlotsOnPod (namespace , clusterName , pod .GetName (), env )
2505
2511
Expect (err ).ToNot (HaveOccurred ())
2506
2512
2507
2513
Eventually (func () ([]string , error ) {
2508
- currentSlots , err := testsUtils .GetRepSlotsOnPod (namespace , pod .GetName (), env )
2514
+ currentSlots , err := testsUtils .GetReplicationSlotsOnPod (namespace , pod .GetName (), env )
2509
2515
return currentSlots , err
2510
2516
}, 300 ).Should (BeEquivalentTo (expectedSlots ),
2511
- fmt .Sprintf (
2512
- "List of expected replication slots on %v pod %v" ,
2513
- pod .Labels ["role" ],
2514
- pod .GetName ()))
2517
+ func () string {
2518
+ return testsUtils .PrintReplicationSlots (namespace , clusterName , env )
2519
+ })
2515
2520
2516
2521
for _ , slot := range expectedSlots {
2517
2522
query := fmt .Sprintf (
2518
2523
"SELECT EXISTS (SELECT 1 FROM pg_replication_slots " +
2519
2524
"WHERE slot_name = '%v' AND active = 'f' " +
2520
2525
"AND temporary = 'f' AND slot_type = 'physical')" , slot )
2521
- description := fmt .Sprintf (
2522
- "On %v pod %v, expect replication slot %v to exist and be inactive" ,
2523
- pod .Labels ["role" ],
2524
- pod .GetName (),
2525
- slot )
2526
2526
if specs .IsPodPrimary (pod ) {
2527
2527
query = fmt .Sprintf (
2528
2528
"SELECT EXISTS (SELECT 1 FROM pg_replication_slots " +
2529
2529
"WHERE slot_name = '%v' AND active = 't' " +
2530
2530
"AND temporary = 'f' AND slot_type = 'physical')" , slot )
2531
- description = fmt .Sprintf (
2532
- "On %v pod %v, expect replication slot %v to exist and be active" ,
2533
- pod .Labels ["role" ],
2534
- pod .GetName (),
2535
- slot )
2536
2531
}
2537
2532
Eventually (func () (string , error ) {
2538
2533
stdout , _ , err := testsUtils .RunQueryFromPod (& pod , testsUtils .PGLocalSocketDir ,
2539
2534
"app" , "postgres" , "''" , query , env )
2540
2535
return strings .TrimSpace (stdout ), err
2541
- }, 300 ).Should (BeEquivalentTo ("t" ), description )
2536
+ }, 300 ).Should (BeEquivalentTo ("t" ),
2537
+ func () string {
2538
+ return testsUtils .PrintReplicationSlots (namespace , clusterName , env )
2539
+ })
2542
2540
}
2543
2541
}
2544
2542
2545
- // AssertClusterRepSlotsAligned will compare all the replication slot restart_lsn
2543
+ // AssertClusterReplicationSlotsAligned will compare all the replication slot restart_lsn
2546
2544
// in a cluster. The assertion will succeed if they are all equivalent.
2547
- func AssertClusterRepSlotsAligned (
2545
+ func AssertClusterReplicationSlotsAligned (
2548
2546
namespace ,
2549
2547
clusterName string ,
2550
2548
) {
2549
+ // Replication slot high availability requires PostgreSQL 11 or above
2550
+ if env .PostgresVersion == 10 {
2551
+ Skip ("Ignoring replication slots verification for postgres 10" )
2552
+ }
2553
+
2551
2554
podList , err := env .GetClusterPodList (namespace , clusterName )
2552
2555
Expect (err ).ToNot (HaveOccurred ())
2553
2556
Eventually (func () bool {
2554
2557
var lsnList []string
2555
2558
for _ , pod := range podList .Items {
2556
- out , err := testsUtils .GetRepSlotsLsnOnPod (namespace , clusterName , pod , env )
2559
+ out , err := testsUtils .GetReplicationSlotLsnsOnPod (namespace , clusterName , pod , env )
2557
2560
Expect (err ).ToNot (HaveOccurred ())
2558
2561
lsnList = append (lsnList , out ... )
2559
2562
}
2560
- return testsUtils .CompareLsn (lsnList )
2561
- }, 300 ).Should (BeEquivalentTo (true ))
2563
+ return testsUtils .AreSameLsn (lsnList )
2564
+ }, 300 ).Should (BeEquivalentTo (true ),
2565
+ func () string {
2566
+ return testsUtils .PrintReplicationSlots (namespace , clusterName , env )
2567
+ })
2568
+ }
2569
+
2570
+ // AssertClusterReplicationSlots will verify if the replication slots of each pod
2571
+ // of the cluster exist and are aligned.
2572
+ func AssertClusterReplicationSlots (namespace , clusterName string ) {
2573
+ By ("verifying all cluster's replication slots exist and are aligned" , func () {
2574
+ podList , err := env .GetClusterPodList (namespace , clusterName )
2575
+ Expect (err ).ToNot (HaveOccurred ())
2576
+ for _ , pod := range podList .Items {
2577
+ AssertReplicationSlotsOnPod (namespace , clusterName , pod )
2578
+ }
2579
+ AssertClusterReplicationSlotsAligned (namespace , clusterName )
2580
+ })
2581
+ }
2582
+
2583
+ // AssertClusterRollingRestart restart given cluster
2584
+ func AssertClusterRollingRestart (namespace , clusterName string ) {
2585
+ By (fmt .Sprintf ("restarting cluster %v" , clusterName ), func () {
2586
+ cluster , err := env .GetCluster (namespace , clusterName )
2587
+ Expect (err ).ToNot (HaveOccurred ())
2588
+ clusterRestarted := cluster .DeepCopy ()
2589
+ if clusterRestarted .Annotations == nil {
2590
+ clusterRestarted .Annotations = make (map [string ]string )
2591
+ }
2592
+ clusterRestarted .Annotations [specs .ClusterRestartAnnotationName ] = time .Now ().Format (time .RFC3339 )
2593
+ clusterRestarted .ManagedFields = nil
2594
+ err = env .Client .Patch (env .Ctx , clusterRestarted , ctrlclient .MergeFrom (cluster ))
2595
+ Expect (err ).ToNot (HaveOccurred ())
2596
+ })
2597
+
2598
+ By ("waiting for the cluster to end up in upgrading state" , func () {
2599
+ // waiting for cluster phase to end up in "Upgrading cluster" state after restarting the cluster.
2600
+ Eventually (func () (bool , error ) {
2601
+ cluster , err := env .GetCluster (namespace , clusterName )
2602
+ return cluster .Status .Phase == apiv1 .PhaseUpgrade , err
2603
+ }, 120 , 3 ).Should (BeTrue ())
2604
+ })
2605
+ AssertClusterIsReady (namespace , clusterName , 300 , env )
2562
2606
}
0 commit comments