Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ingest Manager] Agent fix snapshot download for upgrade #22175

Merged
merged 3 commits into from
Oct 27, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions x-pack/elastic-agent/pkg/agent/application/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ func streamFactory(ctx context.Context, agentInfo *info.AgentInfo, cfg *configur
}

func newOperator(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, id routingKey, config *configuration.SettingsConfig, srv *server.Server, r state.Reporter, m monitoring.Monitor) (*operation.Operator, error) {
fetcher := downloader.NewDownloader(log, config.DownloadConfig, false)
fetcher := downloader.NewDownloader(log, config.DownloadConfig)
allowEmptyPgp, pgp := release.PGP()
verifier, err := downloader.NewVerifier(log, config.DownloadConfig, allowEmptyPgp, pgp, false)
verifier, err := downloader.NewVerifier(log, config.DownloadConfig, allowEmptyPgp, pgp)
if err != nil {
return nil, errors.New(err, "initiating verifier")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,26 @@ package upgrade
import (
"context"
"strings"
"time"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/composed"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/fs"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/http"
downloader "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/localremote"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/snapshot"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release"
)

func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI string) (string, error) {
// do not update source config
settings := *u.settings

// agent binaries are a bit larger and do not fit into normal timeout on slower connections
settings.Timeout = 120 * time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still enough time? How are you deciding this number?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was also thinking about using N × the standard timeout from the config, which seems like a better approach.

if sourceURI != "" {
if strings.HasPrefix(sourceURI, "file://") {
// update the DropPath so the fs.Downloader can download from this
Expand All @@ -26,13 +37,16 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI stri
}
}

allowEmptyPgp, pgp := release.PGP()
verifier, err := downloader.NewVerifier(u.log, &settings, allowEmptyPgp, pgp, true)
verifier, err := newVerifier(version, u.log, &settings)
if err != nil {
return "", errors.New(err, "initiating verifier")
}

fetcher := downloader.NewDownloader(u.log, &settings, true)
fetcher, err := newDownloader(version, u.log, &settings)
if err != nil {
return "", errors.New(err, "initiating fetcher")
}

path, err := fetcher.Download(ctx, agentName, agentArtifactName, version)
if err != nil {
return "", errors.New(err, "failed upgrade of agent binary")
Expand All @@ -48,3 +62,45 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI stri

return path, nil
}

// newDownloader selects the downloader used to fetch the agent artifact for
// the requested version.
//
// Versions that do not end in "-SNAPSHOT" use the default local/remote
// downloader. Snapshot versions get a composed downloader built from the
// filesystem, snapshot-repository and HTTP downloaders, passed in that order
// so the snapshot repo is tried before the official one. An error is returned
// only when the snapshot downloader cannot be created.
func newDownloader(version string, log *logger.Logger, settings *artifact.Config) (download.Downloader, error) {
	isSnapshot := strings.HasSuffix(version, "-SNAPSHOT")
	if isSnapshot {
		// The snapshot downloader is built first so a failure aborts before
		// any of the other downloaders are constructed.
		snapshotFetcher, err := snapshot.NewDownloader(settings, version)
		if err != nil {
			return nil, err
		}

		localFetcher := fs.NewDownloader(settings)
		remoteFetcher := http.NewDownloader(settings)
		return composed.NewDownloader(localFetcher, snapshotFetcher, remoteFetcher), nil
	}

	return downloader.NewDownloader(log, settings), nil
}

// newVerifier selects the verifier used to check the downloaded agent
// artifact for the requested version.
//
// The PGP settings come from release.PGP(). Versions that do not end in
// "-SNAPSHOT" use the default local/remote verifier. Snapshot versions get a
// composed verifier built from the filesystem, snapshot-repository and HTTP
// verifiers, in that order. The first constructor error is returned as-is.
func newVerifier(version string, log *logger.Logger, settings *artifact.Config) (download.Verifier, error) {
	emptyPgpAllowed, pgpKey := release.PGP()

	if isSnapshot := strings.HasSuffix(version, "-SNAPSHOT"); !isSnapshot {
		return downloader.NewVerifier(log, settings, emptyPgpAllowed, pgpKey)
	}

	// Build the individual verifiers in the same order they are composed.
	local, err := fs.NewVerifier(settings, emptyPgpAllowed, pgpKey)
	if err != nil {
		return nil, err
	}

	snap, err := snapshot.NewVerifier(settings, emptyPgpAllowed, pgpKey, version)
	if err != nil {
		return nil, err
	}

	remote, err := http.NewVerifier(settings, emptyPgpAllowed, pgpKey)
	if err != nil {
		return nil, err
	}

	return composed.NewVerifier(local, snap, remote), nil
}
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/artifact/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func DefaultConfig() *Config {
return &Config{
SourceURI: "https://artifacts.elastic.co/downloads/",
TargetDirectory: filepath.Join(homePath, "downloads"),
Timeout: 30 * time.Second,
Timeout: 60 * time.Second, // binaries are a bit larger it might take >30s to download them
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You used 120 above and 60 here, why the difference?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is used for Beats and Endpoint; those binaries are smaller, so it lets us fail faster in case something's wrong.
I'm considering two approaches: set the agent timeout to 2× whatever this regular timeout is set to,
or set this to a higher number but slow down the processing loop in case the server responds very slowly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it really be slower? A TCP reset does not rely on the timeout; the timeout only matters when the server stops pushing data in the HTTP response. I see that as a very rare case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really slower, but it takes more time to download 100 MB than 40 MB. We use this as a time cap for the execution of the whole request, not only for getting a response.

InstallPath: filepath.Join(homePath, "install"),
}
}
Expand Down
7 changes: 5 additions & 2 deletions x-pack/elastic-agent/pkg/artifact/download/fs/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
)

const (
ascSuffix = ".asc"
ascSuffix = ".asc"
sha512Length = 128
)

// Verifier verifies a downloaded package by comparing with public ASC
Expand Down Expand Up @@ -93,7 +94,9 @@ func (v *Verifier) verifyHash(filename, fullPath string) (bool, error) {
continue
}

expectedHash = strings.TrimSpace(strings.TrimSuffix(line, filename))
if len(line) > sha512Length {
expectedHash = strings.TrimSpace(line[:sha512Length])
}
}

