Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Revert "S3 relay interface"" #853

Merged
merged 12 commits into from
Nov 1, 2024
102 changes: 91 additions & 11 deletions common/aws/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,48 @@ package aws
import (
"github.com/Layr-Labs/eigenda/common"
"github.com/urfave/cli"
"time"
)

var (
RegionFlagName = "aws.region"
AccessKeyIdFlagName = "aws.access-key-id"
SecretAccessKeyFlagName = "aws.secret-access-key"
EndpointURLFlagName = "aws.endpoint-url"
RegionFlagName = "aws.region"
AccessKeyIdFlagName = "aws.access-key-id"
SecretAccessKeyFlagName = "aws.secret-access-key"
EndpointURLFlagName = "aws.endpoint-url"
FragmentPrefixCharsFlagName = "aws.fragment-prefix-chars"
FragmentParallelismFactorFlagName = "aws.fragment-parallelism-factor"
FragmentParallelismConstantFlagName = "aws.fragment-parallelism-constant"
FragmentReadTimeoutFlagName = "aws.fragment-read-timeout"
FragmentWriteTimeoutFlagName = "aws.fragment-write-timeout"
)

type ClientConfig struct {
Region string
AccessKey string
// Region is the region to use when interacting with S3. Default is "us-east-2".
Region string
// AccessKey to use when interacting with S3.
AccessKey string
// SecretAccessKey to use when interacting with S3.
SecretAccessKey string
EndpointURL string
// EndpointURL of the S3 endpoint to use. If this is not set then the default AWS S3 endpoint will be used.
EndpointURL string

// FragmentPrefixChars is the number of characters of the key to use as the prefix for fragmented files.
// A value of "3" for the key "ABCDEFG" will result in the prefix "ABC". Default is 3.
FragmentPrefixChars int
// FragmentParallelismFactor helps determine the size of the pool of workers to help upload/download files.
// A non-zero value for this parameter adds a number of workers equal to the number of cores times this value.
// Default is 8. In general, the number of workers here can be a lot larger than the number of cores because the
// workers will be blocked on I/O most of the time.
FragmentParallelismFactor int
// FragmentParallelismConstant helps determine the size of the pool of workers to help upload/download files.
// A non-zero value for this parameter adds a constant number of workers. Default is 0.
FragmentParallelismConstant int
// FragmentReadTimeout is used to bound the maximum time to wait for a single fragmented read.
// Default is 30 seconds.
FragmentReadTimeout time.Duration
// FragmentWriteTimeout is used to bound the maximum time to wait for a single fragmented write.
// Default is 30 seconds.
FragmentWriteTimeout time.Duration
Comment on lines +33 to +47
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These seem like S3 specific configs, but they are in the common shared aws config. Should we consider keeping this one only for parameters that are shared between all the aws clients we use (s3, dynamo, etc)?

So perhaps create a config specific in aws/s3/config.go which would have these fragments and would embed the shared aws.ClientConfig params?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your suggestion makes sense. The core difficulty is that aws.ClientConfig is currently used to build an S3 client by disperser/cmd/apiserver, disperser/cmd/batcher, and disperser/cmd/dataapi. Even harder is the fact that these use cases use the aws.ClientConfig object to instantiate both an s3 client and a dynamoDB client, meaning that if I nest aws.ClientConfig inside S3 config then it gets duplicated.

I'm sure it's possible to untangle the mess I describe above, but my concern is scope creep for this PR. I've got some ideas for how we might simplify the way we manage our project's configuration, would you be interested in discussing this offline?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, interested in discussing, no need for scope creep I agree, since these are only used by us internally so its fine to have breaking changes.

That being said, I don't see why having the duplicate ClientConfig is bad. In fact, one could want an s3 client and dynamoDB client with different aws configs (say the dbs live in different regions for eg).

}

