Skip to content

Commit

Permalink
sector import: Remote Commit2
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Sep 9, 2022
1 parent 374d6f5 commit bcaa395
Show file tree
Hide file tree
Showing 14 changed files with 162 additions and 20 deletions.
10 changes: 10 additions & 0 deletions api/api_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,8 @@ type RemoteSectorMeta struct {
// todo better doc
RemoteCommit1Endpoint string

RemoteCommit2Endpoint string

// todo OnDone / OnStateChange
}

Expand All @@ -577,3 +579,11 @@ type RemoteCommit1Params struct {

ProofType abi.RegisteredSealProof
}

type RemoteCommit2Params struct {
Sector abi.SectorID
ProofType abi.RegisteredSealProof

// todo spec better
Commit1Out storiface.Commit1Out
}
Binary file modified build/openrpc/miner.json.gz
Binary file not shown.
2 changes: 1 addition & 1 deletion cmd/lotus-miner/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ var stateList = []stateMeta{
{col: color.FgRed, state: sealing.SealPreCommit2Failed},
{col: color.FgRed, state: sealing.PreCommitFailed},
{col: color.FgRed, state: sealing.ComputeProofFailed},
{col: color.FgRed, state: sealing.RemoteCommit1Failed},
{col: color.FgRed, state: sealing.RemoteCommitFailed},
{col: color.FgRed, state: sealing.CommitFailed},
{col: color.FgRed, state: sealing.CommitFinalizeFailed},
{col: color.FgRed, state: sealing.PackingFailed},
Expand Down
3 changes: 2 additions & 1 deletion documentation/en/api-v0-methods-miner.md
Original file line number Diff line number Diff line change
Expand Up @@ -3240,7 +3240,8 @@ Inputs:
}
]
},
"RemoteCommit1Endpoint": "string value"
"RemoteCommit1Endpoint": "string value",
"RemoteCommit2Endpoint": "string value"
}
]
```
Expand Down
2 changes: 1 addition & 1 deletion itests/sector_import_full_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func TestSectorImport(t *testing.T) {
require.Len(t, ng, 1)
require.Equal(t, snum, ng[0])

miner.WaitSectorsProvingAllowFails(ctx, map[abi.SectorNumber]struct{}{snum: {}}, map[api.SectorState]struct{}{api.SectorState(sealing.RemoteCommit1Failed): {}})
miner.WaitSectorsProvingAllowFails(ctx, map[abi.SectorNumber]struct{}{snum: {}}, map[api.SectorState]struct{}{api.SectorState(sealing.RemoteCommitFailed): {}})
}
}

Expand Down
29 changes: 29 additions & 0 deletions itests/sector_import_simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,14 @@ func TestSectorImportAfterPC2(t *testing.T) {
m := mux.NewRouter()
m.HandleFunc("/sectors/{type}/{id}", remoteGetSector(sectorDir)).Methods("GET")
m.HandleFunc("/sectors/{id}/commit1", remoteCommit1(sealer)).Methods("POST")
m.HandleFunc("/commit2", remoteCommit2(sealer)).Methods("POST")
srv := httptest.NewServer(m)

unsealedURL := fmt.Sprintf("%s/sectors/unsealed/s-t0%d-%d", srv.URL, mid, snum)
sealedURL := fmt.Sprintf("%s/sectors/sealed/s-t0%d-%d", srv.URL, mid, snum)
cacheURL := fmt.Sprintf("%s/sectors/cache/s-t0%d-%d", srv.URL, mid, snum)
remoteC1URL := fmt.Sprintf("%s/sectors/s-t0%d-%d/commit1", srv.URL, mid, snum)
remoteC2URL := fmt.Sprintf("%s/commit2", srv.URL)

////////
// import the sector and continue sealing
Expand Down Expand Up @@ -172,6 +174,7 @@ func TestSectorImportAfterPC2(t *testing.T) {
},

RemoteCommit1Endpoint: remoteC1URL,
RemoteCommit2Endpoint: remoteC2URL,
})
require.NoError(t, err)

Expand Down Expand Up @@ -232,6 +235,32 @@ func remoteCommit1(s *ffiwrapper.Sealer) func(w http.ResponseWriter, r *http.Req
}
}

func remoteCommit2(s *ffiwrapper.Sealer) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
var params api.RemoteCommit2Params
if err := json.NewDecoder(r.Body).Decode(&params); err != nil {
w.WriteHeader(500)
return
}

sref := storiface.SectorRef{
ID: params.Sector,
ProofType: params.ProofType,
}

p, err := s.SealCommit2(r.Context(), sref, params.Commit1Out)
if err != nil {
fmt.Println("c2 error: ", err)
w.WriteHeader(500)
return
}

if _, err := w.Write(p); err != nil {
fmt.Println("c2 write error")
}
}
}

func remoteGetSector(sectorRoot string) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {

Expand Down
36 changes: 35 additions & 1 deletion storage/pipeline/cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions storage/pipeline/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorRetryComputeProof{}, Committing),
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
),
RemoteCommit1Failed: planOne(
RemoteCommitFailed: planOne(
on(SectorRetryComputeProof{}, Committing),
),
CommitFinalizeFailed: planOne(
Expand Down Expand Up @@ -542,8 +542,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
return m.handlePreCommitFailed, processed, nil
case ComputeProofFailed:
return m.handleComputeProofFailed, processed, nil
case RemoteCommit1Failed:
return m.handleRemoteCommit1Failed, processed, nil
case RemoteCommitFailed:
return m.handleRemoteCommitFailed, processed, nil
case CommitFailed:
return m.handleCommitFailed, processed, nil
case CommitFinalizeFailed:
Expand Down Expand Up @@ -671,8 +671,8 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, err
return uint64(i + 1), nil
case SectorComputeProofFailed:
state.State = ComputeProofFailed
case SectorRemoteCommit1Failed:
state.State = RemoteCommit1Failed
case SectorRemoteCommit1Failed, SectorRemoteCommit2Failed:
state.State = RemoteCommitFailed
case SectorSealPreCommit1Failed:
state.State = SealPreCommit1Failed
case SectorCommitFailed:
Expand Down
5 changes: 5 additions & 0 deletions storage/pipeline/fsm_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ type SectorRemoteCommit1Failed struct{ error }
func (evt SectorRemoteCommit1Failed) FormatError(xerrors.Printer) (next error) { return evt.error }
func (evt SectorRemoteCommit1Failed) apply(*SectorInfo) {}

type SectorRemoteCommit2Failed struct{ error }

func (evt SectorRemoteCommit2Failed) FormatError(xerrors.Printer) (next error) { return evt.error }
func (evt SectorRemoteCommit2Failed) apply(*SectorInfo) {}

type SectorComputeProofFailed struct{ error }

func (evt SectorComputeProofFailed) FormatError(xerrors.Printer) (next error) { return evt.error }
Expand Down
20 changes: 19 additions & 1 deletion storage/pipeline/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sealing
import (
"bytes"
"context"
"net/url"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -177,7 +178,24 @@ func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta
}
info.RemoteDataSealed = meta.DataSealed // todo make head requests to check?
info.RemoteDataCache = meta.DataCache
info.RemoteCommit1Endpoint = meta.RemoteCommit1Endpoint

if meta.RemoteCommit1Endpoint != "" {
// validate the url
if _, err := url.Parse(meta.RemoteCommit1Endpoint); err != nil {
return SectorInfo{}, xerrors.Errorf("parsing remote c1 endpoint url: %w", err)
}

info.RemoteCommit1Endpoint = meta.RemoteCommit1Endpoint
}

if meta.RemoteCommit2Endpoint != "" {
// validate the url
if _, err := url.Parse(meta.RemoteCommit2Endpoint); err != nil {
return SectorInfo{}, xerrors.Errorf("parsing remote c2 endpoint url: %w", err)
}

info.RemoteCommit2Endpoint = meta.RemoteCommit2Endpoint
}

// If we get a sector after PC2, and remote C1 endpoint is set, assume that we're getting finalized sector data
if info.RemoteCommit1Endpoint != "" {
Expand Down
4 changes: 2 additions & 2 deletions storage/pipeline/sector_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var ExistSectorStateList = map[SectorState]struct{}{
SealPreCommit2Failed: {},
PreCommitFailed: {},
ComputeProofFailed: {},
RemoteCommit1Failed: {},
RemoteCommitFailed: {},
CommitFailed: {},
PackingFailed: {},
FinalizeFailed: {},
Expand Down Expand Up @@ -125,7 +125,7 @@ const (
SealPreCommit2Failed SectorState = "SealPreCommit2Failed"
PreCommitFailed SectorState = "PreCommitFailed"
ComputeProofFailed SectorState = "ComputeProofFailed"
RemoteCommit1Failed SectorState = "RemoteCommit1Failed"
RemoteCommitFailed SectorState = "RemoteCommitFailed"
CommitFailed SectorState = "CommitFailed"
PackingFailed SectorState = "PackingFailed" // TODO: deprecated, remove
FinalizeFailed SectorState = "FinalizeFailed"
Expand Down
4 changes: 2 additions & 2 deletions storage/pipeline/states_failed.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,13 @@ func (m *Sealing) handleComputeProofFailed(ctx statemachine.Context, sector Sect
return ctx.Send(SectorRetryComputeProof{})
}

func (m *Sealing) handleRemoteCommit1Failed(ctx statemachine.Context, sector SectorInfo) error {
func (m *Sealing) handleRemoteCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
if err := failedCooldown(ctx, sector); err != nil {
return err
}

if sector.InvalidProofs > 1 {
log.Errorw("consecutive remote commit1 fails", "sector", sector.SectorNumber, "c1url", sector.RemoteCommit1Endpoint)
log.Errorw("consecutive remote commit fails", "sector", sector.SectorNumber, "c1url", sector.RemoteCommit1Endpoint, "c2url", sector.RemoteCommit2Endpoint)
}

return ctx.Send(SectorRetryComputeProof{})
Expand Down
56 changes: 50 additions & 6 deletions storage/pipeline/states_sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)

var c2in storiface.Commit1Out
if sector.RemoteCommit1Endpoint == "" {
// Local Commit1
cids := storiface.SectorCids{
Unsealed: *sector.CommD,
Sealed: *sector.CommR,
Expand All @@ -576,6 +577,8 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(1): %w", err)})
}
} else {
// Remote Commit1

reqData := api.RemoteCommit1Params{
Ticket: sector.TicketValue,
Seed: sector.SeedValue,
Expand Down Expand Up @@ -611,9 +614,50 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
}
}

proof, err := m.sealer.SealCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), c2in)
if err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(2): %w", err)})
var porepProof storiface.Proof

if sector.RemoteCommit2Endpoint == "" {
// Local Commit2

porepProof, err = m.sealer.SealCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), c2in)
if err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(2): %w", err)})
}
} else {
// Remote Commit2

reqData := api.RemoteCommit2Params{
ProofType: sector.SectorType,
Sector: m.minerSectorID(sector.SectorNumber),

Commit1Out: c2in,
}
reqBody, err := json.Marshal(&reqData)
if err != nil {
return xerrors.Errorf("marshaling remote commit2 request: %w", err)
}

req, err := http.NewRequest("POST", sector.RemoteCommit2Endpoint, bytes.NewReader(reqBody))
if err != nil {
return ctx.Send(SectorRemoteCommit2Failed{xerrors.Errorf("creating new remote commit2 request: %w", err)})
}
req.Header.Set("Content-Type", "application/json")
req = req.WithContext(ctx.Context())
resp, err := http.DefaultClient.Do(req)
if err != nil {
return ctx.Send(SectorRemoteCommit2Failed{xerrors.Errorf("requesting remote commit2: %w", err)})
}

defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return ctx.Send(SectorRemoteCommit2Failed{xerrors.Errorf("remote commit2 received non-200 http response %s", resp.Status)})
}

porepProof, err = io.ReadAll(resp.Body) // todo some len constraint
if err != nil {
return ctx.Send(SectorRemoteCommit2Failed{xerrors.Errorf("reading commit2 response: %w", err)})
}
}

{
Expand All @@ -623,19 +667,19 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
return nil
}

if err := m.checkCommit(ctx.Context(), sector, proof, ts.Key()); err != nil {
if err := m.checkCommit(ctx.Context(), sector, porepProof, ts.Key()); err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)})
}
}

if cfg.FinalizeEarly {
return ctx.Send(SectorProofReady{
Proof: proof,
Proof: porepProof,
})
}

return ctx.Send(SectorCommitted{
Proof: proof,
Proof: porepProof,
})
}

Expand Down
1 change: 1 addition & 0 deletions storage/pipeline/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ type SectorInfo struct {
RemoteDataSealed *storiface.SectorData
RemoteDataCache *storiface.SectorData
RemoteCommit1Endpoint string
RemoteCommit2Endpoint string
RemoteDataFinalized bool

// Debug
Expand Down

0 comments on commit bcaa395

Please sign in to comment.