Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement support for pruning using RHP4 #1711

Merged
merged 4 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/implement_support_for_pruning_using_rhp4.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
default: minor
---

# Implement support for pruning using RHP4

#1711 by @ChrisSchinnerl

Closes https://github.com/SiaFoundation/renterd/issues/1676
185 changes: 185 additions & 0 deletions bus/pruning.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package bus

import (
"context"
"errors"
"fmt"
"time"

rhpv4 "go.sia.tech/core/rhp/v4"
"go.sia.tech/core/types"
cRHP4 "go.sia.tech/coreutils/rhp/v4"
"go.sia.tech/renterd/api"
ibus "go.sia.tech/renterd/internal/bus"
"go.sia.tech/renterd/internal/gouging"
rhp2 "go.sia.tech/renterd/internal/rhp/v2"
)

func (b *Bus) pruneContractV1(ctx context.Context, rk types.PrivateKey, cm api.ContractMetadata, hostIP string, gc gouging.Checker, pendingUploads map[types.Hash256]struct{}) (api.ContractPruneResponse, error) {
// prune contract
rev, spending, pruned, remaining, err := b.rhp2Client.PruneContract(ctx, rk, gc, hostIP, cm.HostKey, cm.ID, cm.RevisionNumber, func(fcid types.FileContractID, roots []types.Hash256) ([]uint64, error) {
indices, err := b.store.PrunableContractRoots(ctx, fcid, roots)
if err != nil {
return nil, err
} else if len(indices) > len(roots) {
return nil, fmt.Errorf("selected %d prunable roots but only %d were provided", len(indices), len(roots))
}

filtered := indices[:0]
for _, index := range indices {
_, ok := pendingUploads[roots[index]]
if !ok {
filtered = append(filtered, index)
}
}
indices = filtered
return indices, nil
})
if err != nil && !errors.Is(err, rhp2.ErrNoSectorsToPrune) && !errors.Is(err, context.Canceled) {
return api.ContractPruneResponse{}, err
}

// record spending
if !spending.Total().IsZero() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
b.store.RecordContractSpending(ctx, []api.ContractSpendingRecord{
{
ContractSpending: spending,
ContractID: cm.ID,
RevisionNumber: rev.RevisionNumber,
Size: rev.Filesize,

MissedHostPayout: rev.MissedHostPayout(),
ValidRenterPayout: rev.ValidRenterPayout(),
},
})
}

var pruneErr string
if err != nil {
pruneErr = err.Error()
}

return api.ContractPruneResponse{
ContractSize: rev.Filesize,
Pruned: pruned,
Remaining: remaining,
Error: pruneErr,
}, nil
}

