Skip to content

Commit

Permalink
feat(shwap): Add eds store (#3545)
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss authored Jul 18, 2024
1 parent 198f24e commit 2e80fc6
Show file tree
Hide file tree
Showing 19 changed files with 1,013 additions and 15 deletions.
2 changes: 2 additions & 0 deletions share/new_eds/accessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
type Accessor interface {
// Size returns square size of the Accessor.
Size(ctx context.Context) int
// DataRoot returns data hash of the Accessor.
DataRoot(ctx context.Context) (share.DataHash, error)
// Sample returns share and corresponding proof for row and column indices. Implementation can
// choose which axis to use for proof. Chosen axis for proof should be indicated in the returned
// Sample.
Expand Down
8 changes: 8 additions & 0 deletions share/new_eds/close_once.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ func (c *closeOnce) Size(ctx context.Context) int {
return c.f.Size(ctx)
}

// DataRoot returns root hash of Accessor's underlying EDS.
func (c *closeOnce) DataRoot(ctx context.Context) (share.DataHash, error) {
if c.closed.Load() {
return nil, errAccessorClosed
}
return c.f.DataRoot(ctx)
}

func (c *closeOnce) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) {
if c.closed.Load() {
return shwap.Sample{}, errAccessorClosed
Expand Down
4 changes: 4 additions & 0 deletions share/new_eds/close_once_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (s *stubEdsAccessorCloser) Size(context.Context) int {
return 0
}

func (s *stubEdsAccessorCloser) DataRoot(context.Context) (share.DataHash, error) {
return nil, nil
}

func (s *stubEdsAccessorCloser) Sample(context.Context, int, int) (shwap.Sample, error) {
return shwap.Sample{}, nil
}
Expand Down
4 changes: 4 additions & 0 deletions share/new_eds/proofs_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func (c *proofsCache) Size(ctx context.Context) int {
return int(size)
}

func (c *proofsCache) DataRoot(ctx context.Context) (share.DataHash, error) {
return c.inner.DataRoot(ctx)
}

func (c *proofsCache) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) {
axisType, axisIdx, shrIdx := rsmt2d.Row, rowIdx, colIdx
ax, err := c.axisWithProofs(ctx, axisType, axisIdx)
Expand Down
9 changes: 9 additions & 0 deletions share/new_eds/rsmt2d.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ func (eds *Rsmt2D) Size(context.Context) int {
return int(eds.Width())
}

// DataRoot returns data hash of the Accessor.
func (eds *Rsmt2D) DataRoot(context.Context) (share.DataHash, error) {
dah, err := share.NewRoot(eds.ExtendedDataSquare)
if err != nil {
return share.DataHash{}, fmt.Errorf("while creating data root: %w", err)
}
return dah.Hash(), nil
}

// Sample returns share and corresponding proof for row and column indices.
func (eds *Rsmt2D) Sample(
_ context.Context,
Expand Down
5 changes: 2 additions & 3 deletions store/cache/accessor_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,9 @@ func (bc *AccessorCache) Remove(height uint64) error {
}

// EnableMetrics enables metrics for the cache.
func (bc *AccessorCache) EnableMetrics() error {
var err error
func (bc *AccessorCache) EnableMetrics() (unreg func() error, err error) {
bc.metrics, err = newMetrics(bc)
return err
return bc.metrics.reg.Unregister, err
}

func (s *accessor) addRef() error {
Expand Down
4 changes: 4 additions & 0 deletions store/cache/accessor_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ func (m *mockAccessor) Size(context.Context) int {
panic("implement me")
}

func (m *mockAccessor) DataRoot(context.Context) (share.DataHash, error) {
panic("implement me")
}

func (m *mockAccessor) Sample(context.Context, int, int) (shwap.Sample, error) {
panic("implement me")
}
Expand Down
2 changes: 1 addition & 1 deletion store/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ type Cache interface {
Remove(height uint64) error

// EnableMetrics enables metrics in Cache
EnableMetrics() error
EnableMetrics() (unreg func() error, err error)
}
18 changes: 14 additions & 4 deletions store/cache/doublecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"errors"
"fmt"

eds "github.com/celestiaorg/celestia-node/share/new_eds"
)
Expand Down Expand Up @@ -43,9 +44,18 @@ func (mc *DoubleCache) Second() Cache {
return mc.second
}

func (mc *DoubleCache) EnableMetrics() error {
if err := mc.first.EnableMetrics(); err != nil {
return err
func (mc *DoubleCache) EnableMetrics() (unreg func() error, err error) {
unreg1, err := mc.first.EnableMetrics()
if err != nil {
return nil, fmt.Errorf("while enabling metrics for first cache: %w", err)
}
return mc.second.EnableMetrics()
unreg2, err := mc.second.EnableMetrics()
if err != nil {
return unreg1, fmt.Errorf("while enabling metrics for second cache: %w", err)
}

unregFn := func() error {
return errors.Join(unreg1(), unreg2())
}
return unregFn, nil
}
10 changes: 8 additions & 2 deletions store/cache/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ func (n NoopCache) Remove(uint64) error {
return nil
}

func (n NoopCache) EnableMetrics() error {
return nil
func (n NoopCache) EnableMetrics() (unreg func() error, err error) {
noop := func() error { return nil }
return noop, nil
}

var _ eds.AccessorStreamer = NoopFile{}
Expand All @@ -45,6 +46,11 @@ func (n NoopFile) Size(context.Context) int {
return 0
}

// DataRoot returns root hash of Accessor's underlying EDS.
func (n NoopFile) DataRoot(context.Context) (share.DataHash, error) {
return nil, nil
}

func (n NoopFile) Sample(context.Context, int, int) (shwap.Sample, error) {
return shwap.Sample{}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion store/file/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func BenchmarkCodec(b *testing.B) {
}
}

func newShards(b require.TestingT, size int, fillParity bool) [][]byte {
func newShards(b testing.TB, size int, fillParity bool) [][]byte {
shards := make([][]byte, size)
original := sharetest.RandShares(b, size/2)
copy(shards, original)
Expand Down
7 changes: 6 additions & 1 deletion store/file/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package file

import (
"encoding/binary"
"errors"
"fmt"
"io"

Expand Down Expand Up @@ -42,7 +43,11 @@ const (
func readHeader(r io.Reader) (*headerV0, error) {
// read first byte to determine the fileVersion
var version headerVersion
if err := binary.Read(r, binary.LittleEndian, &version); err != nil {
err := binary.Read(r, binary.LittleEndian, &version)
if err != nil {
if errors.Is(err, io.EOF) {
return nil, ErrEmptyFile
}
return nil, fmt.Errorf("readHeader: %w", err)
}

Expand Down
13 changes: 12 additions & 1 deletion store/file/ods.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package file

import (
"context"
"errors"
"fmt"
"io"
"os"
Expand All @@ -16,6 +17,10 @@ import (

var _ eds.AccessorStreamer = (*ODSFile)(nil)

// ErrEmptyFile signals that the ODS file is empty.
// This helps avoid storing empty block EDSes.
var ErrEmptyFile = errors.New("file is empty")

type ODSFile struct {
path string
hdr *headerV0
Expand Down Expand Up @@ -61,7 +66,8 @@ func CreateODSFile(
datahash share.DataHash,
eds *rsmt2d.ExtendedDataSquare,
) (*ODSFile, error) {
f, err := os.Create(path)
mod := os.O_RDWR | os.O_CREATE | os.O_EXCL // ensure we fail if already exist
f, err := os.OpenFile(path, mod, 0o666)
if err != nil {
return nil, fmt.Errorf("file create: %w", err)
}
Expand Down Expand Up @@ -114,6 +120,11 @@ func (f *ODSFile) size() int {
return int(f.hdr.squareSize)
}

// DataRoot returns root hash of Accessor's underlying EDS.
func (f *ODSFile) DataRoot(context.Context) (share.DataHash, error) {
return f.hdr.datahash, nil
}

// Close closes the file.
func (f *ODSFile) Close() error {
return f.fl.Close()
Expand Down
8 changes: 6 additions & 2 deletions store/file/q1q4_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ func OpenQ1Q4File(path string) (*Q1Q4File, error) {
}, nil
}

func CreateQ1Q4File(path string, datahash share.DataHash, eds *rsmt2d.ExtendedDataSquare,
) (*Q1Q4File, error) {
func CreateQ1Q4File(path string, datahash share.DataHash, eds *rsmt2d.ExtendedDataSquare) (*Q1Q4File, error) {
ods, err := CreateODSFile(path, datahash, eds)
if err != nil {
return nil, err
Expand All @@ -54,6 +53,11 @@ func (f *Q1Q4File) Size(ctx context.Context) int {
return f.ods.Size(ctx)
}

// DataRoot returns root hash of Accessor's underlying EDS.
func (f *Q1Q4File) DataRoot(ctx context.Context) (share.DataHash, error) {
return f.ods.DataRoot(ctx)
}

func (f *Q1Q4File) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) {
// use native AxisHalf implementation, to read axis from Q4 quandrant when possible
half, err := f.AxisHalf(ctx, rsmt2d.Row, rowIdx)
Expand Down
141 changes: 141 additions & 0 deletions store/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package store

import (
"context"
"fmt"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

const (
failedKey = "failed"
sizeKey = "eds_size"
)

var meter = otel.Meter("store")

type metrics struct {
put metric.Float64Histogram
putExists metric.Int64Counter
get metric.Float64Histogram
has metric.Float64Histogram
remove metric.Float64Histogram
unreg func() error
}

func (s *Store) WithMetrics() error {
put, err := meter.Float64Histogram("eds_store_put_time_histogram",
metric.WithDescription("eds store put time histogram(s)"))
if err != nil {
return err
}

putExists, err := meter.Int64Counter("eds_store_put_exists_counter",
metric.WithDescription("eds store put file exists"))
if err != nil {
return err
}

get, err := meter.Float64Histogram("eds_store_get_time_histogram",
metric.WithDescription("eds store get time histogram(s)"))
if err != nil {
return err
}

has, err := meter.Float64Histogram("eds_store_has_time_histogram",
metric.WithDescription("eds store has time histogram(s)"))
if err != nil {
return err
}

remove, err := meter.Float64Histogram("eds_store_remove_time_histogram",
metric.WithDescription("eds store remove time histogram(s)"))
if err != nil {
return err
}

unreg, err := s.cache.EnableMetrics()
if err != nil {
return fmt.Errorf("while enabling metrics for cache: %w", err)
}

s.metrics = &metrics{
put: put,
putExists: putExists,
get: get,
has: has,
remove: remove,
unreg: unreg,
}
return nil
}

func (m *metrics) observePut(ctx context.Context, dur time.Duration, size uint, failed bool) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.put.Record(ctx, dur.Seconds(), metric.WithAttributes(
attribute.Bool(failedKey, failed),
attribute.Int(sizeKey, int(size))))
}

func (m *metrics) observePutExist(ctx context.Context) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.putExists.Add(ctx, 1)
}

func (m *metrics) observeGet(ctx context.Context, dur time.Duration, failed bool) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.get.Record(ctx, dur.Seconds(), metric.WithAttributes(
attribute.Bool(failedKey, failed)))
}

func (m *metrics) observeHas(ctx context.Context, dur time.Duration, failed bool) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.has.Record(ctx, dur.Seconds(), metric.WithAttributes(
attribute.Bool(failedKey, failed)))
}

func (m *metrics) observeRemove(ctx context.Context, dur time.Duration, failed bool) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.remove.Record(ctx, dur.Seconds(), metric.WithAttributes(
attribute.Bool(failedKey, failed)))
}

func (m *metrics) close() error {
if m == nil {
return nil
}
return m.unreg()
}
Loading

0 comments on commit 2e80fc6

Please sign in to comment.