Skip to content

Commit

Permalink
bus: implement rhp4 pruning
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed Dec 3, 2024
1 parent ce4b79d commit 4a1e13c
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 52 deletions.
184 changes: 184 additions & 0 deletions bus/pruning.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package bus

import (
"context"
"errors"
"fmt"
"time"

rhpv4 "go.sia.tech/core/rhp/v4"
"go.sia.tech/core/types"
cRHP4 "go.sia.tech/coreutils/rhp/v4"
"go.sia.tech/renterd/api"
ibus "go.sia.tech/renterd/internal/bus"
"go.sia.tech/renterd/internal/gouging"
rhp2 "go.sia.tech/renterd/internal/rhp/v2"
)

func (b *Bus) pruneContractV1(ctx context.Context, rk types.PrivateKey, cm api.ContractMetadata, gc gouging.Checker, pendingUploads map[types.Hash256]struct{}) (api.ContractPruneResponse, error) {
// prune contract
rev, spending, pruned, remaining, err := b.rhp2Client.PruneContract(ctx, rk, gc, cm.HostIP, cm.HostKey, cm.ID, cm.RevisionNumber, func(fcid types.FileContractID, roots []types.Hash256) ([]uint64, error) {
indices, err := b.store.PrunableContractRoots(ctx, fcid, roots)
if err != nil {
return nil, err
} else if len(indices) > len(roots) {
return nil, fmt.Errorf("selected %d prunable roots but only %d were provided", len(indices), len(roots))
}

filtered := indices[:0]
for _, index := range indices {
_, ok := pendingUploads[roots[index]]
if !ok {
filtered = append(filtered, index)
}
}
indices = filtered
return indices, nil
})
if err != nil && !errors.Is(err, rhp2.ErrNoSectorsToPrune) && !errors.Is(err, context.Canceled) {
return api.ContractPruneResponse{}, err
}

// record spending
if !spending.Total().IsZero() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
b.store.RecordContractSpending(ctx, []api.ContractSpendingRecord{
{
ContractSpending: spending,
ContractID: cm.ID,
RevisionNumber: rev.RevisionNumber,
Size: rev.Filesize,

MissedHostPayout: rev.MissedHostPayout(),
ValidRenterPayout: rev.ValidRenterPayout(),
},
})
}

var pruneErr string
if err != nil {
pruneErr = err.Error()
}

return api.ContractPruneResponse{
ContractSize: rev.Filesize,
Pruned: pruned,
Remaining: remaining,
Error: pruneErr,
}, nil
}

func (b *Bus) pruneContractV2(ctx context.Context, rk types.PrivateKey, cm api.ContractMetadata, hostIP string, gc gouging.Checker, pendingUploads map[types.Hash256]struct{}) (api.ContractPruneResponse, error) {
signer := ibus.NewFormContractSigner(b.w, rk)

// get latest revision
rev, err := b.rhp4Client.LatestRevision(ctx, cm.HostKey, hostIP, cm.ID)
if err != nil {
return api.ContractPruneResponse{}, fmt.Errorf("failed to fetch revision for pruning: %w", err)
} else if rev.RevisionNumber < cm.RevisionNumber {
return api.ContractPruneResponse{}, fmt.Errorf("latest known revision %d is less than contract revision %d", rev.RevisionNumber, cm.RevisionNumber)
}

// get prices
settings, err := b.rhp4Client.Settings(ctx, cm.HostKey, hostIP)
if err != nil {
return api.ContractPruneResponse{}, fmt.Errorf("failed to fetch prices for pruning: %w", err)
}
prices := settings.Prices

// make sure they are sane
if gb := gc.CheckV2(settings); gb.Gouging() {
return api.ContractPruneResponse{}, fmt.Errorf("host for pruning is gouging: %v", gb.String())
}

// fetch all contract roots
numsectors := rev.Filesize / rhpv4.SectorSize
sectorRoots := make([]types.Hash256, 0, numsectors)
var rootsUsage rhpv4.Usage
for offset := uint64(0); offset < numsectors; {
// calculate the batch size
length := uint64(100) // TODO: rhpv4.MaxSectorBatchSize
if offset+length > numsectors {
length = numsectors - offset
}

// fetch the batch
res, err := b.rhp4Client.SectorRoots(ctx, cm.HostKey, hostIP, b.cm.TipState(), prices, signer, cRHP4.ContractRevision{
ID: cm.ID,
Revision: rev,
}, offset, length)
if err != nil {
return api.ContractPruneResponse{}, err
}

// update revision since it was revised
rev = res.Revision

// collect roots
sectorRoots = append(sectorRoots, res.Roots...)
offset += uint64(len(res.Roots))

// update the cost
rootsUsage = rootsUsage.Add(res.Usage)
}

// fetch indices to prune
indices, err := b.store.PrunableContractRoots(ctx, cm.ID, sectorRoots)
if err != nil {
return api.ContractPruneResponse{}, err
}

// avoid pruning pending uploads
toPrune := indices[:0]
for _, index := range indices {
_, ok := pendingUploads[sectorRoots[index]]
if !ok {
toPrune = append(toPrune, index)
}
}
totalToPrune := uint64(len(toPrune))

// cap at max batch size
batchSize := uint64(100) // TODO: rhpv4.MaxSectorBatchSize
if batchSize > uint64(len(toPrune)) {
batchSize = uint64(len(toPrune))
}
toPrune = toPrune[:batchSize]

// prune the batch
res, err := b.rhp4Client.FreeSectors(ctx, b.cm.TipState(), prices, rk, cRHP4.ContractRevision{
ID: cm.ID,
Revision: rev,
}, toPrune)
if err != nil {
return api.ContractPruneResponse{}, err
}
deleteUsage := res.Usage

// record spending
if !rootsUsage.Add(deleteUsage).RenterCost().IsZero() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
b.store.RecordContractSpending(ctx, []api.ContractSpendingRecord{
{
ContractSpending: api.ContractSpending{
Deletions: deleteUsage.RenterCost(),
SectorRoots: rootsUsage.RenterCost(),
},
ContractID: cm.ID,
RevisionNumber: rev.RevisionNumber,
Size: rev.Filesize,

MissedHostPayout: rev.MissedHostOutput().Value,
ValidRenterPayout: rev.RenterOutput.Value,
},
})
}