func (b *Bus) pruneContractV2(ctx context.Context, rk types.PrivateKey, cm api.ContractMetadata, hostIP string, gc gouging.Checker, pendingUploads map[types.Hash256]struct{}) (api.ContractPruneResponse, error) {
signer := ibus.NewFormContractSigner(b.w, rk)

// get latest revision
rev, err := b.rhp4Client.LatestRevision(ctx, cm.HostKey, hostIP, cm.ID)
if err != nil {
return api.ContractPruneResponse{}, fmt.Errorf("failed to fetch revision for pruning: %w", err)
} else if rev.RevisionNumber < cm.RevisionNumber {
return api.ContractPruneResponse{}, fmt.Errorf("latest known revision %d is less than contract revision %d", rev.RevisionNumber, cm.RevisionNumber)
}

// get prices
settings, err := b.rhp4Client.Settings(ctx, cm.HostKey, hostIP)
if err != nil {
return api.ContractPruneResponse{}, fmt.Errorf("failed to fetch prices for pruning: %w", err)
}
prices := settings.Prices

// make sure they are sane
if gb := gc.CheckV2(settings); gb.Gouging() {
return api.ContractPruneResponse{}, fmt.Errorf("host for pruning is gouging: %v", gb.String())
}

// fetch all contract roots
numsectors := rev.Filesize / rhpv4.SectorSize
sectorRoots := make([]types.Hash256, 0, numsectors)
var rootsUsage rhpv4.Usage
for offset := uint64(0); offset < numsectors; {
// calculate the batch size
length := uint64(rhpv4.MaxSectorBatchSize)
if offset+length > numsectors {
length = numsectors - offset
}

// fetch the batch
res, err := b.rhp4Client.SectorRoots(ctx, cm.HostKey, hostIP, b.cm.TipState(), prices, signer, cRHP4.ContractRevision{
ID: cm.ID,
Revision: rev,
}, offset, length)
if err != nil {
return api.ContractPruneResponse{}, err
}

// update revision since it was revised
rev = res.Revision

// collect roots
sectorRoots = append(sectorRoots, res.Roots...)
offset += uint64(len(res.Roots))

// update the cost
rootsUsage = rootsUsage.Add(res.Usage)
}

// fetch indices to prune
indices, err := b.store.PrunableContractRoots(ctx, cm.ID, sectorRoots)
if err != nil {
return api.ContractPruneResponse{}, err
}

// avoid pruning pending uploads
toPrune := indices[:0]
for _, index := range indices {
_, ok := pendingUploads[sectorRoots[index]]
if !ok {
toPrune = append(toPrune, index)
}
}
totalToPrune := uint64(len(toPrune))

// cap at max batch size
batchSize := rhpv4.MaxSectorBatchSize
if batchSize > len(toPrune) {
batchSize = len(toPrune)
}
toPrune = toPrune[:batchSize]

// prune the batch
res, err := b.rhp4Client.FreeSectors(ctx, cm.HostKey, hostIP, b.cm.TipState(), prices, rk, cRHP4.ContractRevision{
ID: cm.ID,
Revision: rev,
}, toPrune)
if err != nil {
return api.ContractPruneResponse{}, err
}
deleteUsage := res.Usage
rev = res.Revision // update rev

// record spending
if !rootsUsage.Add(deleteUsage).RenterCost().IsZero() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
b.store.RecordContractSpending(ctx, []api.ContractSpendingRecord{
{
ContractSpending: api.ContractSpending{
Deletions: deleteUsage.RenterCost(),
SectorRoots: rootsUsage.RenterCost(),
},
ContractID: cm.ID,
RevisionNumber: rev.RevisionNumber,
Size: rev.Filesize,

MissedHostPayout: rev.MissedHostOutput().Value,
ValidRenterPayout: rev.RenterOutput.Value,
},
})
}

return api.ContractPruneResponse{
ContractSize: rev.Filesize,
Pruned: uint64(len(toPrune) * rhpv4.SectorSize),
Remaining: (totalToPrune - uint64(len(toPrune))) * rhpv4.SectorSize,
}, nil
}
63 changes: 10 additions & 53 deletions bus/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"go.sia.tech/renterd/stores/sql"

"go.sia.tech/renterd/internal/gouging"
rhp2 "go.sia.tech/renterd/internal/rhp/v2"

"go.sia.tech/core/gateway"
"go.sia.tech/core/types"
Expand Down Expand Up @@ -926,9 +925,9 @@ func (b *Bus) contractPruneHandlerPOST(jc jape.Context) {
return
}

