Skip to content

Commit

Permalink
feat: EmbedMessage WrapV1 option & Reader#ReadEmbeddedMessage
Browse files Browse the repository at this point in the history
EmbedMessage sets the second-to-leftmost bit to indicate that there is a
length-prefixed dag-cbor message object directly after the header. If that bit
is set, Reader#ReadEmbeddedMessage will decode and return that message.

Ref: ipld/js-car#89
  • Loading branch information
rvagg committed Jul 13, 2022
1 parent 771fb74 commit 2fb245d
Show file tree
Hide file tree
Showing 9 changed files with 274 additions and 54 deletions.
8 changes: 8 additions & 0 deletions v2/block_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,14 @@ func TestMaxHeaderLength(t *testing.T) {
require.EqualError(t, err, "invalid header data, length of read beyond allowable maximum")
}

func TestCanReadMessagingCar(t *testing.T) {
car, err := carv2.NewBlockReader(requireReaderFromPath(t, "testdata/messaging.car"))
require.NoError(t, err)
readBlock, err := car.Next()
require.NoError(t, err)
require.Equal(t, "random meaningless bytes", string(readBlock.RawData()))
}

func requireReaderFromPath(t *testing.T, path string) io.Reader {
f, err := os.Open(path)
require.NoError(t, err)
Expand Down
22 changes: 21 additions & 1 deletion v2/car.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,12 @@ type (
}
)

// fullyIndexedCharPos is the position of Characteristics.Hi bit that specifies whether the index is a catalog af all CIDs or not.
// Bit positions within Characteristics.Hi.
const (
	// fullyIndexedCharPos is the position of the Characteristics.Hi bit that specifies whether
	// the index is a catalog of all CIDs or not.
	fullyIndexedCharPos = 7 // left-most bit

	// messageAfterHeaderCharPos is the position of the Characteristics.Hi bit that specifies
	// whether there is a length-prefixed dag-cbor message object directly after the header.
	messageAfterHeaderCharPos = 6 // second-to-left-most bit
)

// WriteTo writes this characteristics to the given w.
func (c Characteristics) WriteTo(w io.Writer) (n int64, err error) {
Expand Down Expand Up @@ -83,6 +87,22 @@ func (c *Characteristics) SetFullyIndexed(b bool) {
}
}

// IsMessageAfterHeader reports whether the messageAfterHeaderCharPos bit of c.Hi is set,
// i.e. whether there is a length-prefixed dag-cbor message embedded directly after the
// CARv2 header that can optionally be decoded.
func (c *Characteristics) IsMessageAfterHeader() bool {
return isBitSet(c.Hi, messageAfterHeaderCharPos)
}

// SetMessageAfterHeader records in c.Hi whether a length-prefixed dag-cbor message is
// embedded directly after the CARv2 header: the messageAfterHeaderCharPos bit is set
// when b is true and cleared when b is false.
func (c *Characteristics) SetMessageAfterHeader(b bool) {
	if !b {
		c.Hi = unsetBit(c.Hi, messageAfterHeaderCharPos)
		return
	}
	c.Hi = setBit(c.Hi, messageAfterHeaderCharPos)
}