if expectedHash == "" {
Expand Down
12 changes: 6 additions & 6 deletions x-pack/elastic-agent/pkg/artifact/download/http/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f
return "", errors.New(err, "fetching package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI))
}

destinationFile, err := os.OpenFile(fullPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, packagePermissions)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was moved to take the local file into account first? Not sure I fully follow this part here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it was to ensure that the file path to write to can be opened before even starting the HTTP connection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes exactly, just a micro optimization

if err != nil {
return "", errors.New(err, "creating package file failed", errors.TypeFilesystem, errors.M(errors.MetaKeyPath, fullPath))
}
defer destinationFile.Close()

resp, err := e.client.Do(req.WithContext(ctx))
if err != nil {
return "", errors.New(err, "fetching package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI))
Expand All @@ -142,12 +148,6 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f
return "", errors.New(fmt.Sprintf("call to '%s' returned unsuccessful status code: %d", sourceURI, resp.StatusCode), errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI))
}

destinationFile, err := os.OpenFile(fullPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, packagePermissions)
if err != nil {
return "", errors.New(err, "creating package file failed", errors.TypeFilesystem, errors.M(errors.MetaKeyPath, fullPath))
}
defer destinationFile.Close()

_, err = io.Copy(destinationFile, resp.Body)
return fullPath, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import (

// NewDownloader creates a downloader which first checks the local directory
// and then falls back to remote if configured.
func NewDownloader(log *logger.Logger, config *artifact.Config, forceSnapshot bool) download.Downloader {
func NewDownloader(log *logger.Logger, config *artifact.Config) download.Downloader {
downloaders := make([]download.Downloader, 0, 3)
downloaders = append(downloaders, fs.NewDownloader(config))

// try snapshot repo before official
if release.Snapshot() || forceSnapshot {
snapDownloader, err := snapshot.NewDownloader(config)
if release.Snapshot() {
snapDownloader, err := snapshot.NewDownloader(config, "")
if err != nil {
log.Error(err)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

// NewVerifier creates a verifier which first checks the local directory
// and then falls back to remote if configured.
func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool, pgp []byte, forceSnapshot bool) (download.Verifier, error) {
func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool, pgp []byte) (download.Verifier, error) {
verifiers := make([]download.Verifier, 0, 3)

fsVer, err := fs.NewVerifier(config, allowEmptyPgp, pgp)
Expand All @@ -27,8 +27,8 @@ func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool
verifiers = append(verifiers, fsVer)

// try snapshot repo before official
if release.Snapshot() || forceSnapshot {
snapshotVerifier, err := snapshot.NewVerifier(config, allowEmptyPgp, pgp)
if release.Snapshot() {
snapshotVerifier, err := snapshot.NewVerifier(config, allowEmptyPgp, pgp, "")
if err != nil {
log.Error(err)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ import (

// NewDownloader creates an HTTP downloader configured for the snapshot
// repository; versionOverride, when non-empty, replaces the running release version.
func NewDownloader(config *artifact.Config) (download.Downloader, error) {
cfg, err := snapshotConfig(config)
func NewDownloader(config *artifact.Config, versionOverride string) (download.Downloader, error) {
	// Derive the snapshot-specific config; versionOverride is handed to
	// snapshotConfig unchanged.
	snapshotCfg, cfgErr := snapshotConfig(config, versionOverride)
	if cfgErr != nil {
		return nil, cfgErr
	}

	// Reuse the HTTP downloader with the snapshot-specific config.
	return http.NewDownloader(snapshotCfg), nil
}

func snapshotConfig(config *artifact.Config) (*artifact.Config, error) {
snapshotURI, err := snapshotURI()
func snapshotConfig(config *artifact.Config, versionOverride string) (*artifact.Config, error) {
snapshotURI, err := snapshotURI(versionOverride)
if err != nil {
return nil, fmt.Errorf("failed to detect remote snapshot repo, proceeding with configured: %v", err)
}
Expand All @@ -43,8 +43,16 @@ func snapshotConfig(config *artifact.Config) (*artifact.Config, error) {
}, nil
}

func snapshotURI() (string, error) {
artifactsURI := fmt.Sprintf("https://artifacts-api.elastic.co/v1/search/%s-SNAPSHOT/elastic-agent", release.Version())
func snapshotURI(versionOverride string) (string, error) {
version := release.Version()
if versionOverride != "" {
if strings.HasSuffix(versionOverride, "-SNAPSHOT") {
versionOverride = strings.TrimSuffix(versionOverride, "-SNAPSHOT")
}
version = versionOverride
}

artifactsURI := fmt.Sprintf("https://artifacts-api.elastic.co/v1/search/%s-SNAPSHOT/elastic-agent", version)
resp, err := gohttp.Get(artifactsURI)
if err != nil {
return "", err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (

// NewVerifier creates a verifier from the snapshot repository configuration;
// versionOverride, when non-empty, replaces the running release version.
func NewVerifier(config *artifact.Config, allowEmptyPgp bool, pgp []byte) (download.Verifier, error) {
cfg, err := snapshotConfig(config)
func NewVerifier(config *artifact.Config, allowEmptyPgp bool, pgp []byte, versionOverride string) (download.Verifier, error) {
cfg, err := snapshotConfig(config, versionOverride)
if err != nil {
return nil, err
}
Expand Down