// fetch the host from the bus
host, err := b.store.Host(jc.Request.Context(), c.HostKey)
if jc.Check("failed to fetch host for contract", err) != nil {
// fetch the corresponding host
host, err := b.store.Host(ctx, c.HostKey)
if jc.Check("failed to fetch host for pruning", err) != nil {
return
}

Expand All @@ -940,56 +939,14 @@ func (b *Bus) contractPruneHandlerPOST(jc jape.Context) {

// prune the contract
rk := b.masterKey.DeriveContractKey(c.HostKey)
rev, spending, pruned, remaining, err := b.rhp2Client.PruneContract(pruneCtx, rk, gc, host.NetAddress, c.HostKey, fcid, c.RevisionNumber, func(fcid types.FileContractID, roots []types.Hash256) ([]uint64, error) {
indices, err := b.store.PrunableContractRoots(ctx, fcid, roots)
if err != nil {
return nil, err
} else if len(indices) > len(roots) {
return nil, fmt.Errorf("selected %d prunable roots but only %d were provided", len(indices), len(roots))
}

filtered := indices[:0]
for _, index := range indices {
_, ok := pending[roots[index]]
if !ok {
filtered = append(filtered, index)
}
}
indices = filtered
return indices, nil
})

if errors.Is(err, rhp2.ErrNoSectorsToPrune) {
err = nil // ignore error
} else if !errors.Is(err, context.Canceled) {
if jc.Check("couldn't prune contract", err) != nil {
return
}
}

// record spending
if !spending.Total().IsZero() {
b.store.RecordContractSpending(jc.Request.Context(), []api.ContractSpendingRecord{
{
ContractSpending: spending,
ContractID: fcid,
RevisionNumber: rev.RevisionNumber,
Size: rev.Filesize,

MissedHostPayout: rev.MissedHostPayout(),
ValidRenterPayout: rev.ValidRenterPayout(),
},
})
}

// return response
res := api.ContractPruneResponse{
ContractSize: rev.Filesize,
Pruned: pruned,
Remaining: remaining,
var res api.ContractPruneResponse
if b.isPassedV2AllowHeight() {
res, err = b.pruneContractV2(pruneCtx, rk, c, host.V2SiamuxAddr(), gc, pending)
} else {
res, err = b.pruneContractV1(pruneCtx, rk, c, host.NetAddress, gc, pending)
}
if err != nil {
res.Error = err.Error()
if jc.Check("failed to prune contract", err) != nil {
return
}
jc.Encode(res)
}
Expand Down
16 changes: 12 additions & 4 deletions internal/rhp/v4/rhp.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,12 @@ func (c *Client) VerifySector(ctx context.Context, prices rhp4.HostPrices, token
}

// FreeSectors removes sectors from a contract.
func (c *Client) FreeSectors(ctx context.Context, cs consensus.State, prices rhp4.HostPrices, sk types.PrivateKey, contract rhp.ContractRevision, indices []uint64) (rhp.RPCFreeSectorsResult, error) {
panic("not implemented")
func (c *Client) FreeSectors(ctx context.Context, hk types.PublicKey, hostIP string, cs consensus.State, prices rhp4.HostPrices, sk types.PrivateKey, contract rhp.ContractRevision, indices []uint64) (res rhp.RPCFreeSectorsResult, _ error) {
err := c.tpool.withTransport(ctx, hk, hostIP, func(t rhp.TransportClient) (err error) {
res, err = rhp.RPCFreeSectors(ctx, t, cs, prices, sk, contract, indices)
return
})
return res, err
}

// AppendSectors appends sectors a host is storing to a contract.
Expand Down Expand Up @@ -123,8 +127,12 @@ func (c *Client) LatestRevision(ctx context.Context, hk types.PublicKey, addr st
}

// SectorRoots returns the sector roots for a contract.
func (c *Client) SectorRoots(ctx context.Context, cs consensus.State, prices rhp4.HostPrices, signer rhp.ContractSigner, contract rhp.ContractRevision, offset, length uint64) (rhp.RPCSectorRootsResult, error) {
panic("not implemented")
func (c *Client) SectorRoots(ctx context.Context, hk types.PublicKey, addr string, cs consensus.State, prices rhp4.HostPrices, signer rhp.ContractSigner, contract rhp.ContractRevision, offset, length uint64) (res rhp.RPCSectorRootsResult, _ error) {
err := c.tpool.withTransport(ctx, hk, addr, func(c rhp.TransportClient) (err error) {
res, err = rhp.RPCSectorRoots(ctx, c, cs, prices, signer, contract, offset, length)
return err
})
return res, err
}

// AccountBalance returns the balance of an account.
Expand Down
7 changes: 1 addition & 6 deletions internal/test/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,7 @@ func (tc *TestCluster) ContractRoots(ctx context.Context, fcid types.FileContrac
if h == nil {
return nil, fmt.Errorf("no host found for contract %v", c)
}

roots, err := h.store.SectorRoots()
if err != nil {
return nil, err
}
return roots[c.ID], nil
return h.contracts.SectorRoots(fcid), nil
}

func (tc *TestCluster) ShutdownAutopilot(ctx context.Context) {
Expand Down
4 changes: 2 additions & 2 deletions internal/test/e2e/pruning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestSectorPruning(t *testing.T) {
cRoots, err := cluster.ContractRoots(context.Background(), c.ID)
tt.OK(err)
if len(dbRoots) != len(cRoots) {
t.Fatal("unexpected number of roots", dbRoots, cRoots)
t.Fatal("unexpected number of roots", len(dbRoots), len(cRoots))
}
for _, root := range dbRoots {
if !hasRoot(cRoots, root) {
Expand Down Expand Up @@ -187,7 +187,7 @@ func TestSectorPruning(t *testing.T) {
res, err = b.PrunableData(context.Background())
tt.OK(err)
if res.TotalPrunable != 0 {
t.Fatalf("unexpected prunable data: %d", n)
t.Fatalf("unexpected no prunable data: %d", n)
}

// assert spending was updated
Expand Down
Loading