Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade(installer): Persist task state when the installer restarts #34642

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 32 additions & 19 deletions pkg/fleet/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/config/remote/client"
"github.com/DataDog/datadog-agent/pkg/config/utils"
"github.com/DataDog/datadog-agent/pkg/fleet/installer"
"github.com/DataDog/datadog-agent/pkg/fleet/installer/db"
"github.com/DataDog/datadog-agent/pkg/fleet/installer/env"
installerErrors "github.com/DataDog/datadog-agent/pkg/fleet/installer/errors"
"github.com/DataDog/datadog-agent/pkg/fleet/installer/exec"
Expand Down Expand Up @@ -87,7 +88,8 @@ type daemonImpl struct {
configs map[string]installerConfig
requests chan remoteAPIRequest
requestsWG sync.WaitGroup
requestsState map[string]requestState
requestsState map[string]db.RequestState
db *db.TasksDB
}

func newInstaller(installerBin string) func(env *env.Env) installer.Installer {
Expand Down Expand Up @@ -127,11 +129,16 @@ func NewDaemon(hostname string, rcFetcher client.ConfigFetcher, config config.Re
IsCentos6: env.DetectCentos6(),
}
installer := newInstaller(installerBin)
return newDaemon(rc, installer, env), nil
db, err := db.NewTasksDB("/tmp/tasks.db") // TODO use the installer temp dir
if err != nil {
return nil, fmt.Errorf("could not create tasks db: %w", err)
}
return newDaemon(rc, installer, env, db), nil
}

