Skip to content

Commit

Permalink
refactor allocate command
Browse files Browse the repository at this point in the history
  • Loading branch information
LexLuthr committed Apr 15, 2024
1 parent 16a4de2 commit 52b46f9
Show file tree
Hide file tree
Showing 4 changed files with 287 additions and 120 deletions.
209 changes: 195 additions & 14 deletions cmd/boost/direct_deal.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bufio"
"fmt"
"os"
"strconv"
Expand All @@ -14,13 +15,13 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
verifreg13types "github.com/filecoin-project/go-state-types/builtin/v13/verifreg"
lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/verifreg"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/tablewriter"
"github.com/ipfs/go-cid"
"github.com/mitchellh/go-homedir"
"github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -70,11 +71,166 @@ var directDealAllocate = &cli.Command{
"Default is 60 days.",
Value: verifreg13types.MaximumVerifiedAllocationExpiration,
},
&cli.StringFlag{
Name: "piece-file",
Usage: "file containing piece-info[s] to create the allocation. Each line in the file should be in the format 'pieceCid,pieceSize,miner,tmin,tmax,expiration'",
Required: true,
Aliases: []string{"pf"},
},
&cli.IntFlag{
Name: "batch-size",
Usage: "number of extend requests per batch. If set incorrectly, this will lead to out of gas error",
Value: 500,
},
&cli.IntFlag{
Name: "confidence",
Usage: "number of block confirmations to wait for",
Value: int(build.MessageConfidence),
},
},
Before: before,
Action: func(cctx *cli.Context) error {
ctx := bcli.ReqContext(cctx)

pieceFile := cctx.String("piece-file")
miners := cctx.StringSlice("miner")
pinfos := cctx.StringSlice("piece-info")
if pieceFile == "" && len(pinfos) < 1 {
return fmt.Errorf("must provide at least one --piece-info or use --piece-file")
}

if pieceFile == "" && len(miners) < 1 {
return fmt.Errorf("must provide at least one miner address or use --piece-file")
}

if pieceFile != "" && len(pinfos) > 0 {
return fmt.Errorf("cannot use both --piece-info and --piece-file flags at once")
}

var pieceInfos []util.PieceInfos

if pieceFile != "" {
// Read file line by line
loc, err := homedir.Expand(pieceFile)
if err != nil {
return err
}
file, err := os.Open(loc)
if err != nil {
return err
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
// Extract pieceCid, pieceSize and MinerAddr from line
parts := strings.Split(line, ",")
if len(parts) != 6 {
return fmt.Errorf("invalid line format. Expected pieceCid, pieceSize, MinerAddr, TMin, TMax, Exp at %s", line)
}
if parts[0] == "" || parts[1] == "" || parts[2] == "" || parts[3] == "" || parts[4] == "" || parts[5] == "" {
return fmt.Errorf("empty column value in the input file at %s", line)
}

pieceCid, err := cid.Parse(parts[0])
if err != nil {
return fmt.Errorf("failed to parse CID: %w", err)
}
pieceSize, err := strconv.ParseInt(parts[1], 10, 64)
if err != nil {
return fmt.Errorf("failed to parse size %w", err)
}
maddr, err := address.NewFromString(parts[2])
if err != nil {
return fmt.Errorf("failed to parse miner address %w", err)
}

mid, err := address.IDFromAddress(maddr)
if err != nil {
return fmt.Errorf("failed to convert miner address %w", err)
}

tmin, err := strconv.ParseUint(parts[3], 10, 64)
if err != nil {
return fmt.Errorf("failed to tmin %w", err)
}

tmax, err := strconv.ParseUint(parts[4], 10, 64)
if err != nil {
return fmt.Errorf("failed to tmax %w", err)
}

exp, err := strconv.ParseUint(parts[5], 10, 64)
if err != nil {
return fmt.Errorf("failed to expiration %w", err)
}

if tmax < tmin {
return fmt.Errorf("maximum duration %d cannot be smaller than minimum duration %d", tmax, tmin)
}

pieceInfos = append(pieceInfos, util.PieceInfos{
Cid: pieceCid,
Size: pieceSize,
Miner: abi.ActorID(mid),
MinerAddr: maddr,
Tmin: abi.ChainEpoch(tmin),
Tmax: abi.ChainEpoch(tmax),
Exp: abi.ChainEpoch(exp),
})
if err := scanner.Err(); err != nil {
return err
}
}
} else {
for _, miner := range miners {
maddr, err := address.NewFromString(miner)
if err != nil {
return fmt.Errorf("failed to parse miner address %w", err)
}

mid, err := address.IDFromAddress(maddr)
if err != nil {
return fmt.Errorf("failed to convert miner address %w", err)
}
for _, p := range cctx.StringSlice("piece-info") {
pieceDetail := strings.Split(p, "=")
if len(pieceDetail) > 2 {
return fmt.Errorf("incorrect pieceInfo format: %s", pieceDetail)
}

size, err := strconv.ParseInt(pieceDetail[1], 10, 64)
if err != nil {
return fmt.Errorf("failed to parse the piece size for %s for pieceCid %s: %w", pieceDetail[0], pieceDetail[1], err)
}
pcid, err := cid.Parse(pieceDetail[0])
if err != nil {
return fmt.Errorf("failed to parse the pieceCid for %s: %w", pieceDetail[0], err)
}

tmin := abi.ChainEpoch(cctx.Int64("term-min"))

tmax := abi.ChainEpoch(cctx.Int64("term-max"))

exp := abi.ChainEpoch(cctx.Int64("expiration"))

if tmax < tmin {
return fmt.Errorf("maximum duration %d cannot be smaller than minimum duration %d", tmax, tmin)
}

pieceInfos = append(pieceInfos, util.PieceInfos{
Cid: pcid,
Size: size,
Miner: abi.ActorID(mid),
MinerAddr: maddr,
Tmin: tmin,
Tmax: tmax,
Exp: exp,
})
}
}
}

n, err := clinode.Setup(cctx.String(cmd.FlagRepo.Name))
if err != nil {
return err
Expand All @@ -94,7 +250,7 @@ var directDealAllocate = &cli.Command{

log.Debugw("selected wallet", "wallet", walletAddr)

msg, err := util.CreateAllocationMsg(ctx, gapi, cctx.StringSlice("piece-info"), cctx.StringSlice("miner"), walletAddr, abi.ChainEpoch(cctx.Int64("term-min")), abi.ChainEpoch(cctx.Int64("term-max")), abi.ChainEpoch(cctx.Int64("expiration")))
msgs, err := util.CreateAllocationMsg(ctx, gapi, pieceInfos, walletAddr, cctx.Int("batch-size"))

if err != nil {
return err
Expand All @@ -105,24 +261,49 @@ var directDealAllocate = &cli.Command{
return fmt.Errorf("failed to get allocations: %w", err)
}

mcid, sent, err := lib.SignAndPushToMpool(cctx, ctx, gapi, n, msg)
if err != nil {
return err
var mcids []cid.Cid

for _, msg := range msgs {
mcid, sent, err := lib.SignAndPushToMpool(cctx, ctx, gapi, n, msg)
if err != nil {
return err
}
if !sent {
fmt.Printf("message %s with method %s not sent\n", msg.Cid(), msg.Method.String())
continue
}
mcids = append(mcids, mcid)
}
if !sent {
return nil

var mcidStr []string
for _, c := range mcids {
mcidStr = append(mcidStr, c.String())
}

log.Infow("submitted data cap allocation message", "cid", mcid.String())
log.Infow("submitted data cap allocation message[s]", mcidStr)
log.Info("waiting for message to be included in a block")

res, err := gapi.StateWaitMsg(ctx, mcid, 1, lapi.LookbackNoLimit, true)
if err != nil {
return fmt.Errorf("waiting for message to be included in a block: %w", err)
}
// wait for msgs to get mined into a block
eg := errgroup.Group{}
eg.SetLimit(10)
for _, msg := range mcids {
m := msg
eg.Go(func() error {
wait, err := gapi.StateWaitMsg(ctx, m, uint64(cctx.Int("confidence")), 2000, true)
if err != nil {
return fmt.Errorf("timeout waiting for message to land on chain %s", wait.Message)

}

if !res.Receipt.ExitCode.IsSuccess() {
return fmt.Errorf("failed to execute the message with error: %s", res.Receipt.ExitCode.Error())
if wait.Receipt.ExitCode.IsError() {
return fmt.Errorf("failed to execute message %s: %w", wait.Message, wait.Receipt.ExitCode)
}
return nil
})
}
err = eg.Wait()
if err != nil {
return err
}

// Return early of quiet flag is set
Expand Down
Loading

0 comments on commit 52b46f9

Please sign in to comment.