From 2b63a9d0bd6ca3a9545bea472e7fc3ea7fec26b1 Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Mon, 11 Jul 2022 15:00:12 +0530 Subject: [PATCH 1/3] add destroyshard to wrapper --- markets/dagstore/wrapper.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/markets/dagstore/wrapper.go b/markets/dagstore/wrapper.go index 3d92886598e..8dc10744aea 100644 --- a/markets/dagstore/wrapper.go +++ b/markets/dagstore/wrapper.go @@ -269,6 +269,22 @@ func (w *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath s return nil } +func (w *Wrapper) DestroyShard(ctx context.Context, pieceCid cid.Cid, resch chan dagstore.ShardResult) error { + key := shard.KeyFromCID(pieceCid) + + opts := dagstore.DestroyOpts{} + + err := w.dagst.DestroyShard(ctx, key, resch, opts) + + if err != nil { + return xerrors.Errorf("failed to schedule destroy shard for piece CID %s: %w", pieceCid, err) + } + log.Debugf("successfully submitted destroy Shard request for piece CID %s", pieceCid) + + return nil + +} + func (w *Wrapper) MigrateDeals(ctx context.Context, deals []storagemarket.MinerDeal) (bool, error) { log := log.Named("migrator") From 89b311922055ceb17701a42da72960ec0c3d7f30 Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Mon, 11 Jul 2022 17:11:35 +0530 Subject: [PATCH 2/3] mock test for DestroyShard --- markets/dagstore/wrapper_test.go | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/markets/dagstore/wrapper_test.go b/markets/dagstore/wrapper_test.go index a4a6215e105..ff77bed9c37 100644 --- a/markets/dagstore/wrapper_test.go +++ b/markets/dagstore/wrapper_test.go @@ -23,7 +23,7 @@ import ( // TestWrapperAcquireRecovery verifies that if acquire shard returns a "not found" // error, the wrapper will attempt to register the shard then reacquire -func TestWrapperAcquireRecovery(t *testing.T) { +func TestWrapperAcquireRecoveryDestroy(t *testing.T) { ctx := context.Background() pieceCid, err := cid.Parse("bafkqaaa") require.NoError(t, err) @@ -48,6 +48,7 @@ func TestWrapperAcquireRecovery(t *testing.T) { Accessor: getShardAccessor(t), }, register: make(chan shard.Key, 1), + destroy: make(chan shard.Key, 1), } w.dagst = mock @@ -73,6 +74,27 @@ func TestWrapperAcquireRecovery(t *testing.T) { count++ } require.Greater(t, count, 0) + + // Destroy the shard + dr := make(chan dagstore.ShardResult, 1) + err = w.DestroyShard(ctx, pieceCid, dr) + + dctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + select { + case <-dctx.Done(): + require.Fail(t, "failed to call register") + case k := <-mock.destroy: + require.Equal(t, k.String(), pieceCid.String()) + } + + var dcount int + dch, err := mybs.AllKeysChan(ctx) + require.NoError(t, err) + for range dch { + count++ + } + require.Equal(t, dcount, 0) } // TestWrapperBackground verifies the behaviour of the background go routine @@ -130,11 +152,14 @@ type mockDagStore struct { gc chan struct{} recover chan shard.Key + destroy chan shard.Key close chan struct{} } func (m *mockDagStore) DestroyShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.DestroyOpts) error { - panic("implement me") + m.destroy <- key + out <- dagstore.ShardResult{Key: key} + return nil } func (m *mockDagStore) GetShardInfo(k shard.Key) (dagstore.ShardInfo, error) { From 0e2118e12e1a08d41cf971d8500eb6f5b00c34ee Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Mon, 11 Jul 2022 17:24:53 +0530 Subject: [PATCH 3/3] fix lint errors --- markets/dagstore/wrapper_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/markets/dagstore/wrapper_test.go b/markets/dagstore/wrapper_test.go index ff77bed9c37..f201ea25296 100644 --- a/markets/dagstore/wrapper_test.go +++ b/markets/dagstore/wrapper_test.go @@ -78,12 +78,13 @@ func TestWrapperAcquireRecoveryDestroy(t *testing.T) { // Destroy the shard dr := make(chan dagstore.ShardResult, 1) err = w.DestroyShard(ctx, pieceCid, dr) + require.NoError(t, err) dctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() select { case <-dctx.Done(): - require.Fail(t, "failed to call register") + require.Fail(t, "failed to call destroy") case k := <-mock.destroy: require.Equal(t, k.String(), pieceCid.String()) }