Skip to content

Commit 1ef0ab0

Browse files
mnencia and armru authored
fix: make waitForWalArchiveWorking resilient to connection errors (cloudnative-pg#1399)
Close: cloudnative-pg#1398 Signed-off-by: Marco Nenciarini <marco.nenciarini@enterprisedb.com> Signed-off-by: Armando Ruocco <armando.ruocco@enterprisedb.com> Co-authored-by: Armando Ruocco <armando.ruocco@enterprisedb.com>
1 parent 20a004f commit 1ef0ab0

File tree

5 files changed

+217
-67
lines changed

5 files changed

+217
-67
lines changed

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/cloudnative-pg/cloudnative-pg
33
go 1.19
44

55
require (
6+
github.com/DATA-DOG/go-sqlmock v1.5.0
67
github.com/Masterminds/semver/v3 v3.2.0
78
github.com/avast/retry-go/v4 v4.3.2
89
github.com/blang/semver v3.5.1+incompatible

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
3333
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
3434
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
3535
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
36+
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
37+
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
3638
github.com/Masterminds/semver/v3 v3.2.0 h1:3MEsd0SM6jqZojhjLWWeBY+Kcjy9i6MQAeY7YgDP83g=
3739
github.com/Masterminds/semver/v3 v3.2.0/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=
3840
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=

pkg/management/postgres/backup.go

+4-67
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ package postgres
1818

1919
import (
2020
"context"
21-
"database/sql"
22-
"errors"
2321
"fmt"
2422
"os"
2523
"os/exec"
@@ -177,69 +175,6 @@ func (b *BackupCommand) getBarmanCloudBackupOptions(
177175
return options, nil
178176
}
179177

180-
// waitForWalArchiveWorking retry until the wal archiving is working or the timeout occur
181-
func waitForWalArchiveWorking(instance *Instance) error {
182-
db, err := sql.Open(
183-
"pgx",
184-
fmt.Sprintf("%s dbname=%s", instance.GetPrimaryConnInfo(), "postgres"),
185-
)
186-
if err != nil {
187-
log.Error(err, "can not open postgres database")
188-
return err
189-
}
190-
defer func() {
191-
err = db.Close()
192-
if err != nil {
193-
log.Error(err, "Error while closing connection")
194-
}
195-
}()
196-
197-
walError := errors.New("wal-archive not working")
198-
199-
firstWalArchiveTriggered := false
200-
201-
return retry.OnError(retryUntilWalArchiveWorking, func(err error) bool {
202-
return errors.Is(err, walError)
203-
}, func() error {
204-
row := db.QueryRow("SELECT COALESCE(last_archived_time,'-infinity') > " +
205-
"COALESCE(last_failed_time, '-infinity') AS is_archiving, last_failed_time IS NOT NULL " +
206-
"FROM pg_stat_archiver")
207-
208-
var walArchivingWorking, lastFailedTimePresent bool
209-
210-
if err := row.Scan(&walArchivingWorking, &lastFailedTimePresent); err != nil {
211-
log.Error(err, "can't get WAL archiving status")
212-
return err
213-
}
214-
215-
if walArchivingWorking {
216-
log.Info("WAL archiving is working, proceeding with the backup")
217-
return nil
218-
}
219-
220-
if lastFailedTimePresent {
221-
log.Info("WAL archiving is not working, will retry in one minute")
222-
return walError
223-
}
224-
225-
if firstWalArchiveTriggered {
226-
log.Info("Waiting for the first WAL file to be archived")
227-
return walError
228-
}
229-
230-
log.Info("Triggering the first WAL file to be archived")
231-
if _, err := db.Exec("CHECKPOINT"); err != nil {
232-
return fmt.Errorf("error while requiring a checkpoint: %w", err)
233-
}
234-
235-
if _, err := db.Exec("SELECT pg_switch_wal()"); err != nil {
236-
return fmt.Errorf("error while switching to a new WAL: %w", err)
237-
}
238-
firstWalArchiveTriggered = true
239-
return walError
240-
})
241-
}
242-
243178
// Start initiates a backup for this instance using
244179
// barman-cloud-backup
245180
func (b *BackupCommand) Start(ctx context.Context) error {
@@ -254,8 +189,10 @@ func (b *BackupCommand) Start(ctx context.Context) error {
254189
return fmt.Errorf("can't set backup as running: %v", err)
255190
}
256191

257-
err = waitForWalArchiveWorking(b.Instance)
258-
if err != nil {
192+
if err := newWalArchiveBootstrapper().
193+
withTimeout(&retryUntilWalArchiveWorking).
194+
withInstanceDBProvider(b.Instance).
195+
execute(); err != nil {
259196
log.Warning("WAL archiving is not working", "err", err)
260197
b.Backup.GetStatus().Phase = apiv1.BackupPhaseWalArchivingFailing
261198
return UpdateBackupStatusAndRetry(ctx, b.Client, b.Backup)

pkg/management/postgres/wal.go

+125
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
Copyright The CloudNativePG Contributors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package postgres
18+
19+
import (
20+
"database/sql"
21+
"errors"
22+
"fmt"
23+
24+
"k8s.io/apimachinery/pkg/util/wait"
25+
"k8s.io/client-go/util/retry"
26+
27+
"github.com/cloudnative-pg/cloudnative-pg/pkg/management/log"
28+
"github.com/cloudnative-pg/cloudnative-pg/pkg/resources"
29+
)
30+
31+
type dbProvider func() (*sql.DB, error)
32+
33+
type walArchiveBootstrapper struct {
34+
firstWalArchiveTriggered bool
35+
backoff *wait.Backoff
36+
dbProviderFunc dbProvider
37+
}
38+
39+
func newWalArchiveBootstrapper() *walArchiveBootstrapper {
40+
return &walArchiveBootstrapper{}
41+
}
42+
43+
func (w *walArchiveBootstrapper) withTimeout(backoff *wait.Backoff) *walArchiveBootstrapper {
44+
w.backoff = backoff
45+
return w
46+
}
47+
48+
func (w *walArchiveBootstrapper) withDBProvider(provider dbProvider) *walArchiveBootstrapper {
49+
w.dbProviderFunc = provider
50+
return w
51+
}
52+
53+
func (w *walArchiveBootstrapper) withInstanceDBProvider(instance *Instance) *walArchiveBootstrapper {
54+
return w.withDBProvider(func() (*sql.DB, error) {
55+
db, openErr := sql.Open(
56+
"pgx",
57+
fmt.Sprintf("%s dbname=%s", instance.GetPrimaryConnInfo(), "postgres"),
58+
)
59+
if openErr != nil {
60+
log.Error(openErr, "can not open postgres database")
61+
return nil, openErr
62+
}
63+
return db, nil
64+
})
65+
}
66+
67+
func (w *walArchiveBootstrapper) execute() error {
68+
if w.backoff == nil {
69+
return w.tryBootstrapWal()
70+
}
71+
72+
return retry.OnError(*w.backoff, resources.RetryAlways, func() error {
73+
return w.tryBootstrapWal()
74+
})
75+
}
76+
77+
func (w *walArchiveBootstrapper) tryBootstrapWal() error {
78+
db, err := w.dbProviderFunc()
79+
if err != nil {
80+
return err
81+
}
82+
defer func() {
83+
if closeErr := db.Close(); closeErr != nil {
84+
log.Error(closeErr, "Error while closing connection")
85+
}
86+
}()
87+
88+
row := db.QueryRow("SELECT COALESCE(last_archived_time,'-infinity') > " +
89+
"COALESCE(last_failed_time, '-infinity') AS is_archiving, last_failed_time IS NOT NULL " +
90+
"FROM pg_stat_archiver")
91+
92+
var walArchivingWorking, lastFailedTimePresent bool
93+
94+
if err := row.Scan(&walArchivingWorking, &lastFailedTimePresent); err != nil {
95+
log.Error(err, "can't get WAL archiving status")
96+
return err
97+
}
98+
99+
if walArchivingWorking {
100+
log.Info("WAL archiving is working, proceeding with the backup")
101+
return nil
102+
}
103+
104+
if lastFailedTimePresent {
105+
log.Info("WAL archiving is not working, will retry in one minute")
106+
return errors.New("wal-archive not working")
107+
}
108+
109+
if w.firstWalArchiveTriggered {
110+
log.Info("Waiting for the first WAL file to be archived")
111+
return errors.New("waiting for first wal-archive")
112+
}
113+
114+
log.Info("Triggering the first WAL file to be archived")
115+
if _, err := db.Exec("CHECKPOINT"); err != nil {
116+
return fmt.Errorf("error while requiring a checkpoint: %w", err)
117+
}
118+
119+
if _, err := db.Exec("SELECT pg_switch_wal()"); err != nil {
120+
return fmt.Errorf("error while switching to a new WAL: %w", err)
121+
}
122+
123+
w.firstWalArchiveTriggered = true
124+
return errors.New("first wal-archive triggered")
125+
}

pkg/management/postgres/wal_test.go

+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
Copyright The CloudNativePG Contributors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package postgres
18+
19+
import (
20+
"database/sql"
21+
"errors"
22+
23+
"github.com/DATA-DOG/go-sqlmock"
24+
25+
. "github.com/onsi/ginkgo/v2"
26+
. "github.com/onsi/gomega"
27+
)
28+
29+
var _ = Describe("ensure isWalArchiveWorking works correctly", func() {
30+
const flexibleCoalescenceQuery = "SELECT COALESCE.*FROM pg_stat_archiver"
31+
var (
32+
db *sql.DB
33+
mock sqlmock.Sqlmock
34+
fakeResult = sqlmock.NewResult(0, 1)
35+
bootstrapper walArchiveBootstrapper
36+
)
37+
38+
BeforeEach(func() {
39+
var err error
40+
db, mock, err = sqlmock.New()
41+
Expect(err).ToNot(HaveOccurred())
42+
43+
bootstrapper = walArchiveBootstrapper{
44+
dbProviderFunc: func() (*sql.DB, error) {
45+
return db, nil
46+
},
47+
}
48+
})
49+
50+
It("returns nil if WAL archiving is working", func() {
51+
bootstrapper.firstWalArchiveTriggered = true
52+
rows := sqlmock.NewRows([]string{"is_archiving", "last_failed_time_present"}).
53+
AddRow(true, false)
54+
mock.ExpectQuery(flexibleCoalescenceQuery).WillReturnRows(rows)
55+
56+
err := bootstrapper.tryBootstrapWal()
57+
Expect(err).To(BeNil())
58+
Expect(mock.ExpectationsWereMet()).To(BeNil())
59+
})
60+
61+
It("returns an error if WAL archiving is not working and last_failed_time is present", func() {
62+
rows := sqlmock.NewRows([]string{"is_archiving", "last_failed_time_present"}).
63+
AddRow(false, true)
64+
mock.ExpectQuery(flexibleCoalescenceQuery).WillReturnRows(rows)
65+
66+
err := bootstrapper.tryBootstrapWal()
67+
Expect(err).To(Equal(errors.New("wal-archive not working")))
68+
Expect(mock.ExpectationsWereMet()).To(BeNil())
69+
})
70+
71+
It("triggers the first WAL archive if it has not been triggered", func() {
72+
// set up mock expectations
73+
rows := sqlmock.NewRows([]string{"is_archiving", "last_failed_time_present"}).AddRow(false, false)
74+
mock.ExpectQuery(flexibleCoalescenceQuery).WillReturnRows(rows)
75+
mock.ExpectExec("CHECKPOINT").WillReturnResult(fakeResult)
76+
mock.ExpectExec("SELECT pg_switch_wal()").WillReturnResult(fakeResult)
77+
78+
// Call the function
79+
err := bootstrapper.tryBootstrapWal()
80+
Expect(err).To(Equal(errors.New("first wal-archive triggered")))
81+
82+
// Ensure the mock expectations are met
83+
Expect(mock.ExpectationsWereMet()).To(BeNil())
84+
})
85+
})

0 commit comments

Comments
 (0)