Skip to content

Commit

Permalink
upgrade(installer): Persist task state when the installer restarts
Browse files Browse the repository at this point in the history
  • Loading branch information
BaptisteFoy committed Mar 3, 2025
1 parent c99883d commit d5e0c05
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 38 deletions.
51 changes: 32 additions & 19 deletions pkg/fleet/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/config/remote/client"
"github.com/DataDog/datadog-agent/pkg/config/utils"
"github.com/DataDog/datadog-agent/pkg/fleet/installer"
"github.com/DataDog/datadog-agent/pkg/fleet/installer/db"
"github.com/DataDog/datadog-agent/pkg/fleet/installer/env"
installerErrors "github.com/DataDog/datadog-agent/pkg/fleet/installer/errors"
"github.com/DataDog/datadog-agent/pkg/fleet/installer/exec"
Expand Down Expand Up @@ -87,7 +88,8 @@ type daemonImpl struct {
configs map[string]installerConfig
requests chan remoteAPIRequest
requestsWG sync.WaitGroup
requestsState map[string]requestState
requestsState map[string]db.RequestState
db *db.TasksDB
}

func newInstaller(installerBin string) func(env *env.Env) installer.Installer {
Expand Down Expand Up @@ -127,11 +129,16 @@ func NewDaemon(hostname string, rcFetcher client.ConfigFetcher, config config.Re
IsCentos6: env.DetectCentos6(),
}
installer := newInstaller(installerBin)
return newDaemon(rc, installer, env), nil
db, err := db.NewTasksDB("/tmp/tasks.db") // TODO use the installer temp dir
if err != nil {
return nil, fmt.Errorf("could not create tasks db: %w", err)
}
return newDaemon(rc, installer, env, db), nil
}

