Skip to content

Commit

Permalink
WIP initial test exists
Browse files Browse the repository at this point in the history
sector storage test
  • Loading branch information
ZenGround0 committed Nov 10, 2021
1 parent 40d5c6f commit 1f79db0
Show file tree
Hide file tree
Showing 22 changed files with 322 additions and 14 deletions.
1 change: 1 addition & 0 deletions api/api_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ type StorageMiner interface {
ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err *storiface.CallError) error //perm:admin retry:true
ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err *storiface.CallError) error //perm:admin retry:true
ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
ReturnReplicaUpdate(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
Expand Down
1 change: 1 addition & 0 deletions api/api_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Worker interface {
SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) //perm:admin
SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) //perm:admin
FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) //perm:admin
ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin
ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error) //perm:admin
MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) //perm:admin
UnsealPiece(context.Context, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) //perm:admin
Expand Down
26 changes: 26 additions & 0 deletions api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified build/openrpc/full.json.gz
Binary file not shown.
Binary file modified build/openrpc/miner.json.gz
Binary file not shown.
Binary file modified build/openrpc/worker.json.gz
Binary file not shown.
25 changes: 25 additions & 0 deletions documentation/en/api-v0-methods-miner.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
* [ReturnMoveStorage](#ReturnMoveStorage)
* [ReturnReadPiece](#ReturnReadPiece)
* [ReturnReleaseUnsealed](#ReturnReleaseUnsealed)
* [ReturnReplicaUpdate](#ReturnReplicaUpdate)
* [ReturnSealCommit1](#ReturnSealCommit1)
* [ReturnSealCommit2](#ReturnSealCommit2)
* [ReturnSealPreCommit1](#ReturnSealPreCommit1)
Expand Down Expand Up @@ -1513,6 +1514,30 @@ Response: `{}`
### ReturnReleaseUnsealed


Perms: admin

Inputs:
```json
[
{
"Sector": {
"Miner": 1000,
"Number": 9
},
"ID": "07070707-0707-0707-0707-070707070707"
},
{
"Code": 0,
"Message": "string value"
}
]
```

Response: `{}`

### ReturnReplicaUpdate


Perms: admin

Inputs:
Expand Down
35 changes: 35 additions & 0 deletions documentation/en/api-v0-methods-worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
* [ProcessSession](#ProcessSession)
* [Release](#Release)
* [ReleaseUnsealed](#ReleaseUnsealed)
* [Replica](#Replica)
* [ReplicaUpdate](#ReplicaUpdate)
* [Seal](#Seal)
* [SealCommit1](#SealCommit1)
* [SealCommit2](#SealCommit2)
Expand Down Expand Up @@ -268,6 +270,39 @@ Response: `"07070707-0707-0707-0707-070707070707"`
### ReleaseUnsealed


Perms: admin

Inputs:
```json
[
{
"ID": {
"Miner": 1000,
"Number": 9
},
"ProofType": 8
},
null
]
```

Response:
```json
{
"Sector": {
"Miner": 1000,
"Number": 9
},
"ID": "07070707-0707-0707-0707-070707070707"
}
```

## Replica


### ReplicaUpdate


Perms: admin

Inputs:
Expand Down
6 changes: 6 additions & 0 deletions extern/sector-storage/ffiwrapper/sealer_cgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,12 @@ func (sb *Sealer) SealCommit2(ctx context.Context, sector storage.SectorRef, pha
return ffi.SealCommitPhase2(phase1Out, sector.ID.Number, sector.ID.Miner)
}

func (sb *Sealer) ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (storage.ReplicaUpdateProof, error) {
panic("implemente me")
// XXX once we have an ffi call put it in here
//return nil, nil
}

func (sb *Sealer) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error {
ssize, err := sector.ProofType.SectorSize()
if err != nil {
Expand Down
60 changes: 60 additions & 0 deletions extern/sector-storage/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sectorstorage
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"sync"
Expand Down Expand Up @@ -601,6 +602,61 @@ func (m *Manager) Remove(ctx context.Context, sector storage.SectorRef) error {
return err
}

func (m *Manager) ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (out storage.ReplicaUpdateProof, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
fmt.Printf("Manager, getting worker\n")

wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTReplicaUpdate, sector, pieces)
if err != nil {
return nil, xerrors.Errorf("getWork: %w", err)
}
defer cancel()

var waitErr error
waitRes := func() {
p, werr := m.waitWork(ctx, wk)
if werr != nil {
waitErr = werr
return
}
if p != nil {
out = p.([]byte)
}
}

if wait { // already in progress
waitRes()
return out, waitErr
}

if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed, storiface.FTUpdate); err != nil {
return nil, xerrors.Errorf("acquiring sector lock: %w", err)
}

selector := newAllocSelector(m.index, storiface.FTUpdate, storiface.PathSealing)

// XXX: do I need to include FTUpdate in schedFetch when this is being allocated. Does Schedule and the selector for this handle allocating already?
// XXX: is AcquireMove the right thing to be doing here?
fmt.Printf("Manager, scheduling\n")
err = m.sched.Schedule(ctx, sector, sealtasks.TTReplicaUpdate, selector, m.schedFetch(sector, storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {

fmt.Printf("Manager starting work\n")
err := m.startWork(ctx, w, wk)(w.ReplicaUpdate(ctx, sector, pieces))
if err != nil {
return err
}

waitRes()
return nil
})
if err != nil {
return nil, err
}

return out, waitErr
}

func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
return m.returnResult(ctx, callID, pi, err)
}
Expand Down Expand Up @@ -629,6 +685,10 @@ func (m *Manager) ReturnReleaseUnsealed(ctx context.Context, callID storiface.Ca
return m.returnResult(ctx, callID, nil, err)
}

func (m *Manager) ReturnReplicaUpdate(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(ctx, callID, nil, err)
}

func (m *Manager) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(ctx, callID, nil, err)
}
Expand Down
81 changes: 81 additions & 0 deletions extern/sector-storage/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -162,6 +163,86 @@ func TestSimple(t *testing.T) {
require.NoError(t, err)
}

type Reader struct{}

func (Reader) Read(out []byte) (int, error) {
for i := range out {
out[i] = 0
}
return len(out), nil
}

type NullReader struct {
*io.LimitedReader
}

func NewNullReader(size abi.UnpaddedPieceSize) io.Reader {
return &NullReader{(io.LimitReader(&Reader{}, int64(size))).(*io.LimitedReader)}
}

func (m NullReader) NullBytes() int64 {
return m.N
}

func TestSnapDeals(t *testing.T) {
logging.SetAllLoggers(logging.LevelWarn)

ctx := context.Background()
m, lstor, stor, idx, cleanup := newTestMgr(ctx, t, datastore.NewMapDatastore())
defer cleanup()

localTasks := []sealtasks.TaskType{
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit1, sealtasks.TTCommit2, sealtasks.TTFinalize, sealtasks.TTFetch, sealtasks.TTReplicaUpdate,
}
wds := datastore.NewMapDatastore()

w := NewLocalWorker(WorkerConfig{TaskTypes: localTasks}, stor, lstor, idx, m, statestore.New(wds))
err := m.AddWorker(ctx, w)
require.NoError(t, err)

sid := storage.SectorRef{
ID: abi.SectorID{Miner: 1000, Number: 1},
ProofType: abi.RegisteredSealProof_StackedDrg2KiBV1,
}

// Pack sector with no pieces
p0, err := m.AddPiece(ctx, sid, nil, 2*1016, NewNullReader(2*1016))
require.NoError(t, err)
ccPieces := []abi.PieceInfo{p0}

// Precommit and Seal a CC sector
fmt.Printf("PC1\n")
ticket := abi.SealRandomness{9, 9, 9, 9, 9, 9, 9, 9}
pc1Out, err := m.SealPreCommit1(ctx, sid, ticket, ccPieces)
require.NoError(t, err)
fmt.Printf("PC2\n")
pc2Out, err := m.SealPreCommit2(ctx, sid, pc1Out)
require.NoError(t, err)
seed := abi.InteractiveSealRandomness{1, 1, 1, 1, 1, 1, 1}
fmt.Printf("C1\n")
c1Out, err := m.SealCommit1(ctx, sid, ticket, seed, nil, pc2Out)
require.NoError(t, err)
fmt.Printf("C2\n")
_, err = m.SealCommit2(ctx, sid, c1Out)
require.NoError(t, err)

// Now do a snap deals replica update

p1, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127)))
require.NoError(t, err)
require.Equal(t, abi.PaddedPieceSize(1024), p1.Size)

p2, err := m.AddPiece(ctx, sid, nil, 1016, bytes.NewReader(make([]byte, 1016)[:]))
require.NoError(t, err)
require.Equal(t, abi.PaddedPieceSize(1024), p2.Size)

pieces := []abi.PieceInfo{p1, p2}

out, err := m.ReplicaUpdate(ctx, sid, pieces)
_ = out
require.NoError(t, err)
}

func TestRedoPC1(t *testing.T) {
logging.SetAllLoggers(logging.LevelDebug)

Expand Down
12 changes: 12 additions & 0 deletions extern/sector-storage/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,12 @@ func (mgr *SectorMgr) SealCommit2(ctx context.Context, sid storage.SectorRef, ph
return out[:], nil
}

func (mgr *SectorMgr) ReplicaUpdate(ctx context.Context, sid storage.SectorRef, pieces []abi.PieceInfo) (storage.ReplicaUpdateProof, error) {
out := make([]byte, 1)
out[0] = 0xff
return out, nil
}

// Test Instrumentation Methods

func (mgr *SectorMgr) MarkFailed(sid storage.SectorRef, failed bool) error {
Expand Down Expand Up @@ -456,6 +462,8 @@ func (mgr *SectorMgr) CheckProvable(ctx context.Context, pp abi.RegisteredPoStPr
return bad, nil
}

var _ storiface.WorkerReturn = &SectorMgr{}

func (mgr *SectorMgr) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
panic("not supported")
}
Expand Down Expand Up @@ -500,6 +508,10 @@ func (mgr *SectorMgr) ReturnFetch(ctx context.Context, callID storiface.CallID,
panic("not supported")
}

func (mgr *SectorMgr) ReturnReplicaUpdate(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
panic("not supported")
}

func (m mockVerifProver) VerifySeal(svi proof5.SealVerifyInfo) (bool, error) {
plen, err := svi.SealProof.ProofSize()
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions extern/sector-storage/sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ func (s *schedTestWorker) AddPiece(ctx context.Context, sector storage.SectorRef
panic("implement me")
}

func (s *schedTestWorker) ReplicaUpdate(ctx context.Context, sector storage.SectorRef, peices []abi.PieceInfo) (storiface.CallID, error) {
panic("implement me")
}

func (s *schedTestWorker) MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) {
panic("implement me")
}
Expand Down
Loading

0 comments on commit 1f79db0

Please sign in to comment.