-
Notifications
You must be signed in to change notification settings - Fork 78
/
Copy pathpackagessyncer.go
359 lines (310 loc) · 10.8 KB
/
packagessyncer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
package internal
import (
"bytes"
"context"
"errors"
"fmt"
"net/http"
"sync"
"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
)
// packagesSyncer performs the package syncing process.
type packagesSyncer struct {
logger types.Logger
available *protobufs.PackagesAvailable
clientSyncedState *ClientSyncedState
localState types.PackagesStateProvider
sender Sender
statuses *protobufs.PackageStatuses
mux *sync.Mutex
doneCh chan struct{}
}
// NewPackagesSyncer creates a new packages syncer.
func NewPackagesSyncer(
logger types.Logger,
available *protobufs.PackagesAvailable,
sender Sender,
clientSyncedState *ClientSyncedState,
packagesStateProvider types.PackagesStateProvider,
mux *sync.Mutex,
) *packagesSyncer {
return &packagesSyncer{
logger: logger,
available: available,
sender: sender,
clientSyncedState: clientSyncedState,
localState: packagesStateProvider,
doneCh: make(chan struct{}),
mux: mux,
}
}
// Sync performs the package syncing process.
func (s *packagesSyncer) Sync(ctx context.Context) error {
defer func() {
close(s.doneCh)
}()
// Prepare package statuses.
// Grab a lock to make sure that package statuses are not overriden by
// another call to Sync running in parallel.
// In case Sync returns early with an error, take care of unlocking the
// mutex in this goroutine; otherwise it will be unlocked at the end
// of the sync operation.
s.mux.Lock()
if err := s.initStatuses(); err != nil {
s.mux.Unlock()
return err
}
if err := s.clientSyncedState.SetPackageStatuses(s.statuses); err != nil {
s.mux.Unlock()
return err
}
// Now do the actual syncing in the background and release the lock from
// inside of the goroutine.
go s.doSync(ctx)
return nil
}
func (s *packagesSyncer) Done() <-chan struct{} {
return s.doneCh
}
// initStatuses initializes the "statuses" field from the current "localState".
// The "statuses" will be updated as the syncing progresses and will be
// periodically reported to the Server.
func (s *packagesSyncer) initStatuses() error {
if s.localState == nil {
return errors.New("cannot sync packages because PackagesStateProvider is not provided")
}
// Restore statuses that were previously stored in the local state.
var err error
s.statuses, err = s.localState.LastReportedStatuses()
if err != nil {
return err
}
if s.statuses == nil {
// No statuses are stored locally (maybe first time), just start with empty.
s.statuses = &protobufs.PackageStatuses{}
}
if s.statuses.Packages == nil {
s.statuses.Packages = map[string]*protobufs.PackageStatus{}
}
// Report to the Server the "all" hash that we received from the Server so that
// the Server knows we are processing the right offer.
s.statuses.ServerProvidedAllPackagesHash = s.available.AllPackagesHash
return nil
}
// doSync performs the actual syncing process.
func (s *packagesSyncer) doSync(ctx context.Context) {
// Once doSync returns in a separate goroutine, make sure to release the
// mutex so that a new syncing process can take place.
defer s.mux.Unlock()
hash, err := s.localState.AllPackagesHash()
if err != nil {
s.logger.Errorf(ctx, "Package syncing failed: %V", err)
return
}
if bytes.Equal(hash, s.available.AllPackagesHash) {
s.logger.Debugf(ctx, "All packages are already up to date.")
return
}
failed := false
if err := s.deleteUnneededLocalPackages(ctx); err != nil {
s.logger.Errorf(ctx, "Cannot delete unneeded packages: %v", err)
failed = true
}
// Iterate through offered packages and sync them all from server.
for name, pkg := range s.available.Packages {
err := s.syncPackage(ctx, name, pkg)
if err != nil {
s.logger.Errorf(ctx, "Cannot sync package %s: %v", name, err)
failed = true
}
}
if !failed {
// Update the "all" hash on success, so that next time Sync() does not thing,
// unless a new hash is received from the Server.
if err := s.localState.SetAllPackagesHash(s.available.AllPackagesHash); err != nil {
s.logger.Errorf(ctx, "SetAllPackagesHash failed: %v", err)
} else {
s.logger.Debugf(ctx, "All packages are synced and up to date.")
}
} else {
s.logger.Errorf(ctx, "Package syncing was not successful.")
}
_ = s.reportStatuses(ctx, true)
}
// syncPackage downloads the package from the server and installs it.
func (s *packagesSyncer) syncPackage(
ctx context.Context,
pkgName string,
pkgAvail *protobufs.PackageAvailable,
) error {
status := s.statuses.Packages[pkgName]
if status == nil {
// This package has no status. Create one.
status = &protobufs.PackageStatus{
Name: pkgName,
}
s.statuses.Packages[pkgName] = status
}
// Update the newly offered package Version and Hash
status.ServerOfferedVersion = pkgAvail.Version
status.ServerOfferedHash = pkgAvail.Hash
pkgLocal, err := s.localState.PackageState(pkgName)
if err != nil {
return err
}
mustCreate := !pkgLocal.Exists
if pkgLocal.Exists {
if bytes.Equal(pkgLocal.Hash, pkgAvail.Hash) {
s.logger.Debugf(ctx, "Package %s hash is unchanged, skipping", pkgName)
return nil
}
if pkgLocal.Type != pkgAvail.Type {
// Package is of wrong type. Need to re-create it. So, delete it.
if err := s.localState.DeletePackage(pkgName); err != nil {
err = fmt.Errorf("cannot delete existing version of package %s: %v", pkgName, err)
status.Status = protobufs.PackageStatusEnum_PackageStatusEnum_InstallFailed
status.ErrorMessage = err.Error()
return err
}
// And mark that it needs to be created.
mustCreate = true
}
}
// Report that we are beginning to install it.
status.Status = protobufs.PackageStatusEnum_PackageStatusEnum_Installing
_ = s.reportStatuses(ctx, true)
if mustCreate {
// Make sure the package exists.
err = s.localState.CreatePackage(pkgName, pkgAvail.Type)
if err != nil {
err = fmt.Errorf("cannot create package %s: %v", pkgName, err)
status.Status = protobufs.PackageStatusEnum_PackageStatusEnum_InstallFailed
status.ErrorMessage = err.Error()
return err
}
}
// Sync package file: ensure it exists or download it.
err = s.syncPackageFile(ctx, pkgName, pkgAvail.File)
if err == nil {
// Only save the state on success, so that next sync does not retry this package.
pkgLocal.Hash = pkgAvail.Hash
pkgLocal.Version = pkgAvail.Version
if err := s.localState.SetPackageState(pkgName, pkgLocal); err == nil {
status.Status = protobufs.PackageStatusEnum_PackageStatusEnum_Installed
status.AgentHasHash = pkgAvail.Hash
status.AgentHasVersion = pkgAvail.Version
}
}
if err != nil {
status.Status = protobufs.PackageStatusEnum_PackageStatusEnum_InstallFailed
status.ErrorMessage = err.Error()
}
_ = s.reportStatuses(ctx, true)
return err
}
// syncPackageFile downloads the package file from the server.
// If the file already exists and contents are
// unchanged, it is not downloaded again.
func (s *packagesSyncer) syncPackageFile(
ctx context.Context, pkgName string, file *protobufs.DownloadableFile,
) error {
shouldDownload, err := s.shouldDownloadFile(ctx, pkgName, file)
if err == nil && shouldDownload {
err = s.downloadFile(ctx, pkgName, file)
}
return err
}
// shouldDownloadFile returns true if the file should be downloaded.
func (s *packagesSyncer) shouldDownloadFile(ctx context.Context,
packageName string,
file *protobufs.DownloadableFile) (bool, error) {
fileContentHash, err := s.localState.FileContentHash(packageName)
if err != nil {
err := fmt.Errorf("cannot calculate checksum of %s: %v", packageName, err)
s.logger.Errorf(ctx, err.Error())
return true, nil
} else {
// Compare the checksum of the file we have with what
// we are offered by the server.
if !bytes.Equal(fileContentHash, file.ContentHash) {
s.logger.Debugf(ctx, "Package %s: file hash mismatch, will download.", packageName)
return true, nil
}
}
return false, nil
}
// downloadFile downloads the file from the server.
func (s *packagesSyncer) downloadFile(ctx context.Context, pkgName string, file *protobufs.DownloadableFile) error {
s.logger.Debugf(ctx, "Downloading package %s file from %s", pkgName, file.DownloadUrl)
req, err := http.NewRequestWithContext(ctx, "GET", file.DownloadUrl, nil)
if err != nil {
return fmt.Errorf("cannot download file from %s: %v", file.DownloadUrl, err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("cannot download file from %s: %v", file.DownloadUrl, err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("cannot download file from %s, HTTP response=%v", file.DownloadUrl, resp.StatusCode)
}
// TODO: either add a callback to verify file.Signature or pass the Signature
// as a parameter to UpdateContent.
err = s.localState.UpdateContent(ctx, pkgName, resp.Body, file.ContentHash)
if err != nil {
return fmt.Errorf("failed to install/update the package %s downloaded from %s: %v", pkgName, file.DownloadUrl, err)
}
return nil
}
// deleteUnneededLocalPackages deletes local packages that are not
// needed anymore. This is done by comparing the local package state
// with the server's package state.
func (s *packagesSyncer) deleteUnneededLocalPackages(ctx context.Context) error {
// Read the list of packages we have locally.
localPackages, err := s.localState.Packages()
if err != nil {
return err
}
var lastErr error
for _, localPkg := range localPackages {
// Do we have a package that is not offered?
if _, offered := s.available.Packages[localPkg]; !offered {
s.logger.Debugf(ctx, "Package %s is no longer needed, deleting.", localPkg)
err := s.localState.DeletePackage(localPkg)
if err != nil {
lastErr = err
}
}
}
// Also remove packages that were not offered from the statuses.
for name := range s.statuses.Packages {
if _, offered := s.available.Packages[name]; !offered {
delete(s.statuses.Packages, name)
}
}
return lastErr
}
// reportStatuses saves the last reported statuses to provider and client state.
// If sendImmediately is true, the statuses are scheduled to be
// sent to the server.
func (s *packagesSyncer) reportStatuses(ctx context.Context, sendImmediately bool) error {
// Save it in the user-supplied state provider.
if err := s.localState.SetLastReportedStatuses(s.statuses); err != nil {
s.logger.Errorf(ctx, "Cannot save last reported statuses: %v", err)
return err
}
// Also save it in our internal state (will be needed if the Server asks for it).
if err := s.clientSyncedState.SetPackageStatuses(s.statuses); err != nil {
s.logger.Errorf(ctx, "Cannot save client state: %v", err)
return err
}
s.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.PackageStatuses = s.clientSyncedState.PackageStatuses()
})
if sendImmediately {
s.sender.ScheduleSend()
}
return nil
}