func setBit(n uint64, pos uint) uint64 {
n |= 1 << pos
return n
Expand Down
104 changes: 65 additions & 39 deletions v2/car_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,43 +205,69 @@ func TestNewHeaderHasExpectedValues(t *testing.T) {
assert.Equal(t, want, got, "NewHeader got = %v, want = %v", got, want)
}

func TestCharacteristics_StoreIdentityCIDs(t *testing.T) {
subject := carv2.Characteristics{}
require.False(t, subject.IsFullyIndexed())

subject.SetFullyIndexed(true)
require.True(t, subject.IsFullyIndexed())

var buf bytes.Buffer
written, err := subject.WriteTo(&buf)
require.NoError(t, err)
require.Equal(t, int64(16), written)
require.Equal(t, []byte{
0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
}, buf.Bytes())

var decodedSubject carv2.Characteristics
read, err := decodedSubject.ReadFrom(&buf)
require.NoError(t, err)
require.Equal(t, int64(16), read)
require.True(t, decodedSubject.IsFullyIndexed())

buf.Reset()
subject.SetFullyIndexed(false)
require.False(t, subject.IsFullyIndexed())

written, err = subject.WriteTo(&buf)
require.NoError(t, err)
require.Equal(t, int64(16), written)
require.Equal(t, []byte{
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
}, buf.Bytes())

var decodedSubjectAgain carv2.Characteristics
read, err = decodedSubjectAgain.ReadFrom(&buf)
require.NoError(t, err)
require.Equal(t, int64(16), read)
require.False(t, decodedSubjectAgain.IsFullyIndexed())
// TestCharacteristics checks, for every characteristic flag, that the flag can be set and
// cleared through its accessor pair and that both states survive a WriteTo/ReadFrom
// round trip with the expected serialized bytes.
func TestCharacteristics(t *testing.T) {
	// With no flag set, all 16 serialized bytes are zero.
	allZero := make([]byte, 16)

	// writeThenRead serializes c, checks the exact bytes produced, and returns the
	// value decoded back from those bytes.
	writeThenRead := func(t *testing.T, c carv2.Characteristics, want []byte) carv2.Characteristics {
		var buf bytes.Buffer
		written, err := c.WriteTo(&buf)
		require.NoError(t, err)
		require.Equal(t, int64(16), written)
		require.Equal(t, want, buf.Bytes())

		var decoded carv2.Characteristics
		read, err := decoded.ReadFrom(&buf)
		require.NoError(t, err)
		require.Equal(t, int64(16), read)
		return decoded
	}

	cases := []struct {
		name        string
		isset       func(carv2.Characteristics) bool
		set         func(*carv2.Characteristics, bool)
		expectBytes []byte
	}{
		{
			name:  "FullyIndexed",
			isset: func(c carv2.Characteristics) bool { return c.IsFullyIndexed() },
			set:   func(c *carv2.Characteristics, s bool) { c.SetFullyIndexed(s) },
			expectBytes: []byte{
				0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
				0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
			},
		},
		{
			name:  "MessageAfterHeader",
			isset: func(c carv2.Characteristics) bool { return c.IsMessageAfterHeader() },
			set:   func(c *carv2.Characteristics, s bool) { c.SetMessageAfterHeader(s) },
			expectBytes: []byte{
				0x40, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
				0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
			},
		},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			// The zero value has the flag cleared.
			var subject carv2.Characteristics
			require.False(t, tc.isset(subject))

			// Setting the flag must be visible in memory, on the wire, and after decoding.
			tc.set(&subject, true)
			require.True(t, tc.isset(subject))
			decodedSet := writeThenRead(t, subject, tc.expectBytes)
			require.True(t, tc.isset(decodedSet))

			// Clearing the flag must bring everything back to all zeroes.
			tc.set(&subject, false)
			require.False(t, tc.isset(subject))
			decodedCleared := writeThenRead(t, subject, allZero)
			require.False(t, tc.isset(decodedCleared))
		})
	}
}
12 changes: 12 additions & 0 deletions v2/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"math"

"github.com/ipld/go-car/v2/index"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/multiformats/go-multicodec"

Expand Down Expand Up @@ -63,6 +64,8 @@ type Options struct {

MaxAllowedHeaderSize uint64
MaxAllowedSectionSize uint64

EmbeddedMessage datamodel.Node
}

// ApplyOptions applies given opts and returns the resulting Options.
Expand Down Expand Up @@ -172,3 +175,12 @@ func MaxAllowedSectionSize(max uint64) Option {
o.MaxAllowedSectionSize = max
}
}

// EmbedMessage returns an Option that stores msg in Options.EmbeddedMessage.
// When the option is supplied to WrapV1, msg is written as a length-prefixed
// dag-cbor message directly after the CARv2 header and the 'MessageAfterHeader'
// characteristic bit is set for the resulting CAR.
func EmbedMessage(msg datamodel.Node) Option {
return func(o *Options) {
o.EmbeddedMessage = msg
}
}
23 changes: 23 additions & 0 deletions v2/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
"github.com/ipld/go-car/v2/internal/carv1"
"github.com/ipld/go-car/v2/internal/carv1/util"
internalio "github.com/ipld/go-car/v2/internal/io"
ipld "github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
"github.com/multiformats/go-varint"
Expand Down Expand Up @@ -349,6 +352,26 @@ func (r *Reader) Inspect(validateBlockHash bool) (Stats, error) {
return stats, nil
}

