Skip to content

Commit

Permalink
Style: improve api clients comments (#780)
Browse files Browse the repository at this point in the history
  • Loading branch information
samlaf authored Oct 19, 2024
1 parent 0ee84f9 commit 78e4fc6
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 53 deletions.
9 changes: 6 additions & 3 deletions api/clients/codecs/blob_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (

type BlobEncodingVersion byte

// All blob encodings are IFFT'd before being dispersed
const (
// This minimal blob encoding includes a version byte, a length uint32, and 31 byte field element mapping.
// This minimal blob encoding contains a 32 byte header = [0x00, version byte, uint32 len of data, 0x00, 0x00,...]
// followed by the encoded data [0x00, 31 bytes of data, 0x00, 31 bytes of data,...]
DefaultBlobEncoding BlobEncodingVersion = 0x0
)

Expand All @@ -30,6 +30,9 @@ func GenericDecodeBlob(data []byte) ([]byte, error) {
if len(data) <= 32 {
return nil, fmt.Errorf("data is not of length greater than 32 bytes: %d", len(data))
}
// version byte is stored in [1], because [0] is always 0 to ensure the codecBlobHeader is a valid bn254 element
// see https://github.com/Layr-Labs/eigenda/blob/master/api/clients/codecs/default_blob_codec.go#L21
// TODO: we should prob be working over a struct with methods such as GetBlobEncodingVersion() to prevent index errors
version := BlobEncodingVersion(data[1])
codec, err := BlobEncodingVersionToCodec(version)
if err != nil {
Expand All @@ -38,7 +41,7 @@ func GenericDecodeBlob(data []byte) ([]byte, error) {

data, err = codec.DecodeBlob(data)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to decode blob: %w", err)
}

return data, nil
Expand Down
24 changes: 16 additions & 8 deletions api/clients/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,33 @@ import (
)

type EigenDAClientConfig struct {
// RPC is the HTTP provider URL for the Data Availability node.
// RPC is the HTTP provider URL for the EigenDA Disperser
RPC string

// The total amount of time that the client will spend waiting for EigenDA to confirm a blob
// Timeout used when making dispersals to the EigenDA Disperser
// TODO: we should change this param as its name is quite confusing
ResponseTimeout time.Duration

// The total amount of time that the client will spend waiting for EigenDA
// to confirm a blob after it has been dispersed
// Note that reasonable values for this field will depend on the value of WaitForFinalization.
StatusQueryTimeout time.Duration

// The amount of time to wait between status queries of a newly dispersed blob
StatusQueryRetryInterval time.Duration

// The total amount of time that the client will waiting for a response from the EigenDA disperser
ResponseTimeout time.Duration
// If true, will wait for the blob to finalize, if false, will wait only for the blob to confirm.
WaitForFinalization bool

// The quorum IDs to write blobs to using this client. Should not include default quorums 0 or 1.
// TODO: should we change this to core.QuorumID instead? https://github.com/Layr-Labs/eigenda/blob/style--improve-api-clients-comments/core/data.go#L18
CustomQuorumIDs []uint

// Signer private key in hex encoded format. This key should not be associated with an Ethereum address holding any funds.
// Signer private key in hex encoded format. This key is currently purely used for authn/authz on the disperser.
// For security, it should not be associated with an Ethereum address holding any funds.
// This might change once we introduce payments.
// OPTIONAL: this value is optional, and if set to "", will result in a read-only eigenDA client,
// that can retrieve blobs but cannot disperse blobs.
SignerPrivateKeyHex string

// Whether to disable TLS for an insecure connection when connecting to a local EigenDA disperser instance.
Expand All @@ -37,9 +48,6 @@ type EigenDAClientConfig struct {
// the commitment. With this mode disabled, you will need to supply the entire blob to perform a verification
// that any part of the data matches the KZG commitment.
DisablePointVerificationMode bool

// If true, will wait for the blob to finalize, if false, will wait only for the blob to confirm.
WaitForFinalization bool
}

func (c *EigenDAClientConfig) CheckAndSetDefaults() error {
Expand Down
3 changes: 3 additions & 0 deletions api/clients/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
type Config struct {
Hostname string
Port string
// BlobDispersal Timeouts for both authenticated and unauthenticated dispersals
// GetBlobStatus and RetrieveBlob timeouts are hardcoded to 60seconds
// TODO: do we want to add config timeouts for those separate requests?
Timeout time.Duration
UseSecureGrpcFlag bool
}
Expand Down
54 changes: 32 additions & 22 deletions api/clients/eigenda_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,26 @@ import (
"github.com/ethereum/go-ethereum/log"
)

// IEigenDAClient is a wrapper around the DisperserClient interface which
// encodes blobs before dispersing them, and decodes them after retrieving them.
type IEigenDAClient interface {
GetBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error)
PutBlob(ctx context.Context, txData []byte) (*grpcdisperser.BlobInfo, error)
GetCodec() codecs.BlobCodec
}

// EigenDAClient is a wrapper around the DisperserClient which
// encodes blobs before dispersing them, and decodes them after retrieving them.
type EigenDAClient struct {
// TODO: all of these should be private, to prevent users from using them directly,
// which breaks encapsulation and makes it hard for us to do refactors or changes
Config EigenDAClientConfig
Log log.Logger
Client DisperserClient
Codec codecs.BlobCodec
}

var _ IEigenDAClient = EigenDAClient{}
var _ IEigenDAClient = &EigenDAClient{}

func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClient, error) {
err := config.CheckAndSetDefaults()
Expand All @@ -46,6 +52,7 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien
if len(config.SignerPrivateKeyHex) == 64 {
signer = auth.NewLocalBlobRequestSigner(config.SignerPrivateKeyHex)
} else if len(config.SignerPrivateKeyHex) == 0 {
// noop signer is used when we need a read-only eigenda client
signer = auth.NewLocalNoopSigner()
} else {
return nil, fmt.Errorf("invalid length for signer private key")
Expand Down Expand Up @@ -74,7 +81,9 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien
}, nil
}

func (m EigenDAClient) GetCodec() codecs.BlobCodec {
// Deprecated: do not rely on this function. Do not use m.Codec directly either.
// These will eventually be removed and not exposed.
func (m *EigenDAClient) GetCodec() codecs.BlobCodec {
return m.Codec
}

Expand All @@ -84,28 +93,30 @@ func (m EigenDAClient) GetCodec() codecs.BlobCodec {
// data, which is necessary for generating KZG proofs for data's correctness.
// The function handles potential errors during blob retrieval, data length
// checks, and decoding processes.
func (m EigenDAClient) GetBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error) {
func (m *EigenDAClient) GetBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error) {
data, err := m.Client.RetrieveBlob(ctx, batchHeaderHash, blobIndex)
if err != nil {
return nil, err
return nil, fmt.Errorf("could not retrieve blob: %w", err)
}

if len(data) == 0 {
return nil, fmt.Errorf("blob has length zero")
// This should never happen, because empty blobs are rejected from even entering the system:
// https://github.com/Layr-Labs/eigenda/blob/master/disperser/apiserver/server.go#L930
return nil, fmt.Errorf("blob has length zero - this should not be possible")
}

decodedData, err := m.Codec.DecodeBlob(data)
if err != nil {
return nil, fmt.Errorf("error getting blob: %w", err)
return nil, fmt.Errorf("error decoding blob: %w", err)
}

return decodedData, nil
}

// PutBlob encodes and writes a blob to EigenDA, waiting for it to be finalized
// before returning. This function is resiliant to transient failures and
// timeouts.
func (m EigenDAClient) PutBlob(ctx context.Context, data []byte) (*grpcdisperser.BlobInfo, error) {
// PutBlob encodes and writes a blob to EigenDA, waiting for a desired blob status
// to be reached (guarded by WaitForFinalization config param) before returning.
// This function is resilient to transient failures and timeouts.
func (m *EigenDAClient) PutBlob(ctx context.Context, data []byte) (*grpcdisperser.BlobInfo, error) {
resultChan, errorChan := m.PutBlobAsync(ctx, data)
select { // no timeout here because we depend on the configured timeout in PutBlobAsync
case result := <-resultChan:
Expand All @@ -115,14 +126,14 @@ func (m EigenDAClient) PutBlob(ctx context.Context, data []byte) (*grpcdisperser
}
}

func (m EigenDAClient) PutBlobAsync(ctx context.Context, data []byte) (resultChan chan *grpcdisperser.BlobInfo, errChan chan error) {
func (m *EigenDAClient) PutBlobAsync(ctx context.Context, data []byte) (resultChan chan *grpcdisperser.BlobInfo, errChan chan error) {
resultChan = make(chan *grpcdisperser.BlobInfo, 1)
errChan = make(chan error, 1)
go m.putBlob(ctx, data, resultChan, errChan)
return
}

func (m EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan chan *grpcdisperser.BlobInfo, errChan chan error) {
func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan chan *grpcdisperser.BlobInfo, errChan chan error) {
m.Log.Info("Attempting to disperse blob to EigenDA")

// encode blob
Expand All @@ -142,6 +153,7 @@ func (m EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan c
customQuorumNumbers[i] = uint8(e)
}
// disperse blob
// TODO: would be nice to add a trace-id key to the context, to be able to follow requests from batcher->proxy->eigenda
blobStatus, requestID, err := m.Client.DisperseBlobAuthenticated(ctx, data, customQuorumNumbers)
if err != nil {
errChan <- fmt.Errorf("error initializing DisperseBlobAuthenticated() client: %w", err)
Expand All @@ -150,13 +162,12 @@ func (m EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan c

// process response
if *blobStatus == disperser.Failed {
m.Log.Error("Unable to disperse blob to EigenDA, aborting", "err", err)
errChan <- fmt.Errorf("reply status is %d", blobStatus)
errChan <- fmt.Errorf("unable to disperse blob to eigenda (reply status %d): %w", blobStatus, err)
return
}

base64RequestID := base64.StdEncoding.EncodeToString(requestID)
m.Log.Info("Blob dispersed to EigenDA, now waiting for confirmation", "requestID", base64RequestID)
m.Log.Info("Blob accepted by EigenDA disperser, now polling for status updates", "requestID", base64RequestID)

ticker := time.NewTicker(m.Config.StatusQueryRetryInterval)
defer ticker.Stop()
Expand All @@ -175,25 +186,23 @@ func (m EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan c
case <-ticker.C:
statusRes, err := m.Client.GetBlobStatus(ctx, requestID)
if err != nil {
m.Log.Error("Unable to retrieve blob dispersal status, will retry", "requestID", base64RequestID, "err", err)
m.Log.Warn("Unable to retrieve blob dispersal status, will retry", "requestID", base64RequestID, "err", err)
continue
}

switch statusRes.Status {
case grpcdisperser.BlobStatus_PROCESSING, grpcdisperser.BlobStatus_DISPERSING:
// to prevent log clutter, we only log at info level once
if alreadyWaitingForDispersal {
m.Log.Debug("Blob submitted, waiting for dispersal from EigenDA", "requestID", base64RequestID)
m.Log.Debug("Blob is being processed by the EigenDA network", "requestID", base64RequestID)
} else {
m.Log.Info("Blob submitted, waiting for dispersal from EigenDA", "requestID", base64RequestID)
m.Log.Info("Blob is being processed by the EigenDA network", "requestID", base64RequestID)
alreadyWaitingForDispersal = true
}
case grpcdisperser.BlobStatus_FAILED:
m.Log.Error("EigenDA blob dispersal failed in processing", "requestID", base64RequestID, "err", err)
errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing, requestID=%s: %w", base64RequestID, err)
return
case grpcdisperser.BlobStatus_INSUFFICIENT_SIGNATURES:
m.Log.Error("EigenDA blob dispersal failed in processing with insufficient signatures", "requestID", base64RequestID, "err", err)
errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing with insufficient signatures, requestID=%s: %w", base64RequestID, err)
return
case grpcdisperser.BlobStatus_CONFIRMED:
Expand All @@ -212,11 +221,12 @@ func (m EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan c
}
case grpcdisperser.BlobStatus_FINALIZED:
batchHeaderHashHex := fmt.Sprintf("0x%s", hex.EncodeToString(statusRes.Info.BlobVerificationProof.BatchMetadata.BatchHeaderHash))
m.Log.Info("Successfully dispersed blob to EigenDA", "requestID", base64RequestID, "batchHeaderHash", batchHeaderHashHex)
m.Log.Info("EigenDA blob finalized", "requestID", base64RequestID, "batchHeaderHash", batchHeaderHashHex)
resultChan <- statusRes.Info
return
default:
errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing with reply status %d", statusRes.Status)
// this should never happen. If it does, the blob is in a heisenberg state... it could either eventually get confirmed or fail
errChan <- fmt.Errorf("unknown reply status %d. ask for assistance from EigenDA team, using requestID %s", statusRes.Status, base64RequestID)
return
}
}
Expand Down
5 changes: 3 additions & 2 deletions api/clients/eigenda_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func TestPutRetrieveBlobIFFTNoDecodeSuccess(t *testing.T) {
(disperserClient.On("RetrieveBlob", mock.Anything, mock.Anything, mock.Anything).
Return(nil, nil).Once()) // pass nil in as the return blob to tell the mock to return the corresponding blob
logger := log.NewLogger(log.DiscardHandler())
ifftCodec := codecs.NewIFFTCodec(codecs.NewDefaultBlobCodec())
eigendaClient := clients.EigenDAClient{
Log: logger,
Config: clients.EigenDAClientConfig{
Expand All @@ -138,7 +139,7 @@ func TestPutRetrieveBlobIFFTNoDecodeSuccess(t *testing.T) {
WaitForFinalization: true,
},
Client: disperserClient,
Codec: codecs.NewIFFTCodec(codecs.NewDefaultBlobCodec()),
Codec: ifftCodec,
}
expectedBlob := []byte("dc49e7df326cfb2e7da5cf68f263e1898443ec2e862350606e7dfbda55ad10b5d61ed1d54baf6ae7a86279c1b4fa9c49a7de721dacb211264c1f5df31bade51c")
blobInfo, err := eigendaClient.PutBlob(context.Background(), expectedBlob)
Expand All @@ -148,7 +149,7 @@ func TestPutRetrieveBlobIFFTNoDecodeSuccess(t *testing.T) {

resultBlob, err := eigendaClient.GetBlob(context.Background(), []byte("mock-batch-header-hash"), 100)
require.NoError(t, err)
encodedBlob, err := eigendaClient.GetCodec().EncodeBlob(resultBlob)
encodedBlob, err := ifftCodec.EncodeBlob(resultBlob)
require.NoError(t, err)

resultBlob, err = codecs.NewIFFTCodec(codecs.NewDefaultBlobCodec()).DecodeBlob(encodedBlob)
Expand Down
5 changes: 2 additions & 3 deletions api/grpc/disperser/disperser.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions api/grpc/disperser/v2/disperser_v2.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions api/proto/disperser/disperser.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,11 @@ message AuthenticationData {

message DisperseBlobRequest {
// The data to be dispersed.
// The size of data must be <= 2MiB. Every 32 bytes of data chunk is interpreted as an integer in big endian format
// The size of data must be <= 16MiB. Every 32 bytes of data is interpreted as an integer in big endian format
// where the lower address has more significant bits. The integer must stay in the valid range to be interpreted
// as a field element on the bn254 curve. The valid range is
// as a field element on the bn254 curve. The valid range is
// 0 <= x < 21888242871839275222246405745257275088548364400416034343698204186575808495617
// containing slightly less than 254 bits and more than 253 bits. If any one of the 32 bytes chunk is outside the range,
// the whole request is deemed as invalid, and rejected.
// If any one of the 32 bytes elements is outside the range, the whole request is deemed as invalid, and rejected.
bytes data = 1;
// The quorums to which the blob will be sent, in addition to the required quorums which are configured
// on the EigenDA smart contract. If required quorums are included here, an error will be returned.
Expand Down
7 changes: 3 additions & 4 deletions api/proto/disperser/v2/disperser_v2.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ service Disperser {

message DisperseBlobRequest {
// The data to be dispersed.
// The size of data must be <= 2MiB. Every 32 bytes of data chunk is interpreted as an integer in big endian format
// The size of data must be <= 16MiB. Every 32 bytes of data is interpreted as an integer in big endian format
// where the lower address has more significant bits. The integer must stay in the valid range to be interpreted
// as a field element on the bn254 curve. The valid range is
// as a field element on the bn254 curve. The valid range is
// 0 <= x < 21888242871839275222246405745257275088548364400416034343698204186575808495617
// containing slightly less than 254 bits and more than 253 bits. If any one of the 32 bytes chunk is outside the range,
// the whole request is deemed as invalid, and rejected.
// If any one of the 32 bytes elements is outside the range, the whole request is deemed as invalid, and rejected.
bytes data = 1;
BlobHeader blob_header = 2;
// signature over keccak hash of the blob_header that can be verified by blob_header.account_id
Expand Down
8 changes: 4 additions & 4 deletions encoding/utils/codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
)

// ConvertByPaddingEmptyByte takes bytes and insert an empty byte at the front of every 31 byte.
// The empty byte is padded at the low address, because we use big endian to interpret a fiedl element.
// This ensure every 32 bytes are within the valid range of a field element for bn254 curve.
// If the input data is not a multiple of 31, the reminder is added to the output by
// inserting a 0 and the reminder. The output does not necessarily be a multipler of 32
// The empty byte is padded at the low address, because we use big endian to interpret a field element.
// This ensures every 32 bytes is within the valid range of a field element for bn254 curve.
// If the input data is not a multiple of 31, the remainder is added to the output by
// inserting a 0 and the remainder. The output is thus not necessarily a multiple of 32.
func ConvertByPaddingEmptyByte(data []byte) []byte {
dataSize := len(data)
parseSize := encoding.BYTES_PER_SYMBOL - 1
Expand Down

0 comments on commit 78e4fc6

Please sign in to comment.