func newDaemon(rc *remoteConfig, installer func(env *env.Env) installer.Installer, env *env.Env) *daemonImpl {
func newDaemon(rc *remoteConfig, installer func(env *env.Env) installer.Installer, env *env.Env, tasksDB *db.TasksDB) *daemonImpl {
i := &daemonImpl{
db: tasksDB,
env: env,
rc: rc,
installer: installer,
Expand All @@ -140,7 +147,7 @@ func newDaemon(rc *remoteConfig, installer func(env *env.Env) installer.Installe
catalogOverride: catalog{},
configs: make(map[string]installerConfig),
stopChan: make(chan struct{}),
requestsState: make(map[string]requestState),
requestsState: make(map[string]db.RequestState),
}
i.refreshState(context.Background())
return i
Expand Down Expand Up @@ -662,31 +669,22 @@ type requestKey int

var requestStateKey requestKey

// requestState represents the state of a task.
type requestState struct {
Package string
ID string
State pbgo.TaskState
Err error
ErrorCode installerErrors.InstallerErrorCode
}

func newRequestContext(request remoteAPIRequest) (*telemetry.Span, context.Context) {
ctx := context.WithValue(context.Background(), requestStateKey, &requestState{
ctx := context.WithValue(context.Background(), requestStateKey, &db.RequestState{
Package: request.Package,
ID: request.ID,
State: pbgo.TaskState_RUNNING,
})
return telemetry.StartSpanFromIDs(ctx, "remote_request", request.TraceID, request.ParentSpanID)
}

func setRequestInvalid(ctx context.Context) {
state := ctx.Value(requestStateKey).(*requestState)
func (d *daemonImpl) setRequestInvalid(ctx context.Context) {
state := ctx.Value(requestStateKey).(*db.RequestState)
state.State = pbgo.TaskState_INVALID_STATE
}

func setRequestDone(ctx context.Context, err error) {
state := ctx.Value(requestStateKey).(*requestState)
func (d *daemonImpl) setRequestDone(ctx context.Context, err error) {
state := ctx.Value(requestStateKey).(*db.RequestState)
state.State = pbgo.TaskState_DONE
if err != nil {
state.State = pbgo.TaskState_ERROR
Expand All @@ -696,10 +694,25 @@ func setRequestDone(ctx context.Context, err error) {
}

func (d *daemonImpl) refreshState(ctx context.Context) {
request, ok := ctx.Value(requestStateKey).(*requestState)
request, ok := ctx.Value(requestStateKey).(*db.RequestState)
if ok {
d.requestsState[request.Package] = *request
} else {
lastRequestState, err := d.db.GetLastTask()
if err != nil {
log.Warnf("could not get last task: %v", err)
}
if lastRequestState != nil {
d.requestsState[lastRequestState.Package] = *lastRequestState
}
}
defer func() {
err := d.db.SetLastTask(request)
if err != nil {
log.Warnf("could not set last task: %v", err)
}
}()

state, err := d.installer(d.env).States()
if err != nil {
// TODO: we should report this error through RC in some way
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package db
import (
"encoding/json"
"fmt"
"time"

"go.etcd.io/bbolt"
)
Expand All @@ -31,27 +30,14 @@ type Package struct {
InstallerVersion string
}

// PackagesDB is a database that stores information about packages
// PackagesDB is a database that stores information about packages.
// It is opened by the installer binary
type PackagesDB struct {
db *bbolt.DB
}

type options struct {
timeout time.Duration
}

// Option is a function that sets an option on a PackagesDB
type Option func(*options)

// WithTimeout sets the timeout for opening the database
func WithTimeout(timeout time.Duration) Option {
return func(o *options) {
o.timeout = timeout
}
}

// New creates a new PackagesDB
func New(dbPath string, opts ...Option) (*PackagesDB, error) {
func NewPackagesDB(dbPath string, opts ...Option) (*PackagesDB, error) {
o := options{}
for _, opt := range opts {
opt(&o)
Expand Down
File renamed without changes.
109 changes: 109 additions & 0 deletions pkg/fleet/installer/db/task_db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package db

import (
"encoding/json"
"fmt"

installerErrors "github.com/DataDog/datadog-agent/pkg/fleet/installer/errors"
pbgo "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core"
"go.etcd.io/bbolt"
)

var (
// bucketTasks is the name of the bbolt bucket created by NewTasksDB.
bucketTasks = []byte("tasks")
// lastTaskKey is the single key under which SetLastTask stores the
// JSON-encoded task.
lastTaskKey = []byte("last_task")
)

// RequestState represents the state of a task.
//
// The struct is persisted as JSON by TasksDB. Because Err is an interface,
// encoding/json cannot round-trip it on its own: a non-nil error whose
// concrete type has no exported fields encodes as "{}", and decoding any
// non-null value into a non-empty interface field fails. The MarshalJSON and
// UnmarshalJSON methods below therefore persist the error as its message.
type RequestState struct {
	Package   string                             `json:"package"`
	ID        string                             `json:"id"`
	State     pbgo.TaskState                     `json:"state"`
	Err       error                              `json:"error,omitempty"`
	ErrorCode installerErrors.InstallerErrorCode `json:"error_code,omitempty"`
}

// requestStateJSON is the serialized shape of a RequestState. It mirrors the
// public struct's JSON keys but carries the error as an optional string.
type requestStateJSON struct {
	Package   string                             `json:"package"`
	ID        string                             `json:"id"`
	State     pbgo.TaskState                     `json:"state"`
	Err       *string                            `json:"error,omitempty"`
	ErrorCode installerErrors.InstallerErrorCode `json:"error_code,omitempty"`
}

// MarshalJSON encodes the state using the same keys as the struct tags, with
// Err stored as its message (the key is omitted when Err is nil).
func (r RequestState) MarshalJSON() ([]byte, error) {
	raw := requestStateJSON{
		Package:   r.Package,
		ID:        r.ID,
		State:     r.State,
		ErrorCode: r.ErrorCode,
	}
	if r.Err != nil {
		msg := r.Err.Error()
		raw.Err = &msg
	}
	return json.Marshal(raw)
}

// UnmarshalJSON decodes a state written by MarshalJSON. A stored error message
// is restored as a plain error carrying that message; the original error type
// and wrap chain are not recoverable.
func (r *RequestState) UnmarshalJSON(data []byte) error {
	var raw requestStateJSON
	if err := json.Unmarshal(data, &raw); err != nil {
		return err
	}
	*r = RequestState{
		Package:   raw.Package,
		ID:        raw.ID,
		State:     raw.State,
		ErrorCode: raw.ErrorCode,
	}
	if raw.Err != nil {
		// fmt is already imported by this file; "%s" keeps any '%' in the
		// stored message from being interpreted as a verb.
		r.Err = fmt.Errorf("%s", *raw.Err)
	}
	return nil
}

// TasksDB is a database that stores information about tasks.
// It is opened by the installer daemon.
//
// It wraps a bbolt database; tasks are written to and read from the bucket
// named by bucketTasks.
type TasksDB struct {
// db is the underlying bbolt handle, released by Close.
db *bbolt.DB
}

// NewTasksDB opens the bbolt database at dbPath (file mode 0644) and makes
// sure the tasks bucket exists.
//
// opts configure how the file is opened; see WithTimeout. When no timeout
// option is given, a zero Timeout is passed to bbolt.
// NOTE(review): per the bbolt documentation a zero Timeout waits indefinitely
// for the file lock — confirm callers that omit WithTimeout accept that.
//
// An error is returned when the file cannot be opened or when the bucket
// cannot be created. In the second case the freshly opened handle is closed
// before returning, so the file lock is not leaked.
func NewTasksDB(dbPath string, opts ...Option) (*TasksDB, error) {
	o := options{}
	for _, opt := range opts {
		opt(&o)
	}
	db, err := bbolt.Open(dbPath, 0644, &bbolt.Options{
		Timeout:      o.timeout,
		FreelistType: bbolt.FreelistArrayType,
	})
	if err != nil {
		return nil, fmt.Errorf("could not open database: %w", err)
	}
	err = db.Update(func(tx *bbolt.Tx) error {
		_, err := tx.CreateBucketIfNotExists(bucketTasks)
		return err
	})
	if err != nil {
		// Best-effort close: the bucket error is the one worth reporting, but
		// the handle must not outlive this failed constructor.
		_ = db.Close()
		return nil, fmt.Errorf("could not create tasks bucket: %w", err)
	}
	return &TasksDB{db: db}, nil
}

// Close releases the underlying bbolt database and reports any error bbolt
// returns while doing so.
func (p *TasksDB) Close() error {
	closeErr := p.db.Close()
	return closeErr
}

// SetLastTask persists task as the last task, replacing any previous record.
//
// A nil task is a no-op: encoding it would store JSON null over the existing
// record and erase the state this database exists to keep.
//
// It returns an error when the task cannot be encoded, when the tasks bucket
// is missing, or when the write transaction fails.
func (p *TasksDB) SetLastTask(task *RequestState) error {
	if task == nil {
		return nil
	}
	// Encode before opening the write transaction so the database is not held
	// locked while marshalling.
	rawTask, err := json.Marshal(task)
	if err != nil {
		return fmt.Errorf("could not set task: could not marshal task: %w", err)
	}
	err = p.db.Update(func(tx *bbolt.Tx) error {
		b := tx.Bucket(bucketTasks)
		if b == nil {
			return fmt.Errorf("bucket not found")
		}
		return b.Put(lastTaskKey, rawTask)
	})
	if err != nil {
		return fmt.Errorf("could not set task: %w", err)
	}
	return nil
}

// GetLastTask retrieves the last task stored by SetLastTask.
//
// It returns (nil, nil) when no task has been recorded yet (missing or empty
// value, or a stored JSON null). It returns an error when the tasks bucket is
// missing or the stored value cannot be decoded.
func (p *TasksDB) GetLastTask() (*RequestState, error) {
	var task *RequestState
	err := p.db.View(func(tx *bbolt.Tx) error {
		// Tasks live in their own bucket; the packages bucket belongs to the
		// packages database and is never created in this file.
		b := tx.Bucket(bucketTasks)
		if b == nil {
			return fmt.Errorf("bucket not found")
		}
		v := b.Get(lastTaskKey)
		if len(v) == 0 {
			// No task found, no error
			return nil
		}
		// Decode through &task: json.Unmarshal rejects a nil pointer target,
		// whereas a pointer-to-pointer lets it allocate the RequestState (and
		// leaves task nil for a stored JSON null).
		if err := json.Unmarshal(v, &task); err != nil {
			return fmt.Errorf("could not unmarshal task: %w", err)
		}
		return nil
	})
	if err != nil {
		return nil, fmt.Errorf("could not get task: %w", err)
	}
	return task, nil
}
22 changes: 22 additions & 0 deletions pkg/fleet/installer/db/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package db

import "time"

// options collects the settings that Option functions can adjust before a
// database is opened.
type options struct {
	timeout time.Duration
}

// Option is a function that sets an option used when opening a database in
// this package.
type Option func(*options)

// WithTimeout sets the timeout for opening the database.
func WithTimeout(timeout time.Duration) Option {
	apply := func(cfg *options) {
		cfg.timeout = timeout
	}
	return apply
}
2 changes: 1 addition & 1 deletion pkg/fleet/installer/installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func NewInstaller(env *env.Env) (Installer, error) {
if err != nil {
return nil, fmt.Errorf("could not ensure packages and config directory exists: %w", err)
}
db, err := db.New(filepath.Join(paths.PackagesPath, "packages.db"), db.WithTimeout(10*time.Second))
db, err := db.NewPackagesDB(filepath.Join(paths.PackagesPath, "packages.db"), db.WithTimeout(10*time.Second))
if err != nil {
return nil, fmt.Errorf("could not create packages db: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/fleet/installer/installer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type testPackageManager struct {
func newTestPackageManager(t *testing.T, s *fixtures.Server, rootPath string) *testPackageManager {
packages := repository.NewRepositories(rootPath, packages.PreRemoveHooks)
configs := repository.NewRepositories(t.TempDir(), nil)
db, err := db.New(filepath.Join(rootPath, "packages.db"))
db, err := db.NewPackagesDB(filepath.Join(rootPath, "packages.db"))
assert.NoError(t, err)
return &testPackageManager{
installerImpl{
Expand Down

0 comments on commit d5e0c05

Please sign in to comment.