func ClientFlags(envPrefix string, flagPrefix string) []cli.Flag {
Expand Down Expand Up @@ -48,14 +76,66 @@ func ClientFlags(envPrefix string, flagPrefix string) []cli.Flag {
Value: "",
EnvVar: common.PrefixEnvVar(envPrefix, "AWS_ENDPOINT_URL"),
},
cli.IntFlag{
Name: common.PrefixFlag(flagPrefix, FragmentPrefixCharsFlagName),
Usage: "The number of characters of the key to use as the prefix for fragmented files",
Required: false,
Value: 3,
EnvVar: common.PrefixEnvVar(envPrefix, "FRAGMENT_PREFIX_CHARS"),
},
cli.IntFlag{
Name: common.PrefixFlag(flagPrefix, FragmentParallelismFactorFlagName),
Usage: "Add this many threads times the number of cores to the worker pool",
Required: false,
Value: 8,
EnvVar: common.PrefixEnvVar(envPrefix, "FRAGMENT_PARALLELISM_FACTOR"),
},
cli.IntFlag{
Name: common.PrefixFlag(flagPrefix, FragmentParallelismConstantFlagName),
Usage: "Add this many threads to the worker pool",
Required: false,
Value: 0,
EnvVar: common.PrefixEnvVar(envPrefix, "FRAGMENT_PARALLELISM_CONSTANT"),
},
cli.DurationFlag{
Name: common.PrefixFlag(flagPrefix, FragmentReadTimeoutFlagName),
Usage: "The maximum time to wait for a single fragmented read",
Required: false,
Value: 30 * time.Second,
EnvVar: common.PrefixEnvVar(envPrefix, "FRAGMENT_READ_TIMEOUT"),
},
cli.DurationFlag{
Name: common.PrefixFlag(flagPrefix, FragmentWriteTimeoutFlagName),
Usage: "The maximum time to wait for a single fragmented write",
Required: false,
Value: 30 * time.Second,
EnvVar: common.PrefixEnvVar(envPrefix, "FRAGMENT_WRITE_TIMEOUT"),
},
}
}

func ReadClientConfig(ctx *cli.Context, flagPrefix string) ClientConfig {
return ClientConfig{
Region: ctx.GlobalString(common.PrefixFlag(flagPrefix, RegionFlagName)),
AccessKey: ctx.GlobalString(common.PrefixFlag(flagPrefix, AccessKeyIdFlagName)),
SecretAccessKey: ctx.GlobalString(common.PrefixFlag(flagPrefix, SecretAccessKeyFlagName)),
EndpointURL: ctx.GlobalString(common.PrefixFlag(flagPrefix, EndpointURLFlagName)),
Region: ctx.GlobalString(common.PrefixFlag(flagPrefix, RegionFlagName)),
AccessKey: ctx.GlobalString(common.PrefixFlag(flagPrefix, AccessKeyIdFlagName)),
SecretAccessKey: ctx.GlobalString(common.PrefixFlag(flagPrefix, SecretAccessKeyFlagName)),
EndpointURL: ctx.GlobalString(common.PrefixFlag(flagPrefix, EndpointURLFlagName)),
FragmentPrefixChars: ctx.GlobalInt(common.PrefixFlag(flagPrefix, FragmentPrefixCharsFlagName)),
FragmentParallelismFactor: ctx.GlobalInt(common.PrefixFlag(flagPrefix, FragmentParallelismFactorFlagName)),
FragmentParallelismConstant: ctx.GlobalInt(common.PrefixFlag(flagPrefix, FragmentParallelismConstantFlagName)),
FragmentReadTimeout: ctx.GlobalDuration(common.PrefixFlag(flagPrefix, FragmentReadTimeoutFlagName)),
FragmentWriteTimeout: ctx.GlobalDuration(common.PrefixFlag(flagPrefix, FragmentWriteTimeoutFlagName)),
}
}

// DefaultClientConfig returns a new ClientConfig with default values.
func DefaultClientConfig() *ClientConfig {
return &ClientConfig{
Region: "us-east-2",
FragmentPrefixChars: 3,
FragmentParallelismFactor: 8,
FragmentParallelismConstant: 0,
FragmentReadTimeout: 30 * time.Second,
FragmentWriteTimeout: 30 * time.Second,
}
}
Loading
Loading