Skip to content

Commit

Permalink
node: Fill split info for the first child correctly
Browse files Browse the repository at this point in the history
SDK RC8 does not attach a split ID to the first child in a split chain.

Signed-off-by: Pavel Karpy <carpawell@morphbits.io>
  • Loading branch information
notimetoname committed Jun 12, 2023
1 parent 82b9409 commit 9b23ffe
Showing 1 changed file with 121 additions and 3 deletions.
124 changes: 121 additions & 3 deletions pkg/services/object_manager/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ type payloadSizeLimiter struct {
objSlicer *slicerSDK.Slicer
targetInit TargetInitializer

_changedParentID *oid.ID
_changedParentID *oid.ID
_objectStreamInitializer *objStreamInitializer
}

// objStreamInitializer implements [slicerSDK.ObjectWriter].
Expand All @@ -37,6 +38,8 @@ type objStreamInitializer struct {
_psl *payloadSizeLimiter
_signer neofscrypto.Signer
_objType object.Type
_objBuf *object.Object
_splitID *object.SplitID
}

var (
Expand All @@ -45,7 +48,6 @@ var (
)

func (o *objStreamInitializer) InitDataStream(header object.Object) (io.Writer, error) {
stream := o.targetInit()
linkObj := len(header.Children()) > 0

// v1.0.0-rc.8 has a bug that does not allow any non-regular objects
Expand Down Expand Up @@ -113,7 +115,78 @@ func (o *objStreamInitializer) InitDataStream(header object.Object) (io.Writer,
}
}

err := stream.WriteHeader(&header)
// v1.0.0-rc.8 has a bug that breaks split field for the first child object,
// thus that check, see https://github.com/nspcc-dev/neofs-sdk-go/issues/448.
if o._objBuf == nil {
if o._splitID == nil {
// the first object, it is impossible to say
// if there will be any others so cache it now
// and generate split id for a potential object
// chain
o._objBuf = &header
o._splitID = object.NewSplitID()

return &_memoryObjStream{objInit: o}, nil
}

// not the first object, attach the missing split ID
// and heal its header the second time; it is non-optimal
// but the code here is already hard to read, and it
// is full of kludges so let it be as stupid as possible

header.SetSplitID(o._splitID)
err := _healHeader(o._signer, &header)
if err != nil {
return nil, fmt.Errorf("broken intermediate object: %w", err)
}

stream := o.targetInit()
err = stream.WriteHeader(&header)
if err != nil {
return nil, fmt.Errorf("broken intermediate object: streaming header: %w", err)
}

return &objStream{target: stream, _linkObj: linkObj}, nil
}

// more objects are here it _is_ an object chain,
// stream the cached one and continue chain handling

// cached object streaming (`o._objBuf`)

pl := o._objBuf.Payload()
hdr := o._objBuf.CutPayload()
hdr.SetSplitID(o._splitID)

err := _healHeader(o._signer, hdr)
if err != nil {
return nil, fmt.Errorf("broken first child: %w", err)
}

stream := o.targetInit()

err = stream.WriteHeader(hdr)
if err != nil {
return nil, fmt.Errorf("broken first child: cached header streaming: %w", err)
}

_, err = stream.Write(pl)
if err != nil {
return nil, fmt.Errorf("broken first child: cached payload streaming: %w", err)
}

_, err = stream.Close()
if err != nil {
return nil, fmt.Errorf("broken first child: stream for cached object closing: %w", err)
}

// mark the cached object as handled
o._objBuf = nil

// new object streaming (`header`)

stream = o.targetInit()
err = stream.WriteHeader(&header)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -162,6 +235,19 @@ func (o *objStream) Close() error {
return err
}

// _memoryObjStream is an in-memory [io.Writer] that accumulates payload
// bytes directly into the object cached by its parent
// [objStreamInitializer] (`_objBuf`) instead of streaming them to a
// network target. It is handed out while it is not yet known whether
// the object being written will stay a single object or become the
// first child of a split chain.
type _memoryObjStream struct {
	objInit *objStreamInitializer
}

// Write appends p to the cached object's payload. It copies p (via
// append), so the caller may reuse the buffer, and it never fails.
func (m *_memoryObjStream) Write(p []byte) (int, error) {
	cached := m.objInit._objBuf
	cached.SetPayload(append(cached.Payload(), p...))
	return len(p), nil
}

// Close is a no-op: the cached object is flushed later, either when a
// subsequent child object arrives or when the overall stream is closed.
func (m *_memoryObjStream) Close() error {
	return nil
}

// NewPayloadSizeLimiter returns ObjectTarget instance that restricts payload length
// of the writing object and writes generated objects to targets from initializer.
//
Expand Down Expand Up @@ -218,6 +304,8 @@ func (s *payloadSizeLimiter) WriteHeader(hdr *object.Object) error {
return fmt.Errorf("initializing payload stream: %w", err)
}

s._objectStreamInitializer = streamInitializer

return nil
}

Expand All @@ -231,6 +319,36 @@ func (s *payloadSizeLimiter) Close() (*AccessIdentifiers, error) {
return nil, err
}

if singleObj := s._objectStreamInitializer._objBuf; singleObj != nil {
// we cached a single object (payload length has not exceeded
// the limit) so stream it now without any changes

stream := s.targetInit()
pl := singleObj.Payload()
hdr := singleObj.CutPayload()
id, _ := hdr.ID()

err = stream.WriteHeader(hdr)
if err != nil {
return nil, fmt.Errorf("single object: cached header streaming: %w", err)
}

_, err = stream.Write(pl)
if err != nil {
return nil, fmt.Errorf("single object: cached payload streaming: %w", err)
}

_, err = stream.Close()
if err != nil {
return nil, fmt.Errorf("single object: stream for cached object closing: %w", err)
}

ids := new(AccessIdentifiers)
ids.WithSelfID(id)

return ids, nil
}

id := s.stream.ID()

ids := new(AccessIdentifiers)
Expand Down

0 comments on commit 9b23ffe

Please sign in to comment.