@@ -8,18 +8,30 @@ Copyright (C) 2019-2021 EnterpriseDB Corporation.
8
8
package walarchive
9
9
10
10
import (
11
+ "context"
12
+ "errors"
11
13
"fmt"
12
- "os/exec"
14
+ "os"
15
+ "path"
16
+ "path/filepath"
17
+ "strings"
18
+ "time"
13
19
14
20
"github.com/spf13/cobra"
15
21
16
22
apiv1 "github.com/EnterpriseDB/cloud-native-postgresql/api/v1"
17
23
"github.com/EnterpriseDB/cloud-native-postgresql/internal/management/cache"
18
24
cacheClient "github.com/EnterpriseDB/cloud-native-postgresql/internal/management/cache/client"
19
25
"github.com/EnterpriseDB/cloud-native-postgresql/pkg/management/barman"
20
- barmanCapabilities "github.com/EnterpriseDB/cloud-native-postgresql/pkg/management/barman/capabilities"
21
- "github.com/EnterpriseDB/cloud-native-postgresql/pkg/management/execlog"
26
+ "github.com/EnterpriseDB/cloud-native-postgresql/pkg/management/barman/archiver"
22
27
"github.com/EnterpriseDB/cloud-native-postgresql/pkg/management/log"
28
+ "github.com/EnterpriseDB/cloud-native-postgresql/pkg/postgres"
29
+ )
30
+
31
+ const (
32
+ // SpoolDirectory is the directory where we spool the WAL files that
33
+ // were pre-archived in parallel
34
+ SpoolDirectory = postgres .ScratchDataDirectory + "/wal-archive-spool"
23
35
)
24
36
25
37
// NewCmd creates the new cobra command
@@ -30,7 +42,9 @@ func NewCmd() *cobra.Command {
30
42
Args : cobra .ExactArgs (1 ),
31
43
RunE : func (cobraCmd * cobra.Command , args []string ) error {
32
44
contextLog := log .WithName ("wal-archive" )
33
- err := run (contextLog , args )
45
+ ctx := log .IntoContext (cobraCmd .Context (), contextLog )
46
+
47
+ err := run (ctx , args )
34
48
if err != nil {
35
49
contextLog .Error (err , "failed to run wal-archive command" )
36
50
return err
@@ -42,15 +56,15 @@ func NewCmd() *cobra.Command {
42
56
return & cmd
43
57
}
44
58
45
- func run (contextLog log.Logger , args []string ) error {
59
+ func run (ctx context.Context , args []string ) error {
60
+ startTime := time .Now ()
61
+ contextLog := log .FromContext (ctx )
46
62
walName := args [0 ]
47
63
48
64
var cluster * apiv1.Cluster
49
65
var err error
50
66
51
- cluster , err = cacheClient .GetCluster ()
52
- if err != nil {
53
- contextLog .Error (err , "Error while getting cluster from cache" )
67
+ if cluster , err = cacheClient .GetCluster (); err != nil {
54
68
return fmt .Errorf ("failed to get cluster: %w" , err )
55
69
}
56
70
@@ -64,53 +78,135 @@ func run(contextLog log.Logger, args []string) error {
64
78
return nil
65
79
}
66
80
67
- options , err := barmanCloudWalArchiveOptions (* cluster , cluster .Name , walName )
68
- if err != nil {
69
- contextLog .Error (err , "while getting barman-cloud-wal-archive options" )
70
- return err
81
+ maxParallel := 1
82
+ if cluster .Spec .Backup .BarmanObjectStore .Wal != nil {
83
+ maxParallel = cluster .Spec .Backup .BarmanObjectStore .Wal .MaxParallel
71
84
}
72
85
73
- env , err := cacheClient .GetEnv (cache .WALArchiveKey )
86
+ // Get environment from cache
87
+ var env []string
88
+ env , err = cacheClient .GetEnv (cache .WALArchiveKey )
74
89
if err != nil {
75
- contextLog .Error (err , "Error while getting environment from cache" )
76
90
return fmt .Errorf ("failed to get envs: %w" , err )
77
91
}
78
92
79
- contextLog .Trace ("Executing " + barmanCapabilities .BarmanCloudWalArchive ,
80
- "walName" , walName ,
81
- "currentPrimary" , cluster .Status .CurrentPrimary ,
82
- "targetPrimary" , cluster .Status .TargetPrimary ,
83
- "options" , options ,
84
- )
85
-
86
- barmanCloudWalArchiveCmd := exec .Command (barmanCapabilities .BarmanCloudWalArchive , options ... ) // #nosec G204
87
- barmanCloudWalArchiveCmd .Env = env
93
+ // Create the archiver
94
+ var walArchiver * archiver.WALArchiver
95
+ if walArchiver , err = archiver .New (cluster , env , SpoolDirectory ); err != nil {
96
+ return fmt .Errorf ("while creating the archiver: %w" , err )
97
+ }
88
98
89
- err = execlog .RunStreaming (barmanCloudWalArchiveCmd , barmanCapabilities .BarmanCloudWalArchive )
99
+ // Step 1: check if this WAL file has not been already archived
100
+ var isDeletedFromSpool bool
101
+ isDeletedFromSpool , err = walArchiver .DeleteFromSpool (walName )
90
102
if err != nil {
91
- contextLog .Error (err , "Error invoking " + barmanCapabilities .BarmanCloudWalArchive ,
103
+ return fmt .Errorf ("while testing the existence of the WAL file in the spool directory: %w" , err )
104
+ }
105
+ if isDeletedFromSpool {
106
+ contextLog .Info ("Archived WAL file (parallel)" ,
92
107
"walName" , walName ,
93
108
"currentPrimary" , cluster .Status .CurrentPrimary ,
94
- "targetPrimary" , cluster .Status .TargetPrimary ,
95
- "options" , options ,
96
- "exitCode" , barmanCloudWalArchiveCmd .ProcessState .ExitCode (),
97
- )
98
- return fmt .Errorf ("unexpected failure invoking %s: %w" , barmanCapabilities .BarmanCloudWalArchive , err )
109
+ "targetPrimary" , cluster .Status .TargetPrimary )
110
+ return nil
111
+ }
112
+
113
+ // Step 3: gather the WAL files names to archive
114
+ walFilesList := gatherWALFilesToArchive (ctx , walName , maxParallel )
115
+
116
+ options , err := barmanCloudWalArchiveOptions (cluster , cluster .Name )
117
+ if err != nil {
118
+ log .Error (err , "while getting barman-cloud-wal-archive options" )
119
+ return err
99
120
}
100
121
101
- contextLog .Info ("Archived WAL file" ,
102
- "walName" , walName ,
103
- "currentPrimary" , cluster .Status .CurrentPrimary ,
104
- "targetPrimary" , cluster .Status .TargetPrimary ,
105
- )
122
+ // Step 4: archive the WAL files in parallel
123
+ uploadStartTime := time .Now ()
124
+ walStatus := walArchiver .ArchiveList (walFilesList , options )
125
+ if len (walStatus ) > 1 {
126
+ contextLog .Info ("Completed archive command (parallel)" ,
127
+ "walsCount" , len (walStatus ),
128
+ "startTime" , startTime ,
129
+ "uploadStartTime" , uploadStartTime ,
130
+ "uploadTotalTime" , time .Since (uploadStartTime ),
131
+ "totalTime" , time .Since (startTime ))
132
+ }
133
+
134
+ // We return only the first error to PostgreSQL, because the first error
135
+ // is the one raised by the file that PostgreSQL has requested to archive.
136
+ // The other errors are related to WAL files that were pre-archived as
137
+ // a performance optimization and are just logged
138
+ return walStatus [0 ].Err
139
+ }
140
+
141
+ // gatherWALFilesToArchive reads from the archived status the list of WAL files
142
+ // that can be archived in parallel way.
143
+ // `requestedWALFile` is the name of the file whose archiving was requested by
144
+ // PostgreSQL, and that file is always the first of the list and is always included.
145
+ // `parallel` is the maximum number of WALs that we can archive in parallel
146
+ func gatherWALFilesToArchive (ctx context.Context , requestedWALFile string , parallel int ) (walList []string ) {
147
+ contextLog := log .FromContext (ctx )
148
+ pgWalDirectory := path .Join (os .Getenv ("PGDATA" ), "pg_wal" )
149
+ archiveStatusPath := path .Join (pgWalDirectory , "archive_status" )
150
+ noMoreWALFilesNeeded := errors .New ("no more files needed" )
151
+
152
+ // slightly more optimized, but equivalent to:
153
+ // walList = []string{requestedWALFile}
154
+ walList = make ([]string , 1 , 1 + parallel )
155
+ walList [0 ] = requestedWALFile
156
+
157
+ err := filepath .WalkDir (archiveStatusPath , func (path string , d os.DirEntry , err error ) error {
158
+ // If err is set, it means the current path is a directory and the readdir raised an error
159
+ // The only available option here is to skip the path and log the error.
160
+ if err != nil {
161
+ contextLog .Error (err , "failed reading path" , "path" , path )
162
+ return filepath .SkipDir
163
+ }
164
+
165
+ if len (walList ) >= parallel {
166
+ return noMoreWALFilesNeeded
167
+ }
168
+
169
+ // We don't process directories beside the archive status path
170
+ if d .IsDir () {
171
+ // We want to proceed exploring the archive status folder
172
+ if path == archiveStatusPath {
173
+ return nil
174
+ }
175
+
176
+ return filepath .SkipDir
177
+ }
178
+
179
+ // We only process ready files
180
+ if ! strings .HasSuffix (path , ".ready" ) {
181
+ return nil
182
+ }
183
+
184
+ walFileName := strings .TrimSuffix (filepath .Base (path ), ".ready" )
185
+
186
+ // We are already archiving the requested WAL file,
187
+ // and we need to avoid archiving it twice.
188
+ // requestedWALFile is usually "pg_wal/wal_file_name" and
189
+ // we compare it with the path we read
190
+ if strings .HasSuffix (requestedWALFile , walFileName ) {
191
+ return nil
192
+ }
193
+
194
+ walList = append (walList , filepath .Join ("pg_wal" , walFileName ))
195
+ return nil
196
+ })
197
+
198
+ // In this point err must be nil or noMoreWALFilesNeeded, if it is something different
199
+ // there is a programming error
200
+ if err != nil && err != noMoreWALFilesNeeded {
201
+ contextLog .Error (err , "unexpected error while reading the list of WAL files to archive" )
202
+ }
106
203
107
- return nil
204
+ return walList
108
205
}
109
206
110
207
func barmanCloudWalArchiveOptions (
111
- cluster apiv1.Cluster ,
208
+ cluster * apiv1.Cluster ,
112
209
clusterName string ,
113
- walName string ,
114
210
) ([]string , error ) {
115
211
configuration := cluster .Spec .Backup .BarmanObjectStore
116
212
@@ -147,7 +243,6 @@ func barmanCloudWalArchiveOptions(
147
243
options = append (
148
244
options ,
149
245
configuration .DestinationPath ,
150
- serverName ,
151
- walName )
246
+ serverName )
152
247
return options , nil
153
248
}
0 commit comments