// ReadEmbeddedMessage decodes and returns the length-prefixed dag-cbor message that sits
// between the CARv2 header and the data payload.
//
// An error is returned when the 'MessageAfterHeader' characteristic bit is not set for
// this CAR, when the header's DataOffset leaves no room after the header, or when the
// message cannot be read or decoded.
func (r *Reader) ReadEmbeddedMessage() (datamodel.Node, error) {
	if !r.Header.Characteristics.IsMessageAfterHeader() {
		return nil, errors.New("'MessageAfterHeader' bit is not set in the characteristics for this CAR")
	}

	// The message can only occupy the bytes between the end of the pragma+header and
	// the offset at which the data payload begins.
	start := int64(PragmaSize + HeaderSize)
	available := int64(r.Header.DataOffset) - start
	if available <= 0 {
		return nil, errors.New("invalid MessageAfterHeader, no space after header")
	}

	// Restrict reads to that gap so a bad length prefix cannot run into the payload.
	// NOTE(review): the `false` argument is presumably LdRead's zero-length-as-EOF
	// switch — confirm against util.LdRead.
	gap := io.NewSectionReader(r.r, start, available)
	encoded, err := util.LdRead(gap, false, r.opts.MaxAllowedSectionSize)
	if err != nil {
		return nil, err
	}
	return ipld.Decode(encoded, dagcbor.Decode)
}

// Close closes the underlying reader if it was opened by OpenReader.
func (r *Reader) Close() error {
if r.closer != nil {
Expand Down
56 changes: 56 additions & 0 deletions v2/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/index"
"github.com/ipld/go-car/v2/internal/carv1"
ipld "github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagjson"
"github.com/multiformats/go-multicodec"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -424,6 +426,31 @@ func TestInspect(t *testing.T) {
MinCidLength: 25,
},
},
// CARv2 with embedded message
{
name: "CarWithMessage",
path: "testdata/messaging.car",
zerLenAsEOF: true,
expectedStats: carv2.Stats{
Version: 2,
Header: carv2.Header{
Characteristics: carv2.Characteristics{Hi: 64},
DataOffset: 158,
DataSize: 120,
},
Roots: []cid.Cid{mustCidDecode("bafkreihwkf6mtnjobdqrkiksr7qhp6tiiqywux64aylunbvmfhzeql2coa")},
RootsPresent: true,
AvgBlockLength: 24,
MinBlockLength: 24,
MaxBlockLength: 24,
AvgCidLength: 36,
MinCidLength: 36,
MaxCidLength: 36,
BlockCount: 1,
CodecCounts: map[multicodec.Code]uint64{multicodec.Raw: 1},
MhTypeCounts: map[multicodec.Code]uint64{multicodec.Sha2_256: 1},
},
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -560,3 +587,32 @@ func mustCidDecode(s string) cid.Cid {
}
return c
}

// TestEmbeddedMessage exercises Reader.ReadEmbeddedMessage against three fixtures:
// a CARv2 that carries an embedded message, a plain CARv1, and a CARv2 without the
// 'MessageAfterHeader' characteristic. The latter two must fail with the
// "bit is not set" error.
func TestEmbeddedMessage(t *testing.T) {
	const errBitNotSet = "'MessageAfterHeader' bit is not set in the characteristics for this CAR"

	t.Run("has", func(t *testing.T) {
		r, err := carv2.OpenReader("testdata/messaging.car")
		require.NoError(t, err)
		// OpenReader owns the underlying file; release it when the subtest ends.
		t.Cleanup(func() { require.NoError(t, r.Close()) })

		msg, err := r.ReadEmbeddedMessage()
		require.NoError(t, err)
		// Re-encode as dag-json so the decoded node can be compared as readable text.
		enc, err := ipld.Encode(msg, dagjson.Encode)
		require.NoError(t, err)
		require.Equal(t,
			`{"expectedRoot":{"/":"bafkreihwkf6mtnjobdqrkiksr7qhp6tiiqywux64aylunbvmfhzeql2coa"},"sneaky":"sending a message outside of CARv1 payload"}`,
			string(enc),
		)
	})
	t.Run("hasnot-v1", func(t *testing.T) {
		r, err := carv2.OpenReader("testdata/sample-v1.car")
		require.NoError(t, err)
		t.Cleanup(func() { require.NoError(t, r.Close()) })

		_, err = r.ReadEmbeddedMessage()
		// EqualError also fails on a nil error, so no separate non-nil check is needed.
		require.EqualError(t, err, errBitNotSet)
	})
	t.Run("hasnot-v2", func(t *testing.T) {
		r, err := carv2.OpenReader("testdata/sample-wrapped-v2.car")
		require.NoError(t, err)
		t.Cleanup(func() { require.NoError(t, r.Close()) })

		_, err = r.ReadEmbeddedMessage()
		require.EqualError(t, err, errBitNotSet)
	})
}
Binary file added v2/testdata/messaging.car
Binary file not shown.
63 changes: 49 additions & 14 deletions v2/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
"github.com/ipld/go-car/v2/index"
"github.com/ipld/go-car/v2/internal/carv1"
internalio "github.com/ipld/go-car/v2/internal/io"
ipld "github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/multiformats/go-varint"
)

