diff --git a/.changeset/implement_support_for_pruning_using_rhp4.md b/.changeset/implement_support_for_pruning_using_rhp4.md new file mode 100644 index 000000000..2adfb8043 --- /dev/null +++ b/.changeset/implement_support_for_pruning_using_rhp4.md @@ -0,0 +1,9 @@ +--- +default: minor +--- + +# Implement support for pruning using RHP4 + +#1711 by @ChrisSchinnerl + +Closes https://github.com/SiaFoundation/renterd/issues/1676 diff --git a/bus/pruning.go b/bus/pruning.go new file mode 100644 index 000000000..bea56a2b6 --- /dev/null +++ b/bus/pruning.go @@ -0,0 +1,185 @@ +package bus + +import ( + "context" + "errors" + "fmt" + "time" + + rhpv4 "go.sia.tech/core/rhp/v4" + "go.sia.tech/core/types" + cRHP4 "go.sia.tech/coreutils/rhp/v4" + "go.sia.tech/renterd/api" + ibus "go.sia.tech/renterd/internal/bus" + "go.sia.tech/renterd/internal/gouging" + rhp2 "go.sia.tech/renterd/internal/rhp/v2" +) + +func (b *Bus) pruneContractV1(ctx context.Context, rk types.PrivateKey, cm api.ContractMetadata, hostIP string, gc gouging.Checker, pendingUploads map[types.Hash256]struct{}) (api.ContractPruneResponse, error) { + // prune contract + rev, spending, pruned, remaining, err := b.rhp2Client.PruneContract(ctx, rk, gc, hostIP, cm.HostKey, cm.ID, cm.RevisionNumber, func(fcid types.FileContractID, roots []types.Hash256) ([]uint64, error) { + indices, err := b.store.PrunableContractRoots(ctx, fcid, roots) + if err != nil { + return nil, err + } else if len(indices) > len(roots) { + return nil, fmt.Errorf("selected %d prunable roots but only %d were provided", len(indices), len(roots)) + } + + filtered := indices[:0] + for _, index := range indices { + _, ok := pendingUploads[roots[index]] + if !ok { + filtered = append(filtered, index) + } + } + indices = filtered + return indices, nil + }) + if err != nil && !errors.Is(err, rhp2.ErrNoSectorsToPrune) && !errors.Is(err, context.Canceled) { + return api.ContractPruneResponse{}, err + } + + // record spending + if !spending.Total().IsZero() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + b.store.RecordContractSpending(ctx, []api.ContractSpendingRecord{ + { + ContractSpending: spending, + ContractID: cm.ID, + RevisionNumber: rev.RevisionNumber, + Size: rev.Filesize, + + MissedHostPayout: rev.MissedHostPayout(), + ValidRenterPayout: rev.ValidRenterPayout(), + }, + }) + } + + var pruneErr string + if err != nil { + pruneErr = err.Error() + } + + return api.ContractPruneResponse{ + ContractSize: rev.Filesize, + Pruned: pruned, + Remaining: remaining, + Error: pruneErr, + }, nil +} + +func (b *Bus) pruneContractV2(ctx context.Context, rk types.PrivateKey, cm api.ContractMetadata, hostIP string, gc gouging.Checker, pendingUploads map[types.Hash256]struct{}) (api.ContractPruneResponse, error) { + signer := ibus.NewFormContractSigner(b.w, rk) + + // get latest revision + rev, err := b.rhp4Client.LatestRevision(ctx, cm.HostKey, hostIP, cm.ID) + if err != nil { + return api.ContractPruneResponse{}, fmt.Errorf("failed to fetch revision for pruning: %w", err) + } else if rev.RevisionNumber < cm.RevisionNumber { + return api.ContractPruneResponse{}, fmt.Errorf("latest known revision %d is less than contract revision %d", rev.RevisionNumber, cm.RevisionNumber) + } + + // get prices + settings, err := b.rhp4Client.Settings(ctx, cm.HostKey, hostIP) + if err != nil { + return api.ContractPruneResponse{}, fmt.Errorf("failed to fetch prices for pruning: %w", err) + } + prices := settings.Prices + + // make sure they are sane + if gb := gc.CheckV2(settings); gb.Gouging() { + return api.ContractPruneResponse{}, fmt.Errorf("host for pruning is gouging: %v", gb.String()) + } + + // fetch all contract roots + numsectors := rev.Filesize / rhpv4.SectorSize + sectorRoots := make([]types.Hash256, 0, numsectors) + var rootsUsage rhpv4.Usage + for offset := uint64(0); offset < numsectors; { + // calculate the batch size + length := uint64(rhpv4.MaxSectorBatchSize) + if offset+length > numsectors { + length = numsectors - offset + } + + // fetch the batch + res, err := b.rhp4Client.SectorRoots(ctx, cm.HostKey, hostIP, b.cm.TipState(), prices, signer, cRHP4.ContractRevision{ + ID: cm.ID, + Revision: rev, + }, offset, length) + if err != nil { + return api.ContractPruneResponse{}, err + } + + // update revision since it was revised + rev = res.Revision + + // collect roots + sectorRoots = append(sectorRoots, res.Roots...) + offset += uint64(len(res.Roots)) + + // update the cost + rootsUsage = rootsUsage.Add(res.Usage) + } + + // fetch indices to prune + indices, err := b.store.PrunableContractRoots(ctx, cm.ID, sectorRoots) + if err != nil { + return api.ContractPruneResponse{}, err + } + + // avoid pruning pending uploads + toPrune := indices[:0] + for _, index := range indices { + _, ok := pendingUploads[sectorRoots[index]] + if !ok { + toPrune = append(toPrune, index) + } + } + totalToPrune := uint64(len(toPrune)) + + // cap at max batch size + batchSize := rhpv4.MaxSectorBatchSize + if batchSize > len(toPrune) { + batchSize = len(toPrune) + } + toPrune = toPrune[:batchSize] + + // prune the batch + res, err := b.rhp4Client.FreeSectors(ctx, cm.HostKey, hostIP, b.cm.TipState(), prices, rk, cRHP4.ContractRevision{ + ID: cm.ID, + Revision: rev, + }, toPrune) + if err != nil { + return api.ContractPruneResponse{}, err + } + deleteUsage := res.Usage + rev = res.Revision // update rev + + // record spending + if !rootsUsage.Add(deleteUsage).RenterCost().IsZero() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + b.store.RecordContractSpending(ctx, []api.ContractSpendingRecord{ + { + ContractSpending: api.ContractSpending{ + Deletions: deleteUsage.RenterCost(), + SectorRoots: rootsUsage.RenterCost(), + }, + ContractID: cm.ID, + RevisionNumber: rev.RevisionNumber, + Size: rev.Filesize, + + MissedHostPayout: rev.MissedHostOutput().Value, + ValidRenterPayout: rev.RenterOutput.Value, + }, + }) + } + + return api.ContractPruneResponse{ + ContractSize: rev.Filesize, + Pruned: uint64(len(toPrune) * rhpv4.SectorSize), + Remaining: (totalToPrune - uint64(len(toPrune))) * rhpv4.SectorSize, + }, nil +} diff --git a/bus/routes.go b/bus/routes.go index 70fecfc6b..be6ba00ba 100644 --- a/bus/routes.go +++ b/bus/routes.go @@ -25,7 +25,6 @@ import ( "go.sia.tech/renterd/stores/sql" "go.sia.tech/renterd/internal/gouging" - rhp2 "go.sia.tech/renterd/internal/rhp/v2" "go.sia.tech/core/gateway" "go.sia.tech/core/types" @@ -926,9 +925,9 @@ func (b *Bus) contractPruneHandlerPOST(jc jape.Context) { return } - // fetch the host from the bus - host, err := b.store.Host(jc.Request.Context(), c.HostKey) - if jc.Check("failed to fetch host for contract", err) != nil { + // fetch the corresponding host + host, err := b.store.Host(ctx, c.HostKey) + if jc.Check("failed to fetch host for pruning", err) != nil { return } @@ -940,56 +939,14 @@ func (b *Bus) contractPruneHandlerPOST(jc jape.Context) { // prune the contract rk := b.masterKey.DeriveContractKey(c.HostKey) - rev, spending, pruned, remaining, err := b.rhp2Client.PruneContract(pruneCtx, rk, gc, host.NetAddress, c.HostKey, fcid, c.RevisionNumber, func(fcid types.FileContractID, roots []types.Hash256) ([]uint64, error) { - indices, err := b.store.PrunableContractRoots(ctx, fcid, roots) - if err != nil { - return nil, err - } else if len(indices) > len(roots) { - return nil, fmt.Errorf("selected %d prunable roots but only %d were provided", len(indices), len(roots)) - } - - filtered := indices[:0] - for _, index := range indices { - _, ok := pending[roots[index]] - if !ok { - filtered = append(filtered, index) - } - } - indices = filtered - return indices, nil - }) - - if errors.Is(err, rhp2.ErrNoSectorsToPrune) { - err = nil // ignore error - } else if !errors.Is(err, context.Canceled) { - if jc.Check("couldn't prune contract", err) != nil { - return - } - } - - // record spending - if !spending.Total().IsZero() { - b.store.RecordContractSpending(jc.Request.Context(), []api.ContractSpendingRecord{ - { - ContractSpending: spending, - ContractID: fcid, - RevisionNumber: rev.RevisionNumber, - Size: rev.Filesize, - - MissedHostPayout: rev.MissedHostPayout(), - ValidRenterPayout: rev.ValidRenterPayout(), - }, - }) - } - - // return response - res := api.ContractPruneResponse{ - ContractSize: rev.Filesize, - Pruned: pruned, - Remaining: remaining, + var res api.ContractPruneResponse + if b.isPassedV2AllowHeight() { + res, err = b.pruneContractV2(pruneCtx, rk, c, host.V2SiamuxAddr(), gc, pending) + } else { + res, err = b.pruneContractV1(pruneCtx, rk, c, host.NetAddress, gc, pending) } - if err != nil { - res.Error = err.Error() + if jc.Check("failed to prune contract", err) != nil { + return } jc.Encode(res) } diff --git a/internal/rhp/v4/rhp.go b/internal/rhp/v4/rhp.go index 51bfde95e..f641ca90a 100644 --- a/internal/rhp/v4/rhp.go +++ b/internal/rhp/v4/rhp.go @@ -84,8 +84,12 @@ func (c *Client) VerifySector(ctx context.Context, prices rhp4.HostPrices, token } // FreeSectors removes sectors from a contract. -func (c *Client) FreeSectors(ctx context.Context, cs consensus.State, prices rhp4.HostPrices, sk types.PrivateKey, contract rhp.ContractRevision, indices []uint64) (rhp.RPCFreeSectorsResult, error) { - panic("not implemented") +func (c *Client) FreeSectors(ctx context.Context, hk types.PublicKey, hostIP string, cs consensus.State, prices rhp4.HostPrices, sk types.PrivateKey, contract rhp.ContractRevision, indices []uint64) (res rhp.RPCFreeSectorsResult, _ error) { + err := c.tpool.withTransport(ctx, hk, hostIP, func(t rhp.TransportClient) (err error) { + res, err = rhp.RPCFreeSectors(ctx, t, cs, prices, sk, contract, indices) + return + }) + return res, err } // AppendSectors appends sectors a host is storing to a contract. @@ -123,8 +127,12 @@ func (c *Client) LatestRevision(ctx context.Context, hk types.PublicKey, addr st } // SectorRoots returns the sector roots for a contract. -func (c *Client) SectorRoots(ctx context.Context, cs consensus.State, prices rhp4.HostPrices, signer rhp.ContractSigner, contract rhp.ContractRevision, offset, length uint64) (rhp.RPCSectorRootsResult, error) { - panic("not implemented") +func (c *Client) SectorRoots(ctx context.Context, hk types.PublicKey, addr string, cs consensus.State, prices rhp4.HostPrices, signer rhp.ContractSigner, contract rhp.ContractRevision, offset, length uint64) (res rhp.RPCSectorRootsResult, _ error) { + err := c.tpool.withTransport(ctx, hk, addr, func(c rhp.TransportClient) (err error) { + res, err = rhp.RPCSectorRoots(ctx, c, cs, prices, signer, contract, offset, length) + return err + }) + return res, err } // AccountBalance returns the balance of an account. diff --git a/internal/test/e2e/cluster.go b/internal/test/e2e/cluster.go index 0cc153492..dcd210c83 100644 --- a/internal/test/e2e/cluster.go +++ b/internal/test/e2e/cluster.go @@ -115,12 +115,7 @@ func (tc *TestCluster) ContractRoots(ctx context.Context, fcid types.FileContrac if h == nil { return nil, fmt.Errorf("no host found for contract %v", c) } - - roots, err := h.store.SectorRoots() - if err != nil { - return nil, err - } - return roots[c.ID], nil + return h.contracts.SectorRoots(fcid), nil } func (tc *TestCluster) ShutdownAutopilot(ctx context.Context) { diff --git a/internal/test/e2e/pruning_test.go b/internal/test/e2e/pruning_test.go index 6b7798a94..53e740de6 100644 --- a/internal/test/e2e/pruning_test.go +++ b/internal/test/e2e/pruning_test.go @@ -133,7 +133,7 @@ func TestSectorPruning(t *testing.T) { cRoots, err := cluster.ContractRoots(context.Background(), c.ID) tt.OK(err) if len(dbRoots) != len(cRoots) { - t.Fatal("unexpected number of roots", dbRoots, cRoots) + t.Fatal("unexpected number of roots", len(dbRoots), len(cRoots)) } for _, root := range dbRoots { if !hasRoot(cRoots, root) { @@ -187,7 +187,7 @@ func TestSectorPruning(t *testing.T) { res, err = b.PrunableData(context.Background()) tt.OK(err) if res.TotalPrunable != 0 { - t.Fatalf("unexpected prunable data: %d", n) + t.Fatalf("unexpected no prunable data: %d", n) } // assert spending was updated