diff --git a/pkg/services/object_manager/transformer/transformer.go b/pkg/services/object_manager/transformer/transformer.go index d922b333dd5..0645903073b 100644 --- a/pkg/services/object_manager/transformer/transformer.go +++ b/pkg/services/object_manager/transformer/transformer.go @@ -27,7 +27,8 @@ type payloadSizeLimiter struct { objSlicer *slicerSDK.Slicer targetInit TargetInitializer - _changedParentID *oid.ID + _changedParentID *oid.ID + _objectStreamInitializer *objStreamInitializer } // objStreamInitializer implements [slicerSDK.ObjectWriter]. @@ -37,6 +38,8 @@ type objStreamInitializer struct { _psl *payloadSizeLimiter _signer neofscrypto.Signer _objType object.Type + _objBuf *object.Object + _splitID *object.SplitID } var ( @@ -45,7 +48,6 @@ var ( ) func (o *objStreamInitializer) InitDataStream(header object.Object) (io.Writer, error) { - stream := o.targetInit() linkObj := len(header.Children()) > 0 // v1.0.0-rc.8 has a bug that does not allow any non-regular objects @@ -113,12 +115,83 @@ func (o *objStreamInitializer) InitDataStream(header object.Object) (io.Writer, } } - err := stream.WriteHeader(&header) - if err != nil { - return nil, err - } + // v1.0.0-rc.8 has a bug that breaks split field for the first child object, + // thus that check, see https://github.com/nspcc-dev/neofs-sdk-go/issues/448. + if o._objBuf == nil { + if o._splitID == nil { + // the first object, it is impossible to say + // if there will be any others so cache it now + // and generate split id for a potential object + // chain + o._objBuf = &header + o._splitID = object.NewSplitID() + + return &_memoryObjStream{objInit: o}, nil + } + + // not the first object, attach the missing split ID + // and heal its header the second time; it is non-optimal + // but the code here is already hard to read, and it + // is full of kludges so let it be as stupid as possible + + header.SetSplitID(o._splitID) + err := _healHeader(o._signer, &header) + if err != nil { + return nil, fmt.Errorf("broken intermediate object: %w", err) + } + + stream := o.targetInit() + err = stream.WriteHeader(&header) + if err != nil { + return nil, fmt.Errorf("broken intermediate object: streaming header: %w", err) + } + + return &objStream{target: stream, _linkObj: linkObj}, nil + } else { + // more objects are here it _is_ an object chain, + // stream the cached one and continue chain handling + + // cached object streaming (`o._objBuf`) + + pl := o._objBuf.Payload() + hdr := o._objBuf.CutPayload() + hdr.SetSplitID(o._splitID) + + err := _healHeader(o._signer, hdr) + if err != nil { + return nil, fmt.Errorf("broken first child: %w", err) + } + + stream := o.targetInit() + + err = stream.WriteHeader(hdr) + if err != nil { + return nil, fmt.Errorf("broken first child: cached header streaming: %w", err) + } + + _, err = stream.Write(pl) + if err != nil { + return nil, fmt.Errorf("broken first child: cached payload streaming: %w", err) + } + + _, err = stream.Close() + if err != nil { + return nil, fmt.Errorf("broken first child: stream for cached object closing: %w", err) + } - return &objStream{target: stream, _linkObj: linkObj}, nil + // mark the cached object as handled + o._objBuf = nil + + // new object streaming (`header`) + + stream = o.targetInit() + err = stream.WriteHeader(&header) + if err != nil { + return nil, err + } + + return &objStream{target: stream, _linkObj: linkObj}, nil + } } // _healHeader recalculates all signature related fields that are @@ -162,6 +235,19 @@ func (o *objStream) Close() error { return err } +type _memoryObjStream struct { + objInit *objStreamInitializer +} + +func (m *_memoryObjStream) Write(p []byte) (n int, err error) { + m.objInit._objBuf.SetPayload(append(m.objInit._objBuf.Payload(), p...)) + return len(p), nil +} + +func (m *_memoryObjStream) Close() error { + return nil +} + // NewPayloadSizeLimiter returns ObjectTarget instance that restricts payload length // of the writing object and writes generated objects to targets from initializer. // @@ -218,6 +304,8 @@ func (s *payloadSizeLimiter) WriteHeader(hdr *object.Object) error { return fmt.Errorf("initializing payload stream: %w", err) } + s._objectStreamInitializer = streamInitializer + return nil } @@ -231,6 +319,36 @@ func (s *payloadSizeLimiter) Close() (*AccessIdentifiers, error) { return nil, err } + if singleObj := s._objectStreamInitializer._objBuf; singleObj != nil { + // we cached a single object (payload length has not exceeded + // the limit) so stream it now without any changes + + stream := s.targetInit() + pl := singleObj.Payload() + hdr := singleObj.CutPayload() + id, _ := hdr.ID() + + err = stream.WriteHeader(hdr) + if err != nil { + return nil, fmt.Errorf("single object: cached header streaming: %w", err) + } + + _, err = stream.Write(pl) + if err != nil { + return nil, fmt.Errorf("single object: cached payload streaming: %w", err) + } + + _, err = stream.Close() + if err != nil { + return nil, fmt.Errorf("single object: stream for cached object closing: %w", err) + } + + ids := new(AccessIdentifiers) + ids.WithSelfID(id) + + return ids, nil + } + id := s.stream.ID() ids := new(AccessIdentifiers)