Skip to content

Commit

Permalink
object: Stop supporting __NEOFS__NETMAP* X-headers
Browse files Browse the repository at this point in the history
They are not supported since
nspcc-dev/neofs-api#279. Now storage nodes
process the current epoch only.

Refs #1194.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Feb 28, 2024
1 parent 3b0bc18 commit 448ac71
Show file tree
Hide file tree
Showing 11 changed files with 32 additions and 419 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ Changelog for NeoFS Node
### Changed

### Removed
- Supporting of `__NEOFS__NETMAP*` X-headers (#2751)

### Updated

### Updating from v0.40.1
Stop attaching `__NEOFS__NETMAP*` X-headers to NeoFS API requests. If your app
is somehow tied to them, do not update and create an issue please.

## [0.40.1] - 2024-02-22

Expand Down
35 changes: 10 additions & 25 deletions pkg/services/object/get/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,41 +13,26 @@ func (exec *execCtx) executeOnContainer() {
return
}

lookupDepth := exec.netmapLookupDepth()

exec.log.Debug("trying to execute in container...",
zap.Uint64("netmap lookup depth", lookupDepth),
)
exec.log.Debug("trying to execute in container...")

// initialize epoch number
ok := exec.initEpoch()
if !ok {
epoch, err := exec.svc.currentEpochReceiver.currentEpoch()
if err != nil {
exec.status = statusUndefined
exec.err = err
exec.log.Debug("could not get current epoch number", zap.Error(err))

Check warning on line 23 in pkg/services/object/get/container.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/get/container.go#L21-L23

Added lines #L21 - L23 were not covered by tests
return
}

for {
if exec.processCurrentEpoch() {
break
}

// check the maximum depth has been reached
if lookupDepth == 0 {
break
}

lookupDepth--

// go to the previous epoch
exec.curProcEpoch--
}
exec.processEpoch(epoch)
}

func (exec *execCtx) processCurrentEpoch() bool {
func (exec *execCtx) processEpoch(epoch uint64) bool {
exec.log.Debug("process epoch",
zap.Uint64("number", exec.curProcEpoch),
zap.Uint64("number", epoch),
)

traverser, ok := exec.generateTraverser(exec.address())
traverser, ok := exec.generateTraverser(exec.address(), epoch)
if !ok {
return true
}
Expand Down
38 changes: 2 additions & 36 deletions pkg/services/object/get/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ type execCtx struct {
curOff uint64

head bool

curProcEpoch uint64
}

type execOption func(*execCtx)
Expand Down Expand Up @@ -153,42 +151,10 @@ func (exec *execCtx) headOnly() bool {
return exec.head
}

func (exec *execCtx) netmapEpoch() uint64 {
return exec.prm.common.NetmapEpoch()
}

func (exec *execCtx) netmapLookupDepth() uint64 {
return exec.prm.common.NetmapLookupDepth()
}

func (exec *execCtx) initEpoch() bool {
exec.curProcEpoch = exec.netmapEpoch()
if exec.curProcEpoch > 0 {
return true
}

e, err := exec.svc.currentEpochReceiver.currentEpoch()

switch {
default:
exec.status = statusUndefined
exec.err = err

exec.log.Debug("could not get current epoch number",
zap.String("error", err.Error()),
)

return false
case err == nil:
exec.curProcEpoch = e
return true
}
}

func (exec *execCtx) generateTraverser(addr oid.Address) (*placement.Traverser, bool) {
func (exec *execCtx) generateTraverser(addr oid.Address, epoch uint64) (*placement.Traverser, bool) {
obj := addr.Object()

t, err := exec.svc.traverserGenerator.GenerateTraverser(addr.Container(), &obj, exec.curProcEpoch)
t, err := exec.svc.traverserGenerator.GenerateTraverser(addr.Container(), &obj, epoch)

switch {
default:
Expand Down
127 changes: 0 additions & 127 deletions pkg/services/object/get/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1117,130 +1117,3 @@ func TestGetRemoteSmall(t *testing.T) {
})
})
}

func TestGetFromPastEpoch(t *testing.T) {
ctx := context.Background()

var cnr container.Container
cnr.SetPlacementPolicy(netmaptest.PlacementPolicy())

var idCnr cid.ID
cnr.CalculateID(&idCnr)

addr := oidtest.Address()
addr.SetContainer(idCnr)

payloadSz := uint64(10)
payload := make([]byte, payloadSz)
_, _ = rand.Read(payload)

obj := generateObject(addr, nil, payload)

ns, as := testNodeMatrix(t, []int{2, 2})

c11 := newTestClient()
c11.addResult(addr, nil, errors.New("any error"))

c12 := newTestClient()
c12.addResult(addr, nil, errors.New("any error"))

c21 := newTestClient()
c21.addResult(addr, nil, errors.New("any error"))

c22 := newTestClient()
c22.addResult(addr, obj, nil)

svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(false)
svc.localStorage = newTestStorage()
svc.assembly = true

const curEpoch = 13

svc.traverserGenerator = &testTraverserGenerator{
c: cnr,
b: map[uint64]placement.Builder{
curEpoch: &testPlacementBuilder{
vectors: map[string][][]netmap.NodeInfo{
addr.EncodeToString(): ns[:1],
},
},
curEpoch - 1: &testPlacementBuilder{
vectors: map[string][][]netmap.NodeInfo{
addr.EncodeToString(): ns[1:],
},
},
},
}

svc.clientCache = &testClientCache{
clients: map[string]*testClient{
as[0][0]: c11,
as[0][1]: c12,
as[1][0]: c21,
as[1][1]: c22,
},
}

svc.currentEpochReceiver = testEpochReceiver(curEpoch)

w := NewSimpleObjectWriter()

commonPrm := new(util.CommonPrm)

p := Prm{}
p.SetObjectWriter(w)
p.SetCommonParameters(commonPrm)
p.WithAddress(addr)

err := svc.Get(ctx, p)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))

commonPrm.SetNetmapLookupDepth(1)

err = svc.Get(ctx, p)
require.NoError(t, err)
require.Equal(t, obj, w.Object())

rp := RangePrm{}
rp.SetChunkWriter(w)
commonPrm.SetNetmapLookupDepth(0)
rp.SetCommonParameters(commonPrm)
rp.WithAddress(addr)

off, ln := payloadSz/3, payloadSz/3

r := objectSDK.NewRange()
r.SetOffset(off)
r.SetLength(ln)

rp.SetRange(r)

err = svc.GetRange(ctx, rp)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))