func newDaemon(rc *remoteConfig, installer func(env *env.Env) installer.Installer, env *env.Env) *daemonImpl {
func newDaemon(rc *remoteConfig, installer func(env *env.Env) installer.Installer, env *env.Env, tasksDB *db.TasksDB) *daemonImpl {
i := &daemonImpl{
db: tasksDB,
env: env,
rc: rc,
installer: installer,
Expand All @@ -140,7 +147,7 @@ func newDaemon(rc *remoteConfig, installer func(env *env.Env) installer.Installe
catalogOverride: catalog{},
configs: make(map[string]installerConfig),
stopChan: make(chan struct{}),
requestsState: make(map[string]requestState),
requestsState: make(map[string]db.RequestState),
}
i.refreshState(context.Background())
return i
Expand Down Expand Up @@ -662,31 +669,22 @@ type requestKey int

var requestStateKey requestKey

// requestState represents the state of a task.
type requestState struct {
Package string
ID string
State pbgo.TaskState
Err error
ErrorCode installerErrors.InstallerErrorCode
}

func newRequestContext(request remoteAPIRequest) (*telemetry.Span, context.Context) {
ctx := context.WithValue(context.Background(), requestStateKey, &requestState{
ctx := context.WithValue(context.Background(), requestStateKey, &db.RequestState{
Package: request.Package,
ID: request.ID,
State: pbgo.TaskState_RUNNING,
})
return telemetry.StartSpanFromIDs(ctx, "remote_request", request.TraceID, request.ParentSpanID)
}

func setRequestInvalid(ctx context.Context) {
state := ctx.Value(requestStateKey).(*requestState)
func (d *daemonImpl) setRequestInvalid(ctx context.Context) {
state := ctx.Value(requestStateKey).(*db.RequestState)
state.State = pbgo.TaskState_INVALID_STATE
}

func setRequestDone(ctx context.Context, err error) {
state := ctx.Value(requestStateKey).(*requestState)
func (d *daemonImpl) setRequestDone(ctx context.Context, err error) {
state := ctx.Value(requestStateKey).(*db.RequestState)
state.State = pbgo.TaskState_DONE
if err != nil {
state.State = pbgo.TaskState_ERROR
Expand All @@ -696,10 +694,25 @@ func setRequestDone(ctx context.Context, err error) {
}

func (d *daemonImpl) refreshState(ctx context.Context) {
request, ok := ctx.Value(requestStateKey).(*requestState)
request, ok := ctx.Value(requestStateKey).(*db.RequestState)
if ok {
d.requestsState[request.Package] = *request
} else {
lastRequestState, err := d.db.GetLastTask()
if err != nil {
log.Warnf("could not get last task: %v", err)
}
if lastRequestState != nil {
d.requestsState[lastRequestState.Package] = *lastRequestState
}
}
defer func() {
err := d.db.SetLastTask(request)
if err != nil {
log.Warnf("could not set last task: %v", err)
}
}()

state, err := d.installer(d.env).States()
if err != nil {
// TODO: we should report this error through RC in some way
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package db
import (
"encoding/json"
"fmt"
"time"

"go.etcd.io/bbolt"
)
Expand All @@ -31,27 +30,14 @@ type Package struct {
InstallerVersion string
}

// PackagesDB is a database that stores information about packages
// PackagesDB is a database that stores information about packages.
// It is opened by the installer binary
type PackagesDB struct {
db *bbolt.DB
}

type options struct {
timeout time.Duration
}

// Option is a function that sets an option on a PackagesDB
type Option func(*options)

// WithTimeout sets the timeout for opening the database
func WithTimeout(timeout time.Duration) Option {
return func(o *options) {
o.timeout = timeout
}
}

// New creates a new PackagesDB
func New(dbPath string, opts ...Option) (*PackagesDB, error) {
func NewPackagesDB(dbPath string, opts ...Option) (*PackagesDB, error) {
o := options{}
for _, opt := range opts {
opt(&o)
Expand Down
109 changes: 109 additions & 0 deletions pkg/fleet/installer/db/task_db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package db

import (
"encoding/json"
"fmt"

installerErrors "github.com/DataDog/datadog-agent/pkg/fleet/installer/errors"
pbgo "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core"
"go.etcd.io/bbolt"
)

var (
bucketTasks = []byte("tasks")
lastTaskKey = []byte("last_task")
)

// RequestState represents the state of a task.
type RequestState struct {
Package string `json:"package"`
ID string `json:"id"`
State pbgo.TaskState `json:"state"`
Err error `json:"error,omitempty"`
ErrorCode installerErrors.InstallerErrorCode `json:"error_code,omitempty"`
}

// TasksDB is a database that stores information about tasks.
// It is opened by the installer daemon.
type TasksDB struct {
db *bbolt.DB
}

// New creates a new TasksDB
func NewTasksDB(dbPath string, opts ...Option) (*TasksDB, error) {
o := options{}
for _, opt := range opts {
opt(&o)
}
db, err := bbolt.Open(dbPath, 0644, &bbolt.Options{
Timeout: o.timeout,
FreelistType: bbolt.FreelistArrayType,
})
if err != nil {
return nil, fmt.Errorf("could not open database: %w", err)
}
err = db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(bucketTasks)
return err
})
if err != nil {
return nil, fmt.Errorf("could not create tasks bucket: %w", err)
}
return &TasksDB{
db: db,
}, nil
}

// Close closes the database
func (p *TasksDB) Close() error {
return p.db.Close()
}

// SetLastTask sets the last task
func (p *TasksDB) SetLastTask(task *RequestState) error {
err := p.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(bucketTasks)
if b == nil {
return fmt.Errorf("bucket not found")
}
rawTask, err := json.Marshal(&task)
if err != nil {
return fmt.Errorf("could not marshal task: %w", err)
}
return b.Put(lastTaskKey, rawTask)
})
if err != nil {
return fmt.Errorf("could not set task: %w", err)
}
return nil
}

// GetLastTask retrieves the last task
func (p *TasksDB) GetLastTask() (*RequestState, error) {
var task *RequestState
err := p.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(bucketPackages)
if b == nil {
return fmt.Errorf("bucket not found")
}
v := b.Get(lastTaskKey)
if len(v) == 0 {
// No task found, no error
return nil
}
err := json.Unmarshal(v, task)
if err != nil {
return fmt.Errorf("could not unmarshal task: %w", err)
}
return nil
})
if err != nil {
return nil, fmt.Errorf("could not get task: %w", err)
}
return task, nil
}
22 changes: 22 additions & 0 deletions pkg/fleet/installer/db/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package db

import "time"

type options struct {
timeout time.Duration
}

// Option is a function that sets an option on a PackagesDB
type Option func(*options)

// WithTimeout sets the timeout for opening the database
func WithTimeout(timeout time.Duration) Option {
return func(o *options) {
o.timeout = timeout
}
}
2 changes: 1 addition & 1 deletion pkg/fleet/installer/installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func NewInstaller(env *env.Env) (Installer, error) {
if err != nil {
return nil, fmt.Errorf("could not ensure packages and config directory exists: %w", err)
}
db, err := db.New(filepath.Join(paths.PackagesPath, "packages.db"), db.WithTimeout(10*time.Second))
db, err := db.NewPackagesDB(filepath.Join(paths.PackagesPath, "packages.db"), db.WithTimeout(10*time.Second))
if err != nil {
return nil, fmt.Errorf("could not create packages db: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/fleet/installer/installer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type testPackageManager struct {
func newTestPackageManager(t *testing.T, s *fixtures.Server, rootPath string) *testPackageManager {
packages := repository.NewRepositories(rootPath, packages.PreRemoveHooks)
configs := repository.NewRepositories(t.TempDir(), nil)
db, err := db.New(filepath.Join(rootPath, "packages.db"))
db, err := db.NewPackagesDB(filepath.Join(rootPath, "packages.db"))
assert.NoError(t, err)
return &testPackageManager{
installerImpl{
Expand Down
Loading