// ErrAlreadyV1 signals that the given payload is already in CARv1 format.
Expand Down Expand Up @@ -46,21 +49,27 @@ func WrapV1File(srcPath, dstPath string) error {
return nil
}

// WrapV1 takes a CARv1 file and wraps it as a CARv2 file with an index.
// WrapV1 takes a CARv1 file and wraps it as a CARv2 file with an index (unless
// the WithoutIndex option is supplied).
// The resulting CARv2 file's inner CARv1 payload is left unmodified,
// and does not use any padding before the innner CARv1 or index.
// and does not use any padding before the inner CARv1 or index.
// The EmbedMessage option may be used to insert an additional message after the
// CARv2 header.
func WrapV1(src io.ReadSeeker, dst io.Writer, opts ...Option) error {
// TODO: verify src is indeed a CARv1 to prevent misuse.
// GenerateIndex should probably be in charge of that.

o := ApplyOptions(opts...)
idx, err := index.New(o.IndexCodec)
if err != nil {
return err
}

if err := LoadIndex(idx, src, opts...); err != nil {
return err
var idx index.Index
var err error
if o.IndexCodec != index.CarIndexNone {
idx, err = index.New(o.IndexCodec)
if err != nil {
return err
}
if err := LoadIndex(idx, src, opts...); err != nil {
return err
}
}

// Use Seek to learn the size of the CARv1 before reading it.
Expand All @@ -74,18 +83,44 @@ func WrapV1(src io.ReadSeeker, dst io.Writer, opts ...Option) error {

// Similar to the writer API, write all components of a CARv2 to the
// destination file: Pragma, Header, CARv1, Index.
v2Header := NewHeader(uint64(v1Size))
if _, err := dst.Write(Pragma); err != nil {
return err
}
if _, err := v2Header.WriteTo(dst); err != nil {
return err
v2Header := NewHeader(uint64(v1Size))
if o.IndexCodec == index.CarIndexNone {
v2Header.IndexOffset = 0
}

if o.EmbeddedMessage != nil {
v2Header.Characteristics.SetMessageAfterHeader(true)
msgBytes, err := ipld.Encode(o.EmbeddedMessage, dagcbor.Encode)
if err != nil {
return err
}
lenBuf := make([]byte, 8)
lenLen := varint.PutUvarint(lenBuf, uint64(len(msgBytes)))
v2Header.DataOffset += uint64(lenLen + len(msgBytes))
if _, err := v2Header.WriteTo(dst); err != nil {
return err
}
if _, err = dst.Write(lenBuf[:lenLen]); err != nil {
return err
}
if _, err = dst.Write(msgBytes); err != nil {
return err
}
} else {
if _, err := v2Header.WriteTo(dst); err != nil {
return err
}
}
if _, err := io.Copy(dst, src); err != nil {
return err
}
if _, err := index.WriteTo(idx, dst); err != nil {
return err
if idx != nil {
if _, err := index.WriteTo(idx, dst); err != nil {
return err
}
}

return nil
Expand Down
Loading

0 comments on commit 2fb245d

Please sign in to comment.