Skip to content

Commit

Permalink
feat: add service settings and retry policy
Browse files Browse the repository at this point in the history
Signed-off-by: Marko Kungla <marko@mkungla.dev>
  • Loading branch information
mkungla committed Jun 3, 2024
1 parent a84cbc2 commit 49456f6
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 48 deletions.
8 changes: 4 additions & 4 deletions pkg/settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,15 @@ func New[S Settings](s S) (*Blueprint, error) {
// Use type assertion to ensure compatibility with the original field type
if setter, ok := originalField.Addr().Interface().(SettingField); ok {
if err := setter.UnmarshalSetting([]byte(spec.Value)); err != nil {
return nil, fmt.Errorf("22222 failed to set field %s: %w", field.Name, err)
return nil, fmt.Errorf("failed to set field %s: %w", field.Name, err)
}
} else if nested, ok := originalField.Addr().Interface().(Settings); ok {
// Handle nested settings
if _, err := New(nested); err != nil {
return nil, fmt.Errorf("failed to set nested settings for field %s: %w", field.Name, err)
}
} else {
return nil, fmt.Errorf("33333 field %s does not implement SettingField interface", field.Name)
return nil, fmt.Errorf("field %s does not implement SettingField interface", field.Name)
}
}
}
Expand Down Expand Up @@ -171,7 +171,7 @@ func fieldImplementsSettings(field reflect.StructField) bool {
settingsType := reflect.TypeOf((*Settings)(nil)).Elem()

// Check if the field's type implements the Settings interface
return fieldType.Implements(settingsType) || reflect.PtrTo(fieldType).Implements(settingsType)
return fieldType.Implements(settingsType) || reflect.PointerTo(fieldType).Implements(settingsType)
}

// fieldImplementsSetting checks if a field implements the SettingField interface
Expand All @@ -187,7 +187,7 @@ func fieldImplementsSetting(field reflect.StructField) bool {
settingType := reflect.TypeOf((*SettingField)(nil)).Elem()

// Check if the field's type or pointer to field's type implements the Setting interface
implements := fieldType.Implements(settingType) || reflect.PtrTo(fieldType).Implements(settingType)
implements := fieldType.Implements(settingType) || reflect.PointerTo(fieldType).Implements(settingType)
return implements
}

