Implement v2 client GET functionality #972

Open · wants to merge 29 commits into base: master

Commits (29)
4625e96  Write GET tests (litt3, Dec 9, 2024)
f07e820  Merge branch 'master' into client-v2-get (litt3, Dec 10, 2024)
885c131  Respond to PR comments (litt3, Dec 10, 2024)
6848663  Create new V2 client config (litt3, Dec 10, 2024)
a48afb1  Respond to more PR comments (litt3, Dec 11, 2024)
225f2a3  Fix failing unit test (litt3, Dec 12, 2024)
d265f6a  Merge branch 'master' into client-v2-get (litt3, Dec 12, 2024)
e9d91c5  Adopt new package structure (litt3, Dec 12, 2024)
dd3c262  Use new test random util (litt3, Dec 12, 2024)
88df865  Implement relay call timeout (litt3, Dec 12, 2024)
505a1f0  Use correct error join method (litt3, Dec 12, 2024)
2b87633  Merge branch 'master' into client-v2-get (litt3, Jan 8, 2025)
cf1cd80  Make updates required by upstream changes (litt3, Jan 8, 2025)
53893d8  Update how FFT and IFFT are referred to (litt3, Jan 13, 2025)
0373dd7  Implement GetPayload (litt3, Jan 13, 2025)
826a026  Remove GetBlob, leaving only GetPayload (litt3, Jan 13, 2025)
975b6e5  Remove unnecessary codec mock (litt3, Jan 13, 2025)
0666d24  Use more reasonable line breaks for logs (litt3, Jan 13, 2025)
0a49aa5  Test malicious cert (litt3, Jan 13, 2025)
1193ce7  Merge branch 'master' into client-v2-get (litt3, Jan 13, 2025)
496e277  Merge branch 'master' into client-v2-get (litt3, Jan 14, 2025)
2d392ff  Finish test coverage (litt3, Jan 14, 2025)
db51291  Fix commitment length check (litt3, Jan 14, 2025)
4f3280c  Merge branch 'master' into client-v2-get (litt3, Jan 16, 2025)
aaa1342  Call VerifyBlobV2 (litt3, Jan 17, 2025)
9be51e6  Simply verify blob (litt3, Jan 17, 2025)
cc6b9a1  Merge branch 'master' into client-v2-get (litt3, Jan 17, 2025)
ae926c7  Clean up (litt3, Jan 17, 2025)
f82d128  Merge branch 'master' into client-v2-get (litt3, Jan 17, 2025)
68 changes: 68 additions & 0 deletions api/clients/codecs/mock/BlobCodec.go
@@ -0,0 +1,68 @@
package mock

import mock "github.com/stretchr/testify/mock"

// BlobCodec is an autogenerated mock type for the BlobCodec type
type BlobCodec struct {
mock.Mock
}

// DecodeBlob provides a mock function with given fields: encodedData
func (_m *BlobCodec) DecodeBlob(encodedData []byte) ([]byte, error) {
ret := _m.Called(encodedData)

if len(ret) == 0 {
panic("no return value specified for DecodeBlob")
}

var r0 []byte
var r1 error
if rf, ok := ret.Get(0).(func([]byte) ([]byte, error)); ok {
return rf(encodedData)
}
if rf, ok := ret.Get(0).(func([]byte) []byte); ok {
r0 = rf(encodedData)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]byte)
}
}

if rf, ok := ret.Get(1).(func([]byte) error); ok {
r1 = rf(encodedData)
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// EncodeBlob provides a mock function with given fields: rawData
func (_m *BlobCodec) EncodeBlob(rawData []byte) ([]byte, error) {
ret := _m.Called(rawData)

if len(ret) == 0 {
panic("no return value specified for EncodeBlob")
}

var r0 []byte
var r1 error
if rf, ok := ret.Get(0).(func([]byte) ([]byte, error)); ok {
return rf(rawData)
}
if rf, ok := ret.Get(0).(func([]byte) []byte); ok {
r0 = rf(rawData)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]byte)
}
}

if rf, ok := ret.Get(1).(func([]byte) error); ok {
r1 = rf(rawData)
} else {
r1 = ret.Error(1)
}

return r0, r1
}
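The autogenerated mock above follows testify's convention that a configured return value may be either a canned value or a function of the call's arguments. A minimal standard-library sketch of that dispatch pattern (the `resolve` helper is hypothetical, not part of the PR):

```go
package main

import "fmt"

// resolve mimics how the generated mock inspects a configured return value:
// if it is a function of the input, call it; if it is a plain []byte, use it
// as a canned value; otherwise fall back to nil.
func resolve(ret interface{}, input []byte) []byte {
	if rf, ok := ret.(func([]byte) []byte); ok {
		return rf(input) // computed from the call's argument
	}
	if b, ok := ret.([]byte); ok {
		return b // static canned value
	}
	return nil
}

func main() {
	fmt.Println(string(resolve([]byte("canned"), []byte("in"))))
	fmt.Println(string(resolve(func(b []byte) []byte { return append(b, '!') }, []byte("in"))))
}
```

Running this prints the canned value first, then the computed one, mirroring the two branches in `DecodeBlob` and `EncodeBlob` above.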
17 changes: 17 additions & 0 deletions api/clients/config_v2.go
@@ -0,0 +1,17 @@
package clients

import (
"github.com/Layr-Labs/eigenda/api/clients/codecs"
)

// EigenDAClientConfigV2 contains configuration values for EigenDAClientV2
type EigenDAClientConfigV2 struct {
Review comment (Contributor): Starting this thread to discuss package structure. I personally don't like that all our clients are in the same package. I would prefer each client in its own separate package, because that allows cleaner names and prevents clashes. For example:

    import "...clients/v2/disperser"
    import edaclient ".../v2/eigenda"

    func main() {
        dispClient := disperser.New(disperser.Config{...})
        edaclient.New(edaclient.Config{...}, dispClient)
    }

is way cleaner than having long names all under the clients package. I'm fine if people prefer to keep all the clients in the same namespace, but at minimum I think we should separate the v1 and v2 namespaces. Thoughts? @ian-shim @epociask @bxue-l2 @jianoaix

Review comment (Contributor): Makes sense to me. We are getting a lot of "v2" in the names.

// The blob encoding version to use when writing blobs from the high level interface.
PutBlobEncodingVersion codecs.BlobEncodingVersion

// Point verification mode does an IFFT on data before it is written, and does an FFT on data after it is read.
// This makes it possible to open points on the KZG commitment to prove that the field elements correspond to
// the commitment. With this mode disabled, you will need to supply the entire blob to perform a verification
// that any part of the data matches the KZG commitment.
DisablePointVerificationMode bool
}
148 changes: 148 additions & 0 deletions api/clients/eigenda_client_v2.go
@@ -0,0 +1,148 @@
package clients

import (
"context"
"fmt"
"github.com/Layr-Labs/eigenda/api/clients/codecs"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/cockroachdb/errors/join"
"math/rand"
)

// EigenDAClientV2 provides the ability to get blobs from the relay subsystem, and to send new blobs to the disperser.
type EigenDAClientV2 struct {
log logging.Logger
// doesn't need to be cryptographically secure, as it's only used to distribute load across relays

Review comment (Contributor): Out of scope for this PR, but I'm curious whether we'd ever want to let users define their own retrieval policies when communicating with relays.

Review comment (Contributor, author): Certainly something to consider, at the latest if/when users have to pay relays for retrieval.
random *rand.Rand
config *EigenDAClientConfigV2
codec codecs.BlobCodec
relayClient RelayClient
}

// BuildEigenDAClientV2 builds an EigenDAClientV2 from config structs.
func BuildEigenDAClientV2(
log logging.Logger,
config *EigenDAClientConfigV2,
relayClientConfig *RelayClientConfig) (*EigenDAClientV2, error) {

relayClient, err := NewRelayClient(relayClientConfig, log)
if err != nil {
return nil, fmt.Errorf("new relay client: %w", err)
}

codec, err := createCodec(config)
if err != nil {
return nil, err
}

return NewEigenDAClientV2(log, rand.New(rand.NewSource(rand.Int63())), config, relayClient, codec)
}

// NewEigenDAClientV2 assembles an EigenDAClientV2 from subcomponents that have already been constructed and initialized.
func NewEigenDAClientV2(
log logging.Logger,
random *rand.Rand,
config *EigenDAClientConfigV2,
relayClient RelayClient,
codec codecs.BlobCodec) (*EigenDAClientV2, error) {

return &EigenDAClientV2{
log: log,
random: random,
config: config,
codec: codec,
relayClient: relayClient,
}, nil
}

// GetBlob iteratively attempts to retrieve a given blob with key blobKey from the relays listed in the blobCertificate.
//
// The relays are attempted in random order.
Review comment (Contributor): Curious why this choice. How is the relayKey list populated? Shouldn't we let the person populating it dictate some preference by iterating through the relays in order?

Review comment (Contributor, author): My logic (which may be based on incorrect assumptions) is that, since the relayKey list is part of the cert, the order would be the same for every client. It seems we wouldn't want all clients to attempt the first relay in the list; rather, we should distribute load across all available relays.

Review comment (Contributor): Ah, I see; I think this makes sense. Although now I realize I'm confused myself. @ian-shim does this make sense? Are the relayKeys meant for data replication or sharding? That is, do we need to hit all the relayKeys to get the blob, or will any ONE do?

Review comment (Contributor): The disperser implementation itself shuffles the relay keys, so attempting each relay in order is actually fine. But it's good that we're not assuming the relay keys are ordered in any particular way. As for replication vs. sharding: any one relay will do!

Review comment (Contributor, author): @ian-shim Regarding "attempting each relay in order is actually fine": the randomization is done while creating the cert, and all clients will be given the same cert from the disperser, right? So, for a given blob that can be served by relays [X, Y, Z], if all clients attempted the relays in order, they would all try relay X, and relays [Y, Z] wouldn't receive any load. Is this the correct understanding, and if so, are you saying this behavior would be OK?

//
// The returned blob is decoded.
func (c *EigenDAClientV2) GetBlob(
ctx context.Context,
blobKey corev2.BlobKey,
blobCertificate corev2.BlobCertificate) ([]byte, error) {

relayKeyCount := len(blobCertificate.RelayKeys)

if relayKeyCount == 0 {
return nil, fmt.Errorf("relay key count is zero")
}

var indices []int
// create a randomized array of indices, so that it isn't always the first relay in the list which gets hit
for i := 0; i < relayKeyCount; i++ {
indices = append(indices, i)
}
c.random.Shuffle(len(indices), func(i int, j int) {
indices[i], indices[j] = indices[j], indices[i]
})

// TODO (litt3): consider creating a utility which can deprioritize relays that fail to respond (or respond maliciously)

// iterate over relays in random order, until we are able to get the blob from someone
for _, val := range indices {
relayKey := blobCertificate.RelayKeys[val]

// TODO: does this need a timeout?
data, err := c.relayClient.GetBlob(ctx, relayKey, blobKey)

// if GetBlob returned an error, try calling a different relay
if err != nil {
c.log.Warn("blob couldn't be retrieved from relay", "blobKey", blobKey, "relayKey", relayKey, "error", err)
continue
}

Review comment (Contributor): What about the circumstance where the error is transient and the number of relay keys == 1?

Review comment (Contributor, author): Are you suggesting an additional timeout, during which the client repeatedly retries all relays? I could implement that if it's the way we want to go, but I don't see how the relay keys == 1 case is unique.


// An honest relay should never send an empty blob
if len(data) == 0 {
c.log.Warn("blob received from relay had length 0", "blobKey", blobKey, "relayKey", relayKey)
continue
}

// An honest relay should never send a blob which cannot be decoded

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To expand these invariants: an honest relay should never send a blob which doesn't respect its polynomial commitments. The thing is though this check would get caught upstream (i.e, within proxy directly) and probably cause the request to fail. The proxy client would trigger a retry which would probably route to another relay.

this isn't a big problem rn and we can just document it somewhere for circle back sometime in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason not to check this invariant here, included in this PR? Seems like it wouldn't be hard to add

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commitment are being checked in the most recent iteration.

decodedData, err := c.codec.DecodeBlob(data)
if err != nil {
c.log.Warn("error decoding blob", "blobKey", blobKey, "relayKey", relayKey, "error", err)
continue
}

return decodedData, nil
}

return nil, fmt.Errorf("unable to retrieve blob from any relay")
}

// GetCodec returns the codec the client uses for encoding and decoding blobs
func (c *EigenDAClientV2) GetCodec() codecs.BlobCodec {
Review comment (Contributor): Why do we need this getter?

Review comment (Contributor, author, Dec 11, 2024): I carried it forward from the v1 client. Will remove, as it may not be necessary for v2. (Never mind, see the comment from Sam below.)

Review comment (Contributor): I think we'll still need it. We use it in the proxy to get the codec and IFFT blobs when computing commitments.

Review comment (Contributor, author): I left this method in for now, but I don't think the proxy will end up needing it. This client will be responsible for computing commitments when dispersing a payload.

return c.codec
}

// Close is responsible for calling close on all internal clients. This method will do its best to close all internal
// clients, even if some closes fail.
//
// Any and all errors returned from closing internal clients will be joined and returned.
//
// This method should only be called once.
func (c *EigenDAClientV2) Close() error {
relayClientErr := c.relayClient.Close()

// TODO: this is using join, since there will be more subcomponents requiring closing after adding PUT functionality
return join.Join(relayClientErr)
}

// createCodec creates the codec based on client config values
func createCodec(config *EigenDAClientConfigV2) (codecs.BlobCodec, error) {
lowLevelCodec, err := codecs.BlobEncodingVersionToCodec(config.PutBlobEncodingVersion)
if err != nil {
return nil, fmt.Errorf("create low level codec: %w", err)

Review comment (Contributor): nit:

Suggested change:
-    return nil, fmt.Errorf("create low level codec: %w", err)
+    return nil, fmt.Errorf("could not create low level codec: %w", err)

}

if config.DisablePointVerificationMode {
return codecs.NewNoIFFTCodec(lowLevelCodec), nil
} else {
return codecs.NewIFFTCodec(lowLevelCodec), nil
}
}