return api.ContractPruneResponse{
ContractSize: rev.Filesize,
Pruned: uint64(len(toPrune) * rhpv4.SectorSize),
Remaining: (totalToPrune - uint64(len(toPrune))) * rhpv4.SectorSize,
}, nil
}
63 changes: 13 additions & 50 deletions bus/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"go.sia.tech/renterd/stores/sql"

"go.sia.tech/renterd/internal/gouging"
rhp2 "go.sia.tech/renterd/internal/rhp/v2"

"go.sia.tech/core/gateway"
"go.sia.tech/core/types"
Expand Down Expand Up @@ -963,6 +962,12 @@ func (b *Bus) contractPruneHandlerPOST(jc jape.Context) {
return
}

// fetch the corresponding host
host, err := b.store.Host(ctx, c.HostKey)
if jc.Check("failed to fetch host for pruning", err) != nil {
return
}

// build map of uploading sectors
pending := make(map[types.Hash256]struct{})
for _, root := range b.sectors.Sectors() {
Expand All @@ -971,56 +976,14 @@ func (b *Bus) contractPruneHandlerPOST(jc jape.Context) {

// prune the contract
rk := b.masterKey.DeriveContractKey(c.HostKey)
rev, spending, pruned, remaining, err := b.rhp2Client.PruneContract(pruneCtx, rk, gc, c.HostIP, c.HostKey, fcid, c.RevisionNumber, func(fcid types.FileContractID, roots []types.Hash256) ([]uint64, error) {
indices, err := b.store.PrunableContractRoots(ctx, fcid, roots)
if err != nil {
return nil, err
} else if len(indices) > len(roots) {
return nil, fmt.Errorf("selected %d prunable roots but only %d were provided", len(indices), len(roots))
}

filtered := indices[:0]
for _, index := range indices {
_, ok := pending[roots[index]]
if !ok {
filtered = append(filtered, index)
}
}
indices = filtered
return indices, nil
})

if errors.Is(err, rhp2.ErrNoSectorsToPrune) {
err = nil // ignore error
} else if !errors.Is(err, context.Canceled) {
if jc.Check("couldn't prune contract", err) != nil {
return
}
}

// record spending
if !spending.Total().IsZero() {
b.store.RecordContractSpending(jc.Request.Context(), []api.ContractSpendingRecord{
{
ContractSpending: spending,
ContractID: fcid,
RevisionNumber: rev.RevisionNumber,
Size: rev.Filesize,

MissedHostPayout: rev.MissedHostPayout(),
ValidRenterPayout: rev.ValidRenterPayout(),
},
})
}

// return response
res := api.ContractPruneResponse{
ContractSize: rev.Filesize,
Pruned: pruned,
Remaining: remaining,
var res api.ContractPruneResponse
if b.isPassedV2AllowHeight() {
res, err = b.pruneContractV2(pruneCtx, rk, c, host.V2SiamuxAddr(), gc, pending)
} else {
res, err = b.pruneContractV1(pruneCtx, rk, c, gc, pending)
}
if err != nil {
res.Error = err.Error()
if jc.Check("failed to prune contract", err) != nil {
return
}
jc.Encode(res)
}
Expand Down
8 changes: 6 additions & 2 deletions internal/rhp/v4/rhp.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,12 @@ func (c *Client) LatestRevision(ctx context.Context, hk types.PublicKey, addr st
}

// SectorRoots returns the sector roots for a contract.
func (c *Client) SectorRoots(ctx context.Context, cs consensus.State, prices rhp4.HostPrices, signer rhp.ContractSigner, contract rhp.ContractRevision, offset, length uint64) (rhp.RPCSectorRootsResult, error) {
panic("not implemented")
func (c *Client) SectorRoots(ctx context.Context, hk types.PublicKey, addr string, cs consensus.State, prices rhp4.HostPrices, signer rhp.ContractSigner, contract rhp.ContractRevision, offset, length uint64) (res rhp.RPCSectorRootsResult, _ error) {
err := c.tpool.withTransport(ctx, hk, addr, func(c rhp.TransportClient) (err error) {
res, err = rhp.RPCSectorRoots(ctx, c, cs, prices, signer, contract, offset, length)
return err
})
return res, err
}

// AccountBalance returns the balance of an account.
Expand Down

0 comments on commit 4a1e13c

Please sign in to comment.