feat(shwap): Add eds store #3545

Merged · 20 commits · Jul 18, 2024
2 changes: 2 additions & 0 deletions share/new_eds/accessor.go
@@ -14,6 +14,8 @@ import (
type Accessor interface {
// Size returns square size of the Accessor.
Size(ctx context.Context) int
// DataRoot returns the data hash of the Accessor.
DataRoot(ctx context.Context) (share.DataHash, error)
// Sample returns share and corresponding proof for row and column indices. Implementation can
// choose which axis to use for proof. Chosen axis for proof should be indicated in the returned
// Sample.
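For context on how the new method composes with the existing one, here is a hedged sketch of a caller that logs an Accessor's size and data root. The reduced `accessor` interface and the `logRoot` helper are illustrative; only the two method signatures come from the hunk above.

```go
package example

import (
	"context"
	"fmt"

	"github.com/celestiaorg/celestia-node/share"
)

// accessor is a reduced stand-in for share/new_eds.Accessor, keeping only
// the two methods shown in the hunk above.
type accessor interface {
	Size(ctx context.Context) int
	DataRoot(ctx context.Context) (share.DataHash, error)
}

// logRoot is a hypothetical helper demonstrating the call pattern.
func logRoot(ctx context.Context, acc accessor) error {
	root, err := acc.DataRoot(ctx)
	if err != nil {
		return fmt.Errorf("reading data root: %w", err)
	}
	fmt.Printf("square size: %d, data root: %X\n", acc.Size(ctx), root)
	return nil
}
```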
8 changes: 8 additions & 0 deletions share/new_eds/close_once.go
@@ -42,6 +42,14 @@ func (c *closeOnce) Size(ctx context.Context) int {
return c.f.Size(ctx)
}

// DataRoot returns the root hash of the Accessor's underlying EDS.
func (c *closeOnce) DataRoot(ctx context.Context) (share.DataHash, error) {
if c.closed.Load() {
return nil, errAccessorClosed
}
return c.f.DataRoot(ctx)
}

func (c *closeOnce) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) {
if c.closed.Load() {
return shwap.Sample{}, errAccessorClosed
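The guard added here follows the same shape as the other closeOnce methods: check the closed flag, otherwise delegate. A minimal, self-contained sketch of that pattern is below; the names are illustrative and the Close body is an assumption, the real type wraps an AccessorStreamer.

```go
package example

import (
	"errors"
	"sync/atomic"
)

var errAccessorClosed = errors.New("accessor is closed")

// closer stands in for the wrapped AccessorStreamer.
type closer interface{ Close() error }

// closeOnce rejects calls after Close and releases the inner resource only once.
type closeOnce struct {
	f      closer
	closed atomic.Bool
}

func (c *closeOnce) Close() error {
	if c.closed.Swap(true) {
		return nil // already closed, nothing to release
	}
	return c.f.Close()
}

// guarded shows the check every delegating method performs first,
// just as DataRoot does in the hunk above.
func (c *closeOnce) guarded() error {
	if c.closed.Load() {
		return errAccessorClosed
	}
	return nil
}
```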
4 changes: 4 additions & 0 deletions share/new_eds/close_once_test.go
@@ -50,6 +50,10 @@ func (s *stubEdsAccessorCloser) Size(context.Context) int {
return 0
}

func (s *stubEdsAccessorCloser) DataRoot(context.Context) (share.DataHash, error) {
return nil, nil
}

func (s *stubEdsAccessorCloser) Sample(context.Context, int, int) (shwap.Sample, error) {
return shwap.Sample{}, nil
}
4 changes: 4 additions & 0 deletions share/new_eds/proofs_cache.go
@@ -77,6 +77,10 @@ func (c *proofsCache) Size(ctx context.Context) int {
return int(size)
}

func (c *proofsCache) DataRoot(ctx context.Context) (share.DataHash, error) {
return c.inner.DataRoot(ctx)
}

func (c *proofsCache) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) {
axisType, axisIdx, shrIdx := rsmt2d.Row, rowIdx, colIdx
ax, err := c.axisWithProofs(ctx, axisType, axisIdx)
9 changes: 9 additions & 0 deletions share/new_eds/rsmt2d.go
@@ -24,6 +24,15 @@ func (eds *Rsmt2D) Size(context.Context) int {
return int(eds.Width())
}

// DataRoot returns the data hash of the Accessor.
func (eds *Rsmt2D) DataRoot(context.Context) (share.DataHash, error) {
dah, err := share.NewRoot(eds.ExtendedDataSquare)
if err != nil {
return share.DataHash{}, fmt.Errorf("while creating data root: %w", err)
}
return dah.Hash(), nil
}

// Sample returns share and corresponding proof for row and column indices.
func (eds *Rsmt2D) Sample(
_ context.Context,
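A hedged usage sketch: wrap a raw extended square and ask it for its data root, which by the implementation above is the hash of the DAH built from the square's row and column roots. The struct-literal construction of Rsmt2D via its embedded field is an assumption drawn from the hunk.

```go
package example

import (
	"context"
	"fmt"

	"github.com/celestiaorg/rsmt2d"

	"github.com/celestiaorg/celestia-node/share"
	eds "github.com/celestiaorg/celestia-node/share/new_eds"
)

// dataRootOf computes the data root of an in-memory square through the
// Accessor interface rather than calling share.NewRoot directly.
func dataRootOf(ctx context.Context, square *rsmt2d.ExtendedDataSquare) (share.DataHash, error) {
	acc := &eds.Rsmt2D{ExtendedDataSquare: square} // assumed embedded-field literal
	root, err := acc.DataRoot(ctx)
	if err != nil {
		return nil, fmt.Errorf("computing data root: %w", err)
	}
	return root, nil
}
```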
5 changes: 2 additions & 3 deletions store/cache/accessor_cache.go
@@ -151,10 +151,9 @@ func (bc *AccessorCache) Remove(height uint64) error {
}

// EnableMetrics enables metrics for the cache.
func (bc *AccessorCache) EnableMetrics() error {
var err error
func (bc *AccessorCache) EnableMetrics() (unreg func() error, err error) {
bc.metrics, err = newMetrics(bc)
return err
return bc.metrics.reg.Unregister, err
}

func (s *accessor) addRef() error {
4 changes: 4 additions & 0 deletions store/cache/accessor_cache_test.go
@@ -300,6 +300,10 @@ func (m *mockAccessor) Size(context.Context) int {
panic("implement me")
}

func (m *mockAccessor) DataRoot(context.Context) (share.DataHash, error) {
panic("implement me")
}

func (m *mockAccessor) Sample(context.Context, int, int) (shwap.Sample, error) {
panic("implement me")
}
2 changes: 1 addition & 1 deletion store/cache/cache.go
@@ -32,5 +32,5 @@ type Cache interface {
Remove(height uint64) error

// EnableMetrics enables metrics in Cache
EnableMetrics() error
EnableMetrics() (unreg func() error, err error)
}
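The signature change means callers now own the unregister step. A hedged sketch of the intended call pattern follows; the shutdown wiring and `runWithMetrics` helper are hypothetical.

```go
package example

import (
	"fmt"

	"github.com/celestiaorg/celestia-node/store/cache"
)

// runWithMetrics enables cache metrics and makes sure the OTel instruments
// are unregistered on the way out.
func runWithMetrics(c cache.Cache, work func() error) error {
	unreg, err := c.EnableMetrics()
	if err != nil {
		return fmt.Errorf("enabling cache metrics: %w", err)
	}
	defer func() {
		if err := unreg(); err != nil {
			fmt.Println("unregistering cache metrics:", err)
		}
	}()
	return work()
}
```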
18 changes: 14 additions & 4 deletions store/cache/doublecache.go
@@ -2,6 +2,7 @@ package cache

import (
"errors"
"fmt"

eds "github.com/celestiaorg/celestia-node/share/new_eds"
)
@@ -43,9 +44,18 @@ func (mc *DoubleCache) Second() Cache {
return mc.second
}

func (mc *DoubleCache) EnableMetrics() error {
if err := mc.first.EnableMetrics(); err != nil {
return err
func (mc *DoubleCache) EnableMetrics() (unreg func() error, err error) {
unreg1, err := mc.first.EnableMetrics()
if err != nil {
return nil, fmt.Errorf("while enabling metrics for first cache: %w", err)
}
return mc.second.EnableMetrics()
unreg2, err := mc.second.EnableMetrics()
if err != nil {
return unreg1, fmt.Errorf("while enabling metrics for second cache: %w", err)
}

unregFn := func() error {
return errors.Join(unreg1(), unreg2())
}
return unregFn, nil
}
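The combined closure runs both unregister callbacks unconditionally and reports failures together via errors.Join instead of short-circuiting on the first one. A tiny standalone illustration of that semantics; the stand-in callbacks are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	ok := func() error { return nil }
	bad := func() error { return errors.New("instrument already unregistered") }

	// mirrors the unregFn returned by DoubleCache.EnableMetrics
	unreg := func() error { return errors.Join(ok(), bad()) }

	if err := unreg(); err != nil {
		fmt.Println("unregister:", err) // only the failing callback contributes to the error
	}
}
```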
10 changes: 8 additions & 2 deletions store/cache/noop.go
@@ -28,8 +28,9 @@ func (n NoopCache) Remove(uint64) error {
return nil
}

func (n NoopCache) EnableMetrics() error {
return nil
func (n NoopCache) EnableMetrics() (unreg func() error, err error) {
noop := func() error { return nil }
return noop, nil
}

var _ eds.AccessorStreamer = NoopFile{}
@@ -45,6 +46,11 @@ func (n NoopFile) Size(context.Context) int {
return 0
}

// DataRoot returns the root hash of the Accessor's underlying EDS.
func (n NoopFile) DataRoot(context.Context) (share.DataHash, error) {
return nil, nil
}

func (n NoopFile) Sample(context.Context, int, int) (shwap.Sample, error) {
return shwap.Sample{}, nil
}
2 changes: 1 addition & 1 deletion store/file/codec_test.go
@@ -68,7 +68,7 @@ func BenchmarkCodec(b *testing.B) {
}
}

func newShards(b require.TestingT, size int, fillParity bool) [][]byte {
func newShards(b testing.TB, size int, fillParity bool) [][]byte {
shards := make([][]byte, size)
original := sharetest.RandShares(b, size/2)
copy(shards, original)
7 changes: 6 additions & 1 deletion store/file/header.go
@@ -2,6 +2,7 @@ package file

import (
"encoding/binary"
"errors"
"fmt"
"io"

@@ -42,7 +43,11 @@ const (
func readHeader(r io.Reader) (*headerV0, error) {
// read first byte to determine the fileVersion
var version headerVersion
if err := binary.Read(r, binary.LittleEndian, &version); err != nil {
err := binary.Read(r, binary.LittleEndian, &version)
if err != nil {
if errors.Is(err, io.EOF) {
return nil, ErrEmptyFile
}
return nil, fmt.Errorf("readHeader: %w", err)
}

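With EOF mapped to ErrEmptyFile, callers can tell an empty (never written) file apart from a corrupted header. A hedged sketch of that branch; OpenODSFile is an assumed constructor name, check the package for the exact one.

```go
package example

import (
	"errors"
	"fmt"
	"os"

	"github.com/celestiaorg/celestia-node/store/file"
)

// openOrClean opens an ODS file and removes it if it turns out to be empty,
// e.g. left behind by an interrupted write.
func openOrClean(path string) (*file.ODSFile, error) {
	ods, err := file.OpenODSFile(path) // assumed constructor name
	switch {
	case errors.Is(err, file.ErrEmptyFile):
		return nil, os.Remove(path)
	case err != nil:
		return nil, fmt.Errorf("opening ODS file: %w", err)
	}
	return ods, nil
}
```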
13 changes: 12 additions & 1 deletion store/file/ods.go
@@ -2,6 +2,7 @@ package file

import (
"context"
"errors"
"fmt"
"io"
"os"
@@ -16,6 +17,10 @@ import (

var _ eds.AccessorStreamer = (*ODSFile)(nil)

// ErrEmptyFile signals that the ODS file is empty.
// This helps avoid storing empty block EDSes.
var ErrEmptyFile = errors.New("file is empty")

type ODSFile struct {
path string
hdr *headerV0
@@ -61,7 +66,8 @@ func CreateODSFile(
datahash share.DataHash,
eds *rsmt2d.ExtendedDataSquare,
) (*ODSFile, error) {
f, err := os.Create(path)
mod := os.O_RDWR | os.O_CREATE | os.O_EXCL // ensure we fail if the file already exists
f, err := os.OpenFile(path, mod, 0o666)
if err != nil {
return nil, fmt.Errorf("file create: %w", err)
}
@@ -114,6 +120,11 @@ func (f *ODSFile) size() int {
return int(f.hdr.squareSize)
}

// DataRoot returns the root hash of the Accessor's underlying EDS.
func (f *ODSFile) DataRoot(context.Context) (share.DataHash, error) {
return f.hdr.datahash, nil
}

// Close closes the file.
func (f *ODSFile) Close() error {
return f.fl.Close()
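Because the file is now created with O_EXCL, a second CreateODSFile for the same path fails instead of silently truncating the previous square. A hedged sketch of a caller treating that as "already stored"; the check relies on the %w wrapping shown above preserving os.ErrExist, and the putODS helper is hypothetical.

```go
package example

import (
	"errors"
	"fmt"
	"os"

	"github.com/celestiaorg/rsmt2d"

	"github.com/celestiaorg/celestia-node/share"
	"github.com/celestiaorg/celestia-node/store/file"
)

// putODS writes the square once; if the file already exists, the previous
// write is kept and the call becomes a no-op.
func putODS(path string, datahash share.DataHash, square *rsmt2d.ExtendedDataSquare) error {
	ods, err := file.CreateODSFile(path, datahash, square)
	if errors.Is(err, os.ErrExist) {
		return nil // already stored for this height
	}
	if err != nil {
		return fmt.Errorf("creating ODS file: %w", err)
	}
	return ods.Close()
}
```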
8 changes: 6 additions & 2 deletions store/file/q1q4_file.go
@@ -33,8 +33,7 @@ func OpenQ1Q4File(path string) (*Q1Q4File, error) {
}, nil
}

func CreateQ1Q4File(path string, datahash share.DataHash, eds *rsmt2d.ExtendedDataSquare,
) (*Q1Q4File, error) {
func CreateQ1Q4File(path string, datahash share.DataHash, eds *rsmt2d.ExtendedDataSquare) (*Q1Q4File, error) {
ods, err := CreateODSFile(path, datahash, eds)
if err != nil {
return nil, err
@@ -54,6 +53,11 @@ func (f *Q1Q4File) Size(ctx context.Context) int {
return f.ods.Size(ctx)
}

// DataRoot returns the root hash of the Accessor's underlying EDS.
func (f *Q1Q4File) DataRoot(ctx context.Context) (share.DataHash, error) {
return f.ods.DataRoot(ctx)
}

func (f *Q1Q4File) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) {
// use the native AxisHalf implementation to read the axis from the Q4 quadrant when possible
half, err := f.AxisHalf(ctx, rsmt2d.Row, rowIdx)
141 changes: 141 additions & 0 deletions store/metrics.go
@@ -0,0 +1,141 @@
package store

import (
"context"
"fmt"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

const (
failedKey = "failed"
sizeKey = "eds_size"
)

var meter = otel.Meter("store")

type metrics struct {
put metric.Float64Histogram
putExists metric.Int64Counter
get metric.Float64Histogram
has metric.Float64Histogram
remove metric.Float64Histogram
unreg func() error
}

func (s *Store) WithMetrics() error {
put, err := meter.Float64Histogram("eds_store_put_time_histogram",
metric.WithDescription("eds store put time histogram(s)"))
if err != nil {
return err
}

putExists, err := meter.Int64Counter("eds_store_put_exists_counter",
metric.WithDescription("eds store put file exists"))
if err != nil {
return err
}

get, err := meter.Float64Histogram("eds_store_get_time_histogram",
metric.WithDescription("eds store get time histogram(s)"))
if err != nil {
return err
}

has, err := meter.Float64Histogram("eds_store_has_time_histogram",
metric.WithDescription("eds store has time histogram(s)"))
if err != nil {
return err
}

remove, err := meter.Float64Histogram("eds_store_remove_time_histogram",
metric.WithDescription("eds store remove time histogram(s)"))
if err != nil {
return err
}

unreg, err := s.cache.EnableMetrics()
if err != nil {
return fmt.Errorf("while enabling metrics for cache: %w", err)
}

s.metrics = &metrics{
put: put,
putExists: putExists,
get: get,
has: has,
remove: remove,
unreg: unreg,
}
return nil
}

func (m *metrics) observePut(ctx context.Context, dur time.Duration, size uint, failed bool) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.put.Record(ctx, dur.Seconds(), metric.WithAttributes(
attribute.Bool(failedKey, failed),
attribute.Int(sizeKey, int(size))))
}

func (m *metrics) observePutExist(ctx context.Context) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.putExists.Add(ctx, 1)
}

func (m *metrics) observeGet(ctx context.Context, dur time.Duration, failed bool) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.get.Record(ctx, dur.Seconds(), metric.WithAttributes(
attribute.Bool(failedKey, failed)))
}

func (m *metrics) observeHas(ctx context.Context, dur time.Duration, failed bool) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.has.Record(ctx, dur.Seconds(), metric.WithAttributes(
attribute.Bool(failedKey, failed)))
}

func (m *metrics) observeRemove(ctx context.Context, dur time.Duration, failed bool) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.remove.Record(ctx, dur.Seconds(), metric.WithAttributes(
attribute.Bool(failedKey, failed)))
}

func (m *metrics) close() error {
if m == nil {
return nil
}
return m.unreg()
}
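The new metrics are opt-in: WithMetrics registers the OTel instruments plus the cache metrics, and every observe* helper is a no-op on a nil receiver, so an un-instrumented Store pays nothing. A hedged in-package sketch of the measurement pattern the observe* helpers expect; measureGet is a hypothetical wrapper, the real get path lives elsewhere in the package.

```go
package store

import (
	"context"
	"time"
)

// measureGet is a hypothetical in-package wrapper showing how the observe*
// helpers are meant to be fed: wall-clock duration plus a failed flag. The
// nil check inside observeGet makes this safe even when WithMetrics was
// never called.
func (s *Store) measureGet(ctx context.Context, get func(context.Context) error) error {
	start := time.Now()
	err := get(ctx)
	s.metrics.observeGet(ctx, time.Since(start), err != nil)
	return err
}
```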