w = NewSimpleObjectWriter()
rp.SetChunkWriter(w)
commonPrm.SetNetmapLookupDepth(1)

err = svc.GetRange(ctx, rp)
require.NoError(t, err)
require.Equal(t, payload[off:off+ln], w.Object().Payload())

hp := HeadPrm{}
hp.SetHeaderWriter(w)
commonPrm.SetNetmapLookupDepth(0)
hp.SetCommonParameters(commonPrm)
hp.WithAddress(addr)

err = svc.Head(ctx, hp)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))

w = NewSimpleObjectWriter()
hp.SetHeaderWriter(w)
commonPrm.SetNetmapLookupDepth(1)

err = svc.Head(ctx, hp)
require.NoError(t, err)
require.Equal(t, obj.CutPayload(), w.Object())
}
3 changes: 0 additions & 3 deletions pkg/services/object/get/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
prm.SetContext(exec.context())
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
Expand Down Expand Up @@ -134,7 +133,6 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
prm.SetContext(exec.context())
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
Expand Down Expand Up @@ -181,7 +179,6 @@ func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Objec
prm.SetContext(exec.context())
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
Expand Down
7 changes: 0 additions & 7 deletions pkg/services/object/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,6 @@ type readPrmCommon struct {
commonPrm
}

// SetNetmapEpoch sets the epoch number to be used to locate the object.
//
// By default current epoch on the server will be used.
func (x *readPrmCommon) SetNetmapEpoch(_ uint64) {
// FIXME: (neofs-node#1194) not supported by client
}

// GetObjectPrm groups parameters of GetObject operation.
type GetObjectPrm struct {
readPrmCommon
Expand Down
35 changes: 10 additions & 25 deletions pkg/services/object/search/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,44 +15,29 @@ func (exec *execCtx) executeOnContainer() {
return
}

lookupDepth := exec.netmapLookupDepth()

exec.log.Debug("trying to execute in container...",
zap.Uint64("netmap lookup depth", lookupDepth),
)
exec.log.Debug("trying to execute in container...")

// initialize epoch number
ok := exec.initEpoch()
if !ok {
epoch, err := exec.svc.currentEpochReceiver.currentEpoch()
if err != nil {
exec.status = statusUndefined
exec.err = err
exec.log.Debug("could not get current epoch number", zap.Error(err))

Check warning on line 25 in pkg/services/object/search/container.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/search/container.go#L23-L25

Added lines #L23 - L25 were not covered by tests
return
}

for {
if exec.processCurrentEpoch() {
break
}

// check the maximum depth has been reached
if lookupDepth == 0 {
break
}

lookupDepth--

// go to the previous epoch
exec.curProcEpoch--
}
exec.processEpoch(epoch)

exec.status = statusOK
exec.err = nil
}

func (exec *execCtx) processCurrentEpoch() bool {
func (exec *execCtx) processEpoch(epoch uint64) bool {
exec.log.Debug("process epoch",
zap.Uint64("number", exec.curProcEpoch),
zap.Uint64("number", epoch),
)

traverser, ok := exec.generateTraverser(exec.containerID())
traverser, ok := exec.generateTraverser(exec.containerID(), epoch)
if !ok {
return true
}
Expand Down
38 changes: 2 additions & 36 deletions pkg/services/object/search/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ type execCtx struct {
statusError

log *zap.Logger

curProcEpoch uint64
}

const (
Expand Down Expand Up @@ -66,40 +64,8 @@ func (exec *execCtx) searchFilters() object.SearchFilters {
return exec.prm.filters
}

func (exec *execCtx) netmapEpoch() uint64 {
return exec.prm.common.NetmapEpoch()
}

func (exec *execCtx) netmapLookupDepth() uint64 {
return exec.prm.common.NetmapLookupDepth()
}

func (exec *execCtx) initEpoch() bool {
exec.curProcEpoch = exec.netmapEpoch()
if exec.curProcEpoch > 0 {
return true
}

e, err := exec.svc.currentEpochReceiver.currentEpoch()

switch {
default:
exec.status = statusUndefined
exec.err = err

exec.log.Debug("could not get current epoch number",
zap.String("error", err.Error()),
)

return false
case err == nil:
exec.curProcEpoch = e
return true
}
}

func (exec *execCtx) generateTraverser(cnr cid.ID) (*placement.Traverser, bool) {
t, err := exec.svc.traverserGenerator.generateTraverser(cnr, exec.curProcEpoch)
func (exec *execCtx) generateTraverser(cnr cid.ID, epoch uint64) (*placement.Traverser, bool) {
t, err := exec.svc.traverserGenerator.generateTraverser(cnr, epoch)

switch {
default:
Expand Down
Loading

0 comments on commit 448ac71

Please sign in to comment.