Skip to content

Commit

Permalink
add eds store
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss committed Jul 4, 2024
1 parent 5159e4f commit 265bee2
Show file tree
Hide file tree
Showing 15 changed files with 1,148 additions and 1 deletion.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.26.0
go.opentelemetry.io/otel/trace v1.26.0
go.opentelemetry.io/proto/otlp v1.2.0
go.uber.org/atomic v1.11.0
go.uber.org/fx v1.21.1
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.21.0
Expand Down Expand Up @@ -313,7 +314,6 @@ require (
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.21.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/dig v1.17.1 // indirect
go.uber.org/mock v0.4.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions share/new_eds/accessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
type Accessor interface {
// Size returns square size of the Accessor.
Size(ctx context.Context) int
// DataHash returns data hash of the Accessor.
DataHash(ctx context.Context) (share.DataHash, error)
// Sample returns share and corresponding proof for row and column indices. Implementation can
// choose which axis to use for proof. Chosen axis for proof should be indicated in the returned
// Sample.
Expand Down
8 changes: 8 additions & 0 deletions share/new_eds/close_once.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ func (c *closeOnce) Size(ctx context.Context) int {
return c.f.Size(ctx)
}

// DataHash returns data hash of the Accessor.
func (c *closeOnce) DataHash(ctx context.Context) (share.DataHash, error) {
if c.closed.Load() {
return nil, errAccessorClosed
}
return c.f.DataHash(ctx)
}

func (c *closeOnce) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) {
if c.closed.Load() {
return shwap.Sample{}, errAccessorClosed
Expand Down
4 changes: 4 additions & 0 deletions share/new_eds/close_once_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (s *stubEdsAccessorCloser) Size(context.Context) int {
return 0
}

func (s *stubEdsAccessorCloser) DataHash(context.Context) (share.DataHash, error) {
return nil, nil
}

func (s *stubEdsAccessorCloser) Sample(context.Context, int, int) (shwap.Sample, error) {
return shwap.Sample{}, nil
}
Expand Down
5 changes: 5 additions & 0 deletions share/new_eds/proofs_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ func (c *proofsCache) Size(ctx context.Context) int {
return int(size)
}

// DataHash returns data hash of the Accessor.
func (c *proofsCache) DataHash(ctx context.Context) (share.DataHash, error) {
return c.inner.DataHash(ctx)
}

func (c *proofsCache) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) {
axisType, axisIdx, shrIdx := rsmt2d.Row, rowIdx, colIdx
ax, err := c.axisWithProofs(ctx, axisType, axisIdx)
Expand Down
7 changes: 7 additions & 0 deletions share/new_eds/rsmt2d.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"

"github.com/celestiaorg/celestia-app/pkg/da"
"github.com/celestiaorg/celestia-app/pkg/wrapper"
"github.com/celestiaorg/rsmt2d"

Expand All @@ -24,6 +25,12 @@ func (eds *Rsmt2D) Size(context.Context) int {
return int(eds.Width())
}

// DataHash returns data hash of the Accessor.
func (eds *Rsmt2D) DataHash(context.Context) (share.DataHash, error) {
dah, _ := da.NewDataAvailabilityHeader(eds.ExtendedDataSquare)
return dah.Hash(), nil
}

// Sample returns share and corresponding proof for row and column indices.
func (eds *Rsmt2D) Sample(
_ context.Context,
Expand Down
4 changes: 4 additions & 0 deletions store/cache/accessor_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ func (m *mockAccessor) Size(context.Context) int {
panic("implement me")
}

func (m *mockAccessor) DataHash(context.Context) (share.DataHash, error) {
panic("implement me")
}

func (m *mockAccessor) Sample(context.Context, int, int) (shwap.Sample, error) {
panic("implement me")
}
Expand Down
5 changes: 5 additions & 0 deletions store/cache/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func (n NoopFile) Size(context.Context) int {
return 0
}

// DataHash returns data hash of the Accessor.
func (n NoopFile) DataHash(context.Context) (share.DataHash, error) {
return nil, nil
}

func (n NoopFile) Sample(context.Context, int, int) (shwap.Sample, error) {
return shwap.Sample{}, nil
}
Expand Down
5 changes: 5 additions & 0 deletions store/file/ods.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ func (f *ODSFile) size() int {
return int(f.hdr.squareSize)
}

// DataHash returns data hash of the Accessor.
func (f *ODSFile) DataHash(context.Context) (share.DataHash, error) {
return f.hdr.datahash, nil
}

// Close closes the file.
func (f *ODSFile) Close() error {
return f.fl.Close()
Expand Down
5 changes: 5 additions & 0 deletions store/file/q1q4_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ func (f *Q1Q4File) Size(ctx context.Context) int {
return f.ods.Size(ctx)
}

// DataHash returns data hash of the Accessor.
func (f *Q1Q4File) DataHash(ctx context.Context) (share.DataHash, error) {
return f.ods.DataHash(ctx)
}

func (f *Q1Q4File) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) {
// use native AxisHalf implementation, to read axis from Q4 quandrant when possible
half, err := f.AxisHalf(ctx, rsmt2d.Row, rowIdx)
Expand Down
130 changes: 130 additions & 0 deletions store/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package store

import (
"context"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

const (
failedKey = "failed"
sizeKey = "eds_size"
)

var meter = otel.Meter("store")

type metrics struct {
put metric.Float64Histogram
putExists metric.Int64Counter
get metric.Float64Histogram
has metric.Float64Histogram
remove metric.Float64Histogram
}

func (s *Store) WithMetrics() error {
put, err := meter.Float64Histogram("eds_store_put_time_histogram",
metric.WithDescription("eds store put time histogram(s)"))
if err != nil {
return err
}

putExists, err := meter.Int64Counter("eds_store_put_exists_counter",
metric.WithDescription("eds store put file exists"))
if err != nil {
return err
}

get, err := meter.Float64Histogram("eds_store_get_time_histogram",
metric.WithDescription("eds store get time histogram(s)"))
if err != nil {
return err
}

has, err := meter.Float64Histogram("eds_store_has_time_histogram",
metric.WithDescription("eds store has time histogram(s)"))
if err != nil {
return err
}

remove, err := meter.Float64Histogram("eds_store_remove_time_histogram",
metric.WithDescription("eds store remove time histogram(s)"))
if err != nil {
return err
}

if err = s.cache.EnableMetrics(); err != nil {
return err
}

s.metrics = &metrics{
put: put,
putExists: putExists,
get: get,
has: has,
remove: remove,
}
return nil
}

func (m *metrics) observePut(ctx context.Context, dur time.Duration, size uint, failed bool) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.put.Record(ctx, dur.Seconds(), metric.WithAttributes(
attribute.Bool(failedKey, failed),
attribute.Int(sizeKey, int(size))))
}

func (m *metrics) observePutExist(ctx context.Context) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.putExists.Add(ctx, 1)
}

func (m *metrics) observeGet(ctx context.Context, dur time.Duration, failed bool) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.get.Record(ctx, dur.Seconds(), metric.WithAttributes(
attribute.Bool(failedKey, failed)))
}

func (m *metrics) observeHas(ctx context.Context, dur time.Duration, failed bool) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.has.Record(ctx, dur.Seconds(), metric.WithAttributes(
attribute.Bool(failedKey, failed)))
}

func (m *metrics) observeRemove(ctx context.Context, dur time.Duration, failed bool) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.remove.Record(ctx, dur.Seconds(), metric.WithAttributes(
attribute.Bool(failedKey, failed)))
}
Loading

0 comments on commit 265bee2

Please sign in to comment.