Expand Down
18 changes: 16 additions & 2 deletions sdk/app/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func (e *Engine) handleEvent(sess *session.Context, ev events.Event) {
switch ev.Scope() {
case "services":
switch ev.Key() {
case "start.services":
case services.StartEvent.Key():
if e.state != engineRunning {
sess.Log().Warn("engine is not running, ignoring start.services event")
return
Expand All @@ -488,7 +488,7 @@ func (e *Engine) handleEvent(sess *session.Context, ev events.Event) {
go e.serviceStart(sess, v.String())
return true
})
case "stop.services":
case services.StopEvent.Key():
payload := ev.Payload()
payload.Range(func(v vars.Variable) bool {
go e.serviceStop(sess, v.String(), nil)
Expand Down Expand Up @@ -587,8 +587,13 @@ func (e *Engine) serviceStart(sess *session.Context, svcurl string) {
slog.String("err", err.Error()),
sarg,
)
if e.state == engineRunning && svcc.CanRetry() {
sess.Log().Notice("retrying to start the service", sarg, slog.Int("retry", svcc.Retries()))
e.serviceStart(sess, svcurl)
}
return
}

go func(svcc *services.Container, svcurl string, sarg slog.Attr) {

if !svcc.HasTick() {
Expand Down Expand Up @@ -666,6 +671,15 @@ func (e *Engine) serviceStop(sess *session.Context, svcurl string, err error) {
if e := svcc.Stop(sess, err); e != nil {
sess.Log().Error("failed to stop service", slog.String("err", err.Error()), sarg)
}

if e.state == engineRunning && svcc.CanRetry() {
if err != nil {
sess.Log().Warn("retrying to skipped due service top error", sarg)
return
}
sess.Log().Notice("retrying to start the service", sarg, slog.Int("retry", svcc.Retries()))
go e.serviceStart(sess, svcurl)
}
}

var nooptock = func(*session.Context, time.Duration, int) error { return nil }
Expand Down
98 changes: 83 additions & 15 deletions sdk/services/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
)

type Container struct {
mu sync.RWMutex
info *service.Info
svc *Service
cancel context.CancelCauseFunc
ctx context.Context
cron *serviceCron
mu sync.RWMutex
info *service.Info
svc *Service
cancel context.CancelCauseFunc
ctx context.Context
cron *serviceCron
retries int
}

func NewContainer(sess *session.Context, addr *address.Address, svc *Service) (*Container, error) {
Expand All @@ -53,7 +54,15 @@ func (c *Container) Info() *service.Info {
return c.info
}

func (c *Container) Settings() service.Settings {
c.mu.RLock()
defer c.mu.RUnlock()
return c.svc.settings
}

func (c *Container) Register(sess *session.Context) error {
c.mu.RLock()
defer c.mu.RUnlock()
initerrs := errors.Join(c.svc.errs...)
if initerrs != nil {
return fmt.Errorf("%w(%s): service failed to initialize: %w", Error, c.info.Name(), initerrs)
Expand All @@ -75,7 +84,44 @@ func (c *Container) Register(sess *session.Context) error {
return nil
}

func (c *Container) CanRetry() bool {
c.mu.RLock()
defer c.mu.RUnlock()
return bool(c.svc.settings.RetryOnError) &&
int(c.svc.settings.MaxRetries) > 0 &&
c.retries <= int(c.svc.settings.MaxRetries)
}

func (c *Container) Retries() int {
c.mu.RLock()
defer c.mu.RUnlock()
return c.retries
}

func (c *Container) Start(ectx context.Context, sess *session.Context) (err error) {
c.mu.RLock()
if c.svc.settings.RetryOnError && c.svc.settings.MaxRetries > 0 && c.retries > 0 {
if c.retries > int(c.svc.settings.MaxRetries) {
c.mu.RUnlock()
return fmt.Errorf("%w: service start cancelled: max retries reached", Error)
}
if c.svc.settings.RetryBackoff > 0 {
ctx, _ := context.WithTimeout(ectx, time.Duration(c.svc.settings.RetryBackoff))
<-ctx.Done()
if !errors.Is(ctx.Err(), context.DeadlineExceeded) {
c.mu.RUnlock()
return fmt.Errorf("%w: service start cancelled: %s", Error, ctx.Err())
}
c.mu.RUnlock()
}
} else {
c.mu.RUnlock()
}

c.mu.Lock()
defer c.mu.Unlock()

c.retries++
if c.svc.startAction != nil {
if err := c.svc.startAction(sess); err != nil {
return err
Expand All @@ -88,9 +134,7 @@ func (c *Container) Start(ectx context.Context, sess *session.Context) (err erro
}
}

c.mu.Lock()
c.ctx, c.cancel = context.WithCancelCause(ectx) // with engine context
c.mu.Unlock()

payload := new(vars.Map)

Expand Down Expand Up @@ -120,6 +164,9 @@ func (c *Container) Start(ectx context.Context, sess *session.Context) (err erro
}

func (c *Container) Stop(sess *session.Context, e error) (err error) {
c.mu.RLock()
defer c.mu.RUnlock()

if e != nil {
sess.Log().Error(e.Error(), slog.String("service", c.info.Addr().String()))
}
Expand Down Expand Up @@ -171,33 +218,50 @@ func (c *Container) Stop(sess *session.Context, e error) (err error) {
}

func (c *Container) Done() <-chan struct{} {
c.mu.Lock()
defer c.mu.Unlock()
done := c.ctx.Done()
return done
c.mu.RLock()
defer c.mu.RUnlock()
return c.ctx.Done()
}

func (c *Container) HasTick() bool {
c.mu.Lock()
defer c.mu.Unlock()
c.mu.RLock()
defer c.mu.RUnlock()
return c.svc.tickAction != nil
}

func (c *Container) Tick(sess *session.Context, ts time.Time, delta time.Duration) error {
c.mu.RLock()
defer c.mu.RUnlock()
if c.svc.tickAction == nil {
return nil
}
return c.svc.tickAction(sess, ts, delta)
}

func (c *Container) Tock(sess *session.Context, delta time.Duration, tps int) error {
c.mu.RLock()
if c.svc.tockAction == nil {
c.mu.RUnlock()
return nil
}
return c.svc.tockAction(sess, delta, tps)
if err := c.svc.tockAction(sess, delta, tps); err != nil {
c.mu.RUnlock()
return err
}
retries := c.retries
c.mu.RUnlock()

if retries > 0 {
c.mu.Lock()
c.retries = 0
c.mu.Unlock()
}
return nil
}

func (c *Container) HandleEvent(sess *session.Context, ev events.Event) {
c.mu.RLock()
defer c.mu.RUnlock()
if c.svc.listeners == nil {
return
}
Expand All @@ -215,6 +279,8 @@ func (c *Container) HandleEvent(sess *session.Context, ev events.Event) {
}

func (c *Container) Listeners() []string {
c.mu.RLock()
defer c.mu.RUnlock()
if c.svc.listeners == nil {
return nil
}
Expand All @@ -226,5 +292,7 @@ func (c *Container) Listeners() []string {
}

func (c *Container) Cancel(err error) {
c.mu.RLock()
defer c.mu.RUnlock()
c.cancel(err)
}
14 changes: 4 additions & 10 deletions sdk/services/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package services

import (
"github.com/happy-sdk/happy/pkg/strings/slug"
"github.com/happy-sdk/happy/sdk/action"
"github.com/happy-sdk/happy/sdk/app/session"
"github.com/happy-sdk/happy/sdk/events"
Expand All @@ -14,8 +13,6 @@ import (

type Service struct {
settings service.Settings
slug string
name string
registerAction action.Action
startAction action.Action
stopAction action.WithPrevErr
Expand All @@ -41,19 +38,16 @@ func New(s service.Settings) *Service {
svc.errs = append(svc.errs, err)
}

svc.name = s.Name.String()
svc.slug = slug.Create(s.Name.String())
svc.settings = s

return svc
}

func (s *Service) Name() string {
if s.name == "" {
return "anonymous-service"
}
return s.name
return s.settings.Name.String()
}
func (s *Service) Slug() string {
return s.slug
return s.settings.Slug.String()
}

// OnRegister is called when app is preparing runtime and attaching services,
Expand Down
28 changes: 11 additions & 17 deletions sdk/services/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
package service

import (
"fmt"

"github.com/happy-sdk/happy/pkg/settings"
"github.com/happy-sdk/happy/pkg/strings/slug"
"github.com/happy-sdk/happy/sdk/events"
)

Expand All @@ -19,23 +18,18 @@ var (
)

type Settings struct {
Name settings.String `key:",config" default:"background" desc:"The name of the service."`
Description settings.String `key:",config" default:"xxx" desc:"The name of the service."`
RetryOnError settings.Bool `default:"false" desc:"Retry the service in case of an error."`
MaxRetries settings.Int `default:"3" desc:"Maximum number of retries on error."`
RetryBackoff settings.Duration `default:"1s" desc:"Duration to wait before each retry."`
Name settings.String `key:",init" default:"Background" desc:"The name of the service."`
// Slug is the unique identifier of the service, if not provided it will be generated from the name.
Slug settings.String `key:",init" desc:"The slug of the service."`
Description settings.String `key:",init" default:"xxx" desc:"The name of the service."`
RetryOnError settings.Bool `key:",init" default:"false" desc:"Retry the service in case of an error."`
MaxRetries settings.Int `key:",init" default:"3" desc:"Maximum number of retries on error."`
RetryBackoff settings.Duration `key:",init" default:"5s" desc:"Duration to wait before each retry."`
}

func (s *Settings) Blueprint() (*settings.Blueprint, error) {
bp, err := settings.New(s)
if err != nil {
return nil, err
if s.Slug == "" && s.Name != "" {
s.Slug = settings.String(slug.Create(s.Name.String()))
}
fmt.Println("s.Name:", s.Name.String())
fmt.Println("s.Description: ", s.Description.String())
fmt.Printf("s.RetryOnError: %t\n", s.RetryOnError)
fmt.Printf("s.MaxRetries: %d\n", s.MaxRetries)
fmt.Printf("s.RetryBackoff: %s\n", s.RetryBackoff.String())

return bp, nil
return settings.New(s)
}
1 change: 1 addition & 0 deletions sdk/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
Error = fmt.Errorf("services error")
// StartEvent starts services defined in payload
StartEvent = events.New("services", "start.services")
StopEvent = events.New("services", "stop.services")
)

type Settings struct {
Expand Down

0 comments on commit 49456f6

Please sign in to comment.