diff --git a/provider/cmd/run.go b/provider/cmd/run.go
index 00a786c594..a0d9d14502 100644
--- a/provider/cmd/run.go
+++ b/provider/cmd/run.go
@@ -85,6 +85,8 @@ const (
 	FlagMinimumBalance     = "minimum-balance"
 	FlagBalanceCheckPeriod = "balance-check-period"
 	FlagProviderConfig     = "provider-config"
+	FlagCachedResultMaxAge = "cached-result-max-age"
+	FlagRPCQueryTimeout    = "rpc-query-timeout"
 )
 
 var (
@@ -285,6 +287,16 @@ func RunCmd() *cobra.Command {
 		return nil
 	}
 
+	cmd.Flags().Duration(FlagRPCQueryTimeout, time.Minute, "timeout for requests made to the RPC node")
+	if err := viper.BindPFlag(FlagRPCQueryTimeout, cmd.Flags().Lookup(FlagRPCQueryTimeout)); err != nil {
+		return nil
+	}
+
+	cmd.Flags().Duration(FlagCachedResultMaxAge, 5*time.Second, "max. cache age for results from the RPC node")
+	if err := viper.BindPFlag(FlagCachedResultMaxAge, cmd.Flags().Lookup(FlagCachedResultMaxAge)); err != nil {
+		return nil
+	}
+
 	return cmd
 }
 
@@ -378,6 +390,8 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error {
 	manifestTimeout := viper.GetDuration(FlagManifestTimeout)
 	metricsListener := viper.GetString(FlagMetricsListener)
 	providerConfig := viper.GetString(FlagProviderConfig)
+	cachedResultMaxAge := viper.GetDuration(FlagCachedResultMaxAge)
+	rpcQueryTimeout := viper.GetDuration(FlagRPCQueryTimeout)
 
 	var metricsRouter http.Handler
 	if len(metricsListener) != 0 {
@@ -552,6 +566,8 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error {
 		return err
 	}
 	config.BidDeposit = bidDeposit
+	config.RPCQueryTimeout = rpcQueryTimeout
+	config.CachedResultMaxAge = cachedResultMaxAge
 
 	service, err := provider.NewService(ctx, cctx, info.GetAddress(), session, bus, cclient, config)
 	if err != nil {
diff --git a/provider/config.go b/provider/config.go
index f31181dfab..ed412e6d97 100644
--- a/provider/config.go
+++ b/provider/config.go
@@ -29,6 +29,8 @@ type Config struct {
 	DeploymentIngressStaticHosts bool
 	DeploymentIngressDomain      string
 	ClusterSettings              map[interface{}]interface{}
+	RPCQueryTimeout              time.Duration
+	CachedResultMaxAge           time.Duration
 }
 
 func NewDefaultConfig() Config {
diff --git a/provider/gateway/rest/router.go b/provider/gateway/rest/router.go
index 98ddf0bcde..79e730d777 100644
--- a/provider/gateway/rest/router.go
+++ b/provider/gateway/rest/router.go
@@ -55,6 +55,7 @@ const (
 	// errors from private use starting
 	websocketInternalServerErrorCode = 4000
 	websocketLeaseNotFound           = 4001
+	manifestSubmitTimeout            = 120 * time.Second
 )
 
 type wsStreamConfig struct {
@@ -383,7 +384,7 @@ func validateHandler(log log.Logger, cl provider.ValidateClient) http.HandlerFun
 	}
 }
 
-func createManifestHandler(_ log.Logger, mclient pmanifest.Client) http.HandlerFunc {
+func createManifestHandler(log log.Logger, mclient pmanifest.Client) http.HandlerFunc {
 	return func(w http.ResponseWriter, req *http.Request) {
 		var mani manifest.Manifest
 		decoder := json.NewDecoder(req.Body)
@@ -396,7 +397,9 @@ func createManifestHandler(_ log.Logger, mclient pmanifest.Client) http.HandlerF
 			return
 		}
 
-		if err := mclient.Submit(req.Context(), requestDeploymentID(req), mani); err != nil {
+		subctx, cancel := context.WithTimeout(req.Context(), manifestSubmitTimeout)
+		defer cancel()
+		if err := mclient.Submit(subctx, requestDeploymentID(req), mani); err != nil {
 			if errors.Is(err, manifestValidation.ErrInvalidManifest) {
 				http.Error(w, err.Error(), http.StatusUnprocessableEntity)
 				return
 			}
@@ -405,6 +408,7 @@ func createManifestHandler(_ log.Logger, mclient pmanifest.Client) http.HandlerF
 				http.Error(w, err.Error(), http.StatusNotFound)
 				return
 			}
+			log.Error("manifest submit failed", "err", err)
 			http.Error(w, err.Error(), http.StatusInternalServerError)
 			return
 		}
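The gateway change above bounds each manifest submission with context.WithTimeout, so a stalled downstream Submit can no longer hold the HTTP handler open indefinitely. Below is a minimal, self-contained sketch of that pattern; the submit stand-in and the short durations are illustrative, and only the WithTimeout/defer cancel() shape and the deadline check reflect the diff.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// submit stands in for a downstream call such as pmanifest.Client.Submit;
// it honors cancellation via the supplied context.
func submit(ctx context.Context) error {
	select {
	case <-time.After(200 * time.Millisecond): // pretend the real work takes this long
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	// The handler uses a 120-second budget; a tiny one here keeps the demo fast.
	subctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	if err := submit(subctx); errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("submit timed out:", err)
	}
}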
diff --git a/provider/manifest/config.go b/provider/manifest/config.go
index 3c902decb8..b909aef812 100644
--- a/provider/manifest/config.go
+++ b/provider/manifest/config.go
@@ -5,4 +5,6 @@ import "time"
 type ServiceConfig struct {
 	HTTPServicesRequireAtLeastOneHost bool
 	ManifestTimeout                   time.Duration
+	RPCQueryTimeout                   time.Duration
+	CachedResultMaxAge                time.Duration
 }
- } - } - + m.clearFetched() m.maybeScheduleStop() case req := <-m.manifestch: @@ -180,9 +187,7 @@ loop: case version := <-m.updatech: m.log.Info("received version", "version", hex.EncodeToString(version)) m.versions = append(m.versions, version) - if m.data != nil { - m.data.Deployment.Version = version - } + m.clearFetched() case result := <-runch: runch = nil @@ -194,14 +199,17 @@ loop: break } - m.data = result.Value().(*dtypes.QueryDeploymentResponse) + fetchResult := result.Value().(manifestManagerFetchDataResult) + m.fetched = true + m.fetchedAt = time.Now() + m.data = fetchResult.deployment + m.localLeases = fetchResult.leases m.log.Info("data received", "version", hex.EncodeToString(m.data.Deployment.Version)) m.validateRequests() m.emitReceivedEvents() m.maybeScheduleStop() - } } @@ -222,7 +230,13 @@ loop: } func (m *manager) maybeFetchData(ctx context.Context, runch <-chan runner.Result) <-chan runner.Result { - if m.data == nil && runch == nil { + if runch != nil { + return runch + } + + expired := time.Since(m.fetchedAt) > m.config.CachedResultMaxAge + if !m.fetched || expired { + m.clearFetched() return m.fetchData(ctx) } return runch @@ -235,16 +249,65 @@ func (m *manager) fetchData(ctx context.Context) <-chan runner.Result { }) } -func (m *manager) doFetchData(ctx context.Context) (*dtypes.QueryDeploymentResponse, error) { - res, err := m.session.Client().Query().Deployment(ctx, &dtypes.QueryDeploymentRequest{ID: m.daddr}) +type manifestManagerFetchDataResult struct { + deployment dtypes.QueryDeploymentResponse + leases []event.LeaseWon +} + +func (m *manager) doFetchData(ctx context.Context) (manifestManagerFetchDataResult, error) { + subctx, cancel := context.WithTimeout(ctx, m.config.RPCQueryTimeout) + defer cancel() + deploymentResponse, err := m.session.Client().Query().Deployment(subctx, &dtypes.QueryDeploymentRequest{ID: m.daddr}) + if err != nil { + return manifestManagerFetchDataResult{}, err + } + + leasesResponse, err := m.session.Client().Query().Leases(subctx, &mtypes.QueryLeasesRequest{ + Filters: mtypes.LeaseFilters{ + Owner: m.daddr.Owner, + DSeq: m.daddr.DSeq, + GSeq: 0, + OSeq: 0, + Provider: m.session.Provider().GetOwner(), + State: mtypes.LeaseActive.String(), + }, + Pagination: nil, + }) + if err != nil { - return nil, err + return manifestManagerFetchDataResult{}, err } - return res, nil + + groups := make(map[uint32]dtypes.Group) + for _, g := range deploymentResponse.GetGroups() { + groups[g.ID().GSeq] = g + } + + leases := make([]event.LeaseWon, len(leasesResponse.Leases)) + for i, leaseEntry := range leasesResponse.Leases { + lease := leaseEntry.GetLease() + leaseID := lease.GetLeaseID() + groupForLease, foundGroup := groups[leaseID.GetGSeq()] + if !foundGroup { + return manifestManagerFetchDataResult{}, fmt.Errorf("%w: could not locate group %v ", errNoGroupForLease, leaseID) + } + ev := event.LeaseWon{ + LeaseID: leaseID, + Group: &groupForLease, + Price: lease.GetPrice(), + } + + leases[i] = ev + } + + return manifestManagerFetchDataResult{ + deployment: *deploymentResponse, + leases: leases, + }, nil } func (m *manager) maybeScheduleStop() bool { // nolint:golint,unparam - if len(m.leases) > 0 || len(m.manifests) > 0 { + if len(m.localLeases) > 0 || len(m.manifests) > 0 { if m.stoptimer != nil { m.log.Info("stopping stop timer") if m.stoptimer.Stop() { @@ -255,7 +318,7 @@ func (m *manager) maybeScheduleStop() bool { // nolint:golint,unparam return false } if m.stoptimer != nil { - const manifestLingerDuration = time.Minute * time.Duration(5) + 
m.log.Info("starting stop timer", "duration", manifestLingerDuration) m.stoptimer = time.NewTimer(manifestLingerDuration) } @@ -263,8 +326,8 @@ func (m *manager) maybeScheduleStop() bool { // nolint:golint,unparam } func (m *manager) fillAllRequests(response error) { - for _, reqch := range m.pendingRequests { - reqch <- response + for _, req := range m.pendingRequests { + req.ch <- response } m.pendingRequests = nil @@ -275,46 +338,53 @@ func (m *manager) fillAllRequests(response error) { } func (m *manager) emitReceivedEvents() { - if len(m.leases) == 0 { - m.log.Debug("emit received events skips due to no leases", "data", m.data, "leases", len(m.leases), "manifests", len(m.manifests)) - m.fillAllRequests(ErrNoLeaseForDeployment) + if !m.fetched || len(m.manifests) == 0 { + m.log.Debug("emit received events skipped", "data", m.data, "manifests", len(m.manifests)) return } - if m.data == nil || len(m.manifests) == 0 { - m.log.Debug("emit received events skipped", "data", m.data, "leases", len(m.leases), "manifests", len(m.manifests)) + + if len(m.localLeases) == 0 { + m.log.Debug("emit received events skips due to no leases", "data", m.data, "manifests", len(m.manifests)) + m.fillAllRequests(ErrNoLeaseForDeployment) return } - manifest := m.manifests[len(m.manifests)-1] - - m.log.Debug("publishing manifest received", "num-leases", len(m.leases)) - - for _, lease := range m.leases { + latestManifest := m.manifests[len(m.manifests)-1] + m.log.Debug("publishing manifest received", "num-leases", len(m.localLeases)) + copyOfData := new(dtypes.QueryDeploymentResponse) + *copyOfData = m.data + for _, lease := range m.localLeases { m.log.Debug("publishing manifest received for lease", "lease_id", lease.LeaseID) if err := m.bus.Publish(event.ManifestReceived{ LeaseID: lease.LeaseID, Group: lease.Group, - Manifest: manifest, - Deployment: m.data, + Manifest: latestManifest, + Deployment: copyOfData, }); err != nil { m.log.Error("publishing event", "err", err, "lease", lease.LeaseID) } } // A manifest has been published, satisfy all pending requests - for _, reqch := range m.pendingRequests { - reqch <- nil + for _, req := range m.pendingRequests { + req.ch <- nil } m.pendingRequests = nil } func (m *manager) validateRequests() { - if m.data == nil || len(m.requests) == 0 { + if !m.fetched || len(m.requests) == 0 { return } manifests := make([]*manifest.Manifest, 0) for _, req := range m.requests { + // If the request context is complete then skip processing it + select { + case <-req.ctx.Done(): + continue + default: + } if err := m.validateRequest(req); err != nil { m.log.Error("invalid manifest", "err", err) req.ch <- err @@ -322,10 +392,11 @@ func (m *manager) validateRequests() { } manifests = append(manifests, &req.value.Manifest) - // The manifest has been grabbed from the request but not published yet, store this response - m.pendingRequests = append(m.pendingRequests, req.ch) + // The manifest has been grabbed from the request but not published yet, store this response + m.pendingRequests = append(m.pendingRequests, req) + } - m.requests = nil + m.requests = nil // all requests processed at this time m.log.Debug("requests valid", "num-requests", len(manifests)) @@ -400,7 +471,8 @@ func (m *manager) validateRequest(req manifestRequest) error { } groupNames := make([]string, 0) - for _, lease := range m.leases { + + for _, lease := range m.localLeases { groupNames = append(groupNames, lease.Group.GroupSpec.Name) } // Check that hostnames are not in use diff --git 
diff --git a/provider/manifest/manager_test.go b/provider/manifest/manager_test.go
index 38e1a893e1..670788f951 100644
--- a/provider/manifest/manager_test.go
+++ b/provider/manifest/manager_test.go
@@ -2,8 +2,8 @@ package manifest
 
 import (
 	"context"
-	"errors"
 	clustertypes "github.com/ovrclk/akash/provider/cluster/types"
+	escrowtypes "github.com/ovrclk/akash/x/escrow/types"
 	"testing"
 	"time"
@@ -19,7 +19,7 @@ import (
 	"github.com/ovrclk/akash/testutil"
 	"github.com/ovrclk/akash/x/deployment/types"
 	dtypes "github.com/ovrclk/akash/x/deployment/types"
-	markettypes "github.com/ovrclk/akash/x/market/types"
+	mtypes "github.com/ovrclk/akash/x/market/types"
 	ptypes "github.com/ovrclk/akash/x/provider/types"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
@@ -33,8 +33,7 @@ type scaffold struct {
 	hostnames clustertypes.HostnameServiceClient
 }
 
-func serviceForManifestTest(t *testing.T, cfg ServiceConfig, mani sdl.SDL, did dtypes.DeploymentID) *scaffold {
-
+func serviceForManifestTest(t *testing.T, cfg ServiceConfig, mani sdl.SDL, did dtypes.DeploymentID, leases []mtypes.Lease, providerAddr string, delayQueryDeployment bool) *scaffold {
 	clientMock := &clientMocks.Client{}
 	queryMock := &clientMocks.QueryClient{}
@@ -57,14 +56,14 @@ func serviceForManifestTest(t *testing.T, cfg ServiceConfig, mani sdl.SDL, did d
 	dgroups, err := mani.DeploymentGroups()
 	require.NoError(t, err)
 	require.NotNil(t, dgroups)
-	for _, g := range dgroups {
+	for i, g := range dgroups {
 		groups = append(groups, dtypes.Group{
 			GroupID: dtypes.GroupID{
-				Owner: "",
-				DSeq:  0,
-				GSeq:  0,
+				Owner: did.GetOwner(),
+				DSeq:  did.DSeq,
+				GSeq:  uint32(i),
 			},
-			State:     0,
+			State:     dtypes.GroupOpen,
 			GroupSpec: *g,
 		})
 	}
@@ -78,7 +77,39 @@ func serviceForManifestTest(t *testing.T, cfg ServiceConfig, mani sdl.SDL, did d
 		},
 		Groups: groups,
 	}
-	queryMock.On("Deployment", mock.Anything, mock.Anything).Return(res, nil)
+
+	x := queryMock.On("Deployment", mock.Anything, mock.Anything)
+	if delayQueryDeployment {
+		x = x.After(time.Second * 2)
+	}
+	x.Return(res, nil)
+
+	leasesMock := make([]mtypes.QueryLeaseResponse, 0)
+	for _, lease := range leases {
+		leasesMock = append(leasesMock, mtypes.QueryLeaseResponse{
+			Lease: mtypes.Lease{
+				LeaseID:   lease.GetLeaseID(),
+				State:     lease.GetState(),
+				Price:     lease.GetPrice(),
+				CreatedAt: lease.GetCreatedAt(),
+			},
+			EscrowPayment: escrowtypes.Payment{}, // Ignored in this test
+		})
+	}
+	queryMock.On("Leases", mock.Anything, &mtypes.QueryLeasesRequest{
+		Filters: mtypes.LeaseFilters{
+			Owner:    did.GetOwner(),
+			DSeq:     did.GetDSeq(),
+			GSeq:     0,
+			OSeq:     0,
+			Provider: providerAddr,
+			State:    mtypes.LeaseActive.String(),
+		},
+		Pagination: nil,
+	}).Return(&mtypes.QueryLeasesResponse{
+		Leases:     leasesMock,
+		Pagination: nil,
+	}, nil)
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -88,14 +119,13 @@ func serviceForManifestTest(t *testing.T, cfg ServiceConfig, mani sdl.SDL, did d
 	log := testutil.Logger(t)
 	bus := pubsub.NewBus()
-	accAddr := testutil.AccAddress(t)
 
 	p := &ptypes.Provider{
-		Owner:      accAddr.String(),
+		Owner:      providerAddr,
 		HostURI:    "",
 		Attributes: nil,
 	}
 
-	queryMock.On("ActiveLeasesForProvider", p.Address()).Return([]markettypes.QueryLeaseResponse{}, nil)
+	queryMock.On("ActiveLeasesForProvider", p.Address()).Return([]mtypes.QueryLeaseResponse{}, nil)
 
 	serviceInterface, err := NewService(ctx, session.New(log, clientMock, p), bus, hostnames, cfg)
 	require.NoError(t, err)
@@ -111,19 +141,46 @@ func serviceForManifestTest(t *testing.T, cfg ServiceConfig, mani sdl.SDL, did d
 	}
 }
 
-func TestManagerReturnsNoLease(t *testing.T) {
-	s := serviceForManifestTest(t, ServiceConfig{}, nil, dtypes.DeploymentID{})
+func TestManagerReturnsWrongVersion(t *testing.T) {
+	sdl2A, err := sdl.ReadFile("../../x/deployment/testdata/deployment-v2-c2c.yaml")
+	require.NoError(t, err)
+
+	sdl2B, err := sdl.ReadFile("../../x/deployment/testdata/deployment-v2.yaml")
+	require.NoError(t, err)
+
+	did := testutil.DeploymentID(t)
+	s := serviceForManifestTest(t, ServiceConfig{}, sdl2B, did, nil, testutil.AccAddress(t).String(), false)
+
+	sdlManifest, err := sdl2A.Manifest()
+	require.NoError(t, err)
+
+	err = s.svc.Submit(context.Background(), did, sdlManifest)
+	require.Error(t, err)
+	require.ErrorIs(t, err, ErrManifestVersion)
+
+	s.cancel()
+
+	select {
+	case <-s.svc.lc.Done():
+
+	case <-time.After(10 * time.Second):
+		t.Fatal("timed out waiting for service shutdown")
+	}
+}
 
+func TestManagerReturnsNoLease(t *testing.T) {
 	sdl2, err := sdl.ReadFile("../../x/deployment/testdata/deployment-v2.yaml")
 	require.NoError(t, err)
 
+	did := testutil.DeploymentID(t)
+	s := serviceForManifestTest(t, ServiceConfig{}, sdl2, did, nil, testutil.AccAddress(t).String(), false)
+
 	sdlManifest, err := sdl2.Manifest()
 	require.NoError(t, err)
 
-	did := testutil.DeploymentID(t)
 	err = s.svc.Submit(context.Background(), did, sdlManifest)
 	require.Error(t, err)
-	require.True(t, errors.Is(ErrNoLeaseForDeployment, err))
+	require.ErrorIs(t, err, ErrNoLeaseForDeployment)
 
 	s.cancel()
@@ -135,15 +192,104 @@ func TestManagerReturnsNoLease(t *testing.T) {
 	}
 }
 
+func TestManagerHandlesTimeout(t *testing.T) {
+	sdl2, err := sdl.ReadFile("../../x/deployment/testdata/deployment-v2.yaml")
+	require.NoError(t, err)
+
+	sdlManifest, err := sdl2.Manifest()
+	require.NoError(t, err)
+
+	lid := testutil.LeaseID(t)
+	lid.GSeq = 0
+	did := lid.DeploymentID()
+
+	dgroups, err := sdl2.DeploymentGroups()
+	require.NoError(t, err)
+
+	// Tell the service that a lease has been won
+	dgroup := &dtypes.Group{
+		GroupID:   lid.GroupID(),
+		State:     0,
+		GroupSpec: *dgroups[0],
+	}
+
+	ev := event.LeaseWon{
+		LeaseID: lid,
+		Group:   dgroup,
+		Price: sdk.Coin{
+			Denom:  testutil.CoinDenom,
+			Amount: sdk.NewInt(111),
+		},
+	}
+
+	s := serviceForManifestTest(t, ServiceConfig{HTTPServicesRequireAtLeastOneHost: true}, sdl2, did, nil, lid.GetProvider(), true)
+	err = s.bus.Publish(ev)
+	require.NoError(t, err)
+	time.Sleep(time.Second) // Wait for publish to do its thing
+
+	testctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	defer cancel()
+	err = s.svc.Submit(testctx, did, sdlManifest)
+	require.ErrorIs(t, err, context.DeadlineExceeded)
+
+	s.cancel()
+
+	select {
+	case <-s.svc.lc.Done():
+
+	case <-time.After(20 * time.Second):
+		t.Fatal("timed out waiting for service shutdown")
+	}
+}
+
+func TestManagerHandlesMissingGroup(t *testing.T) {
+	sdl2, err := sdl.ReadFile("../../x/deployment/testdata/deployment-v2.yaml")
+	require.NoError(t, err)
+
+	sdlManifest, err := sdl2.Manifest()
+	require.NoError(t, err)
+
+	lid := testutil.LeaseID(t)
+	lid.GSeq = 99999
+	did := lid.DeploymentID()
+
+	version, err := sdl.ManifestVersion(sdlManifest)
+	require.NotNil(t, version)
+	require.NoError(t, err)
+
+	leases := []mtypes.Lease{{
+		LeaseID: lid,
+		State:   mtypes.LeaseActive,
+		Price: sdk.Coin{
+			Denom:  "uakt",
+			Amount: sdk.NewInt(111),
+		},
+		CreatedAt: 0,
+	}}
+	s := serviceForManifestTest(t, ServiceConfig{}, sdl2, did, leases, lid.GetProvider(), false)
+
+	err = s.svc.Submit(context.Background(), did, sdlManifest)
+	require.Error(t, err)
+	require.Regexp(t, `^group not found:.+$`, err.Error())
+
+	s.cancel()
+
+	select {
+	case <-s.svc.lc.Done():
+
+	case <-time.After(10 * time.Second):
+		t.Fatal("timed out waiting for service shutdown")
+	}
+}
+
 func TestManagerRequiresHostname(t *testing.T) {
 	sdl2, err := sdl.ReadFile("../../x/deployment/testdata/deployment-v2-nohost.yaml")
 	require.NoError(t, err)
 
 	sdlManifest, err := sdl2.Manifest()
-	require.Len(t, sdlManifest[0].Services[0].Expose[0].Hosts, 0)
 	require.NoError(t, err)
+	require.Len(t, sdlManifest[0].Services[0].Expose[0].Hosts, 0)
 
 	lid := testutil.LeaseID(t)
+	lid.GSeq = 0
 	did := lid.DeploymentID()
 	dgroups, err := sdl2.DeploymentGroups()
 	require.NoError(t, err)
@@ -166,7 +312,13 @@ func TestManagerRequiresHostname(t *testing.T) {
 	version, err := sdl.ManifestVersion(sdlManifest)
 	require.NotNil(t, version)
 	require.NoError(t, err)
-	s := serviceForManifestTest(t, ServiceConfig{HTTPServicesRequireAtLeastOneHost: true}, sdl2, did)
+	leases := []mtypes.Lease{{
+		LeaseID:   lid,
+		State:     mtypes.LeaseActive,
+		Price:     ev.Price,
+		CreatedAt: 0,
+	}}
+	s := serviceForManifestTest(t, ServiceConfig{HTTPServicesRequireAtLeastOneHost: true}, sdl2, did, leases, lid.GetProvider(), false)
 
 	err = s.bus.Publish(ev)
 	require.NoError(t, err)
@@ -195,6 +347,7 @@ func TestManagerAllowsUpdate(t *testing.T) {
 	require.NoError(t, err)
 
 	lid := testutil.LeaseID(t)
+	lid.GSeq = 0
 	did := lid.DeploymentID()
 	dgroups, err := sdl2.DeploymentGroups()
 	require.NoError(t, err)
@@ -217,7 +370,13 @@ func TestManagerAllowsUpdate(t *testing.T) {
 	version, err := sdl.ManifestVersion(sdlManifest)
 	require.NotNil(t, version)
 	require.NoError(t, err)
-	s := serviceForManifestTest(t, ServiceConfig{HTTPServicesRequireAtLeastOneHost: true}, sdl2, did)
+	leases := []mtypes.Lease{{
+		LeaseID:   lid,
+		State:     mtypes.LeaseActive,
+		Price:     ev.Price,
+		CreatedAt: 0,
+	}}
+	s := serviceForManifestTest(t, ServiceConfig{HTTPServicesRequireAtLeastOneHost: true}, sdl2, did, leases, lid.GetProvider(), false)
 
 	err = s.bus.Publish(ev)
 	require.NoError(t, err)
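The scaffold above leans on testify's mock package, and the delayQueryDeployment path uses Call.After so the mocked Deployment query outlasts the one-second context in TestManagerHandlesTimeout. A small sketch of that delayed-return technique; the Fetcher mock and the two-second delay are illustrative.

package main

import (
	"fmt"
	"time"

	"github.com/stretchr/testify/mock"
)

type mockFetcher struct {
	mock.Mock
}

func (m *mockFetcher) Fetch() string {
	args := m.Called()
	return args.String(0)
}

func main() {
	f := &mockFetcher{}
	// After delays the mocked return, simulating a slow RPC node.
	f.On("Fetch").After(2 * time.Second).Return("data")

	start := time.Now()
	fmt.Println(f.Fetch(), "returned after", time.Since(start).Round(time.Second))
}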
diff --git a/provider/manifest/service.go b/provider/manifest/service.go
index bc4d8f4c7c..2a5d132965 100644
--- a/provider/manifest/service.go
+++ b/provider/manifest/service.go
@@ -161,6 +161,8 @@ func (s *service) IsActive(ctx context.Context, dID dtypes.DeploymentID) (bool,
 
 // Send incoming manifest request.
 func (s *service) Submit(ctx context.Context, did dtypes.DeploymentID, mani manifest.Manifest) error {
+	// This needs to be buffered because the goroutine writing to this may get the result
+	// after the context has returned an error
 	ch := make(chan error, 1)
 	req := manifestRequest{
 		value: &submitRequest{
@@ -234,6 +236,9 @@ loop:
 
 		switch ev := ev.(type) {
 		case event.LeaseWon:
+			if ev.LeaseID.GetProvider() != s.session.Provider().Address().String() {
+				continue
+			}
 			s.session.Log().Info("lease won", "lease", ev.LeaseID)
 
 			s.handleLease(ev, true)
@@ -254,7 +259,7 @@ loop:
 			}
 
 		case mtypes.EventLeaseClosed:
-			if ev.ID.Provider != s.session.Provider().Address().String() {
+			if ev.ID.GetProvider() != s.session.Provider().Address().String() {
 				continue
 			}
diff --git a/provider/manifest/watchdog.go b/provider/manifest/watchdog.go
index 8133b490c6..83c94eed55 100644
--- a/provider/manifest/watchdog.go
+++ b/provider/manifest/watchdog.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"github.com/boz/go-lifecycle"
 	"github.com/ovrclk/akash/provider/session"
+	"github.com/ovrclk/akash/util/runner"
 	dtypes "github.com/ovrclk/akash/x/deployment/types"
 	"github.com/ovrclk/akash/x/market/types"
 	"github.com/tendermint/tendermint/libs/log"
@@ -52,20 +53,28 @@ func (wd *watchdog) stop() {
 func (wd *watchdog) run() {
 	defer wd.lc.ShutdownCompleted()
 
+	var runch <-chan runner.Result
+	var err error
+
 	wd.log.Debug("watchdog start")
 	select {
 	case <-time.After(wd.timeout):
 		// Close the bid, since if this point is reached then a manifest has not been received
-	case err := <-wd.lc.ShutdownRequest():
-		wd.lc.ShutdownInitiated(err)
-		return // Nothing to do
+		wd.log.Info("watchdog closing bid")
+
+		runch = runner.Do(func() runner.Result {
+			return runner.NewResult(nil, wd.sess.Client().Tx().Broadcast(wd.ctx, &types.MsgCloseBid{
+				BidID: types.MakeBidID(wd.leaseID.OrderID(), wd.sess.Provider().Address()),
+			}))
+		})
+	case err = <-wd.lc.ShutdownRequest():
 	}
 
-	wd.log.Info("watchdog closing bid")
-	err := wd.sess.Client().Tx().Broadcast(wd.ctx, &types.MsgCloseBid{
-		BidID: types.MakeBidID(wd.leaseID.OrderID(), wd.sess.Provider().Address()),
-	})
-	if err != nil {
-		wd.log.Error("failed closing bid", "err", err)
+	wd.lc.ShutdownInitiated(err)
+	if runch != nil {
+		result := <-runch
+		if err := result.Error(); err != nil {
+			wd.log.Error("failed closing bid", "err", err)
+		}
 	}
 }
diff --git a/provider/manifest/watchdog_test.go b/provider/manifest/watchdog_test.go
index 0f4c4e77b1..65c4ce52a7 100644
--- a/provider/manifest/watchdog_test.go
+++ b/provider/manifest/watchdog_test.go
@@ -72,6 +72,7 @@ func TestWatchdogStops(t *testing.T) {
 	wd, scaffold := makeWatchdogTestScaffold(t, 1*time.Minute)
 
 	wd.stop() // ask it to stop immediately
+	wd.stop() // ask it to stop a second time, this is expected usage
 
 	select {
 	case <-wd.lc.Done():
diff --git a/provider/service.go b/provider/service.go
index 3091829659..913dec2768 100644
--- a/provider/service.go
+++ b/provider/service.go
@@ -104,6 +104,8 @@ func NewService(ctx context.Context, cctx client.Context, accAddr sdk.AccAddress
 	manifestConfig := manifest.ServiceConfig{
 		HTTPServicesRequireAtLeastOneHost: !cfg.DeploymentIngressStaticHosts,
 		ManifestTimeout:                   cfg.ManifestTimeout,
+		RPCQueryTimeout:                   cfg.RPCQueryTimeout,
+		CachedResultMaxAge:                cfg.CachedResultMaxAge,
 	}
 
 	manifest, err := manifest.NewService(ctx, session, bus, cluster.HostnameService(), manifestConfig)
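The watchdog rework follows the same shape as the manager's fetch path: move the blocking MsgCloseBid broadcast off the shutdown path, acknowledge shutdown immediately, then collect the result. A plain-channel sketch of that shape, with broadcastCloseBid and the durations as stand-ins for the real transaction broadcast and util/runner plumbing:

package main

import (
	"errors"
	"fmt"
	"time"
)

// broadcastCloseBid stands in for the slow transaction broadcast.
func broadcastCloseBid() error {
	time.Sleep(100 * time.Millisecond)
	return errors.New("insufficient fees")
}

func main() {
	var runch <-chan error

	shutdown := make(chan error) // never fires in this demo

	select {
	case <-time.After(10 * time.Millisecond): // the manifest deadline elapsed
		// Run the broadcast in the background instead of blocking here.
		ch := make(chan error, 1)
		go func() { ch <- broadcastCloseBid() }()
		runch = ch
	case <-shutdown:
	}

	// Shutdown would be acknowledged here, before waiting on the broadcast.
	if runch != nil {
		if err := <-runch; err != nil {
			fmt.Println("failed closing bid:", err)
		}
	}
}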