Use keys that do not collide when data is stored in a single bucket. #865

Merged · 2 commits · Nov 5, 2024
5 changes: 0 additions & 5 deletions common/aws/cli.go
@@ -28,9 +28,6 @@ type ClientConfig struct {
// EndpointURL of the S3 endpoint to use. If this is not set then the default AWS S3 endpoint will be used.
EndpointURL string

// FragmentPrefixChars is the number of characters of the key to use as the prefix for fragmented files.
// A value of "3" for the key "ABCDEFG" will result in the prefix "ABC". Default is 3.
FragmentPrefixChars int
// FragmentParallelismFactor helps determine the size of the pool of workers to help upload/download files.
// A non-zero value for this parameter adds a number of workers equal to the number of cores times this value.
// Default is 8. In general, the number of workers here can be a lot larger than the number of cores because the
@@ -120,7 +117,6 @@ func ReadClientConfig(ctx *cli.Context, flagPrefix string) ClientConfig {
AccessKey: ctx.GlobalString(common.PrefixFlag(flagPrefix, AccessKeyIdFlagName)),
SecretAccessKey: ctx.GlobalString(common.PrefixFlag(flagPrefix, SecretAccessKeyFlagName)),
EndpointURL: ctx.GlobalString(common.PrefixFlag(flagPrefix, EndpointURLFlagName)),
FragmentPrefixChars: ctx.GlobalInt(common.PrefixFlag(flagPrefix, FragmentPrefixCharsFlagName)),
FragmentParallelismFactor: ctx.GlobalInt(common.PrefixFlag(flagPrefix, FragmentParallelismFactorFlagName)),
FragmentParallelismConstant: ctx.GlobalInt(common.PrefixFlag(flagPrefix, FragmentParallelismConstantFlagName)),
FragmentReadTimeout: ctx.GlobalDuration(common.PrefixFlag(flagPrefix, FragmentReadTimeoutFlagName)),
@@ -132,7 +128,6 @@ func ReadClientConfig(ctx *cli.Context, flagPrefix string) ClientConfig {
func DefaultClientConfig() *ClientConfig {
return &ClientConfig{
Region: "us-east-2",
FragmentPrefixChars: 3,
FragmentParallelismFactor: 8,
FragmentParallelismConstant: 0,
FragmentReadTimeout: 30 * time.Second,
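The FragmentParallelismFactor and FragmentParallelismConstant comments above describe how the upload/download worker pool is sized. As a rough illustration only — the exact way the two knobs combine is an assumption, not taken from the eigenda source — the sizing implied by the comments looks like this:

package main

import (
    "fmt"
    "runtime"
)

// fragmentWorkerCount sketches the pool sizing implied by the config comments:
// a factor that scales with the number of CPU cores plus a flat constant.
// Assumed formula; the real pool construction is not shown in this diff.
func fragmentWorkerCount(factor, constant int) int {
    return runtime.NumCPU()*factor + constant
}

func main() {
    // With the defaults from DefaultClientConfig (factor=8, constant=0),
    // an 8-core machine would get 64 workers under this assumption.
    fmt.Println(fragmentWorkerCount(8, 0))
}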
6 changes: 3 additions & 3 deletions common/aws/s3/client.go
@@ -37,7 +37,7 @@ type client struct {

var _ Client = (*client)(nil)

func NewClient(ctx context.Context, cfg commonaws.ClientConfig, logger logging.Logger) (*client, error) {
func NewClient(ctx context.Context, cfg commonaws.ClientConfig, logger logging.Logger) (Client, error) {
var err error
once.Do(func() {
customResolver := aws.EndpointResolverWithOptionsFunc(
@@ -196,7 +196,7 @@ func (s *client) FragmentedUploadObject(
data []byte,
fragmentSize int) error {

fragments, err := breakIntoFragments(key, data, s.cfg.FragmentPrefixChars, fragmentSize)
fragments, err := breakIntoFragments(key, data, fragmentSize)
if err != nil {
return err
}
@@ -251,7 +251,7 @@ func (s *client) FragmentedDownloadObject(
return nil, errors.New("fragmentSize must be greater than 0")
}

fragmentKeys, err := getFragmentKeys(key, s.cfg.FragmentPrefixChars, getFragmentCount(fileSize, fragmentSize))
fragmentKeys, err := getFragmentKeys(key, getFragmentCount(fileSize, fragmentSize))
if err != nil {
return nil, err
}
27 changes: 10 additions & 17 deletions common/aws/s3/fragment.go
@@ -19,19 +19,12 @@ func getFragmentCount(fileSize int, fragmentSize int) int {

// getFragmentKey returns the key for the fragment at the given index.
//
// Fragment keys take the form of "prefix/body-index[f]". The prefix is the first prefixLength characters
// of the file key. The body is the file key. The index is the index of the fragment. The character "f" is appended
// to the key of the last fragment in the series.
// Fragment keys take the form of "body-index[f]". The index is the index of the fragment. The character "f" is
// appended to the key of the last fragment in the series.
//
// Example: fileKey="abc123", prefixLength=2, fragmentCount=3
// The keys will be "ab/abc123-0", "ab/abc123-1", "ab/abc123-2f"
func getFragmentKey(fileKey string, prefixLength int, fragmentCount int, index int) (string, error) {
var prefix string
if prefixLength > len(fileKey) {
prefix = fileKey
} else {
prefix = fileKey[:prefixLength]
}
// Example: fileKey="abc123", fragmentCount=3
// The keys will be "abc123-0", "abc123-1", "abc123-2f"
func getFragmentKey(fileKey string, fragmentCount int, index int) (string, error) {

postfix := ""
if fragmentCount-1 == index {
@@ -42,7 +35,7 @@ func getFragmentKey(fileKey string, prefixLength int, fragmentCount int, index int) (string, error) {
return "", fmt.Errorf("index %d is too high for fragment count %d", index, fragmentCount)
}

return fmt.Sprintf("%s/%s-%d%s", prefix, fileKey, index, postfix), nil
return fmt.Sprintf("%s-%d%s", fileKey, index, postfix), nil
}

// Fragment is a subset of a file.
@@ -53,7 +46,7 @@ type Fragment struct {
}

// breakIntoFragments breaks a file into fragments of the given size.
func breakIntoFragments(fileKey string, data []byte, prefixLength int, fragmentSize int) ([]*Fragment, error) {
func breakIntoFragments(fileKey string, data []byte, fragmentSize int) ([]*Fragment, error) {
fragmentCount := getFragmentCount(len(data), fragmentSize)
fragments := make([]*Fragment, fragmentCount)
for i := 0; i < fragmentCount; i++ {
@@ -63,7 +56,7 @@ func breakIntoFragments(fileKey string, data []byte, prefixLength int, fragmentSize int) ([]*Fragment, error) {
end = len(data)
}

fragmentKey, err := getFragmentKey(fileKey, prefixLength, fragmentCount, i)
fragmentKey, err := getFragmentKey(fileKey, fragmentCount, i)
if err != nil {
return nil, err
}
@@ -77,10 +70,10 @@ func breakIntoFragments(fileKey string, data []byte, prefixLength int, fragmentSize int) ([]*Fragment, error) {
}

// getFragmentKeys returns the keys for all fragments of a file.
func getFragmentKeys(fileKey string, prefixLength int, fragmentCount int) ([]string, error) {
func getFragmentKeys(fileKey string, fragmentCount int) ([]string, error) {
keys := make([]string, fragmentCount)
for i := 0; i < fragmentCount; i++ {
fragmentKey, err := getFragmentKey(fileKey, prefixLength, fragmentCount, i)
fragmentKey, err := getFragmentKey(fileKey, fragmentCount, i)
if err != nil {
return nil, err
}
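For reference, a standalone sketch of the new fragment naming scheme, re-implemented purely for illustration (the authoritative version is getFragmentKey in the diff above):

package main

import "fmt"

// fragmentKey mirrors the documented scheme: "<fileKey>-<index>", with "f"
// appended to the final fragment so readers can tell the series is complete.
func fragmentKey(fileKey string, fragmentCount int, index int) string {
    postfix := ""
    if index == fragmentCount-1 {
        postfix = "f"
    }
    return fmt.Sprintf("%s-%d%s", fileKey, index, postfix)
}

func main() {
    // Matches the godoc example: fileKey="abc123", fragmentCount=3
    for i := 0; i < 3; i++ {
        fmt.Println(fragmentKey("abc123", 3, i))
    }
    // Output:
    // abc123-0
    // abc123-1
    // abc123-2f
}

Note that the prefix has moved out of the fragment layer entirely; prefixing is now handled once, in the new scoped_keys.go below.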
90 changes: 24 additions & 66 deletions common/aws/s3/fragment_test.go
@@ -52,31 +52,6 @@ func TestGetFragmentCount(t *testing.T) {
assert.Equal(t, expectedFragmentCount, fragmentCount)
}

// Fragment keys take the form of "prefix/body-index[f]". Verify the prefix part of the key.
func TestPrefix(t *testing.T) {
tu.InitializeRandom()

keyLength := rand.Intn(10) + 10
key := tu.RandomString(keyLength)

for i := 0; i < keyLength*2; i++ {
fragmentCount := rand.Intn(10) + 10
fragmentIndex := rand.Intn(fragmentCount)
fragmentKey, err := getFragmentKey(key, i, fragmentCount, fragmentIndex)
assert.NoError(t, err)

parts := strings.Split(fragmentKey, "/")
assert.Equal(t, 2, len(parts))
prefix := parts[0]

if i >= keyLength {
assert.Equal(t, key, prefix)
} else {
assert.Equal(t, key[:i], prefix)
}
}
}

// Fragment keys take the form of "prefix/body-index[f]". Verify the body part of the key.
func TestKeyBody(t *testing.T) {
tu.InitializeRandom()
@@ -86,12 +61,10 @@ func TestKeyBody(t *testing.T) {
key := tu.RandomString(keyLength)
fragmentCount := rand.Intn(10) + 10
fragmentIndex := rand.Intn(fragmentCount)
fragmentKey, err := getFragmentKey(key, rand.Intn(10), fragmentCount, fragmentIndex)
fragmentKey, err := getFragmentKey(key, fragmentCount, fragmentIndex)
assert.NoError(t, err)

parts := strings.Split(fragmentKey, "/")
assert.Equal(t, 2, len(parts))
parts = strings.Split(parts[1], "-")
parts := strings.Split(fragmentKey, "-")
assert.Equal(t, 2, len(parts))
body := parts[0]

@@ -106,12 +79,10 @@ func TestKeyIndex(t *testing.T) {
for i := 0; i < 10; i++ {
fragmentCount := rand.Intn(10) + 10
index := rand.Intn(fragmentCount)
fragmentKey, err := getFragmentKey(tu.RandomString(10), rand.Intn(10), fragmentCount, index)
fragmentKey, err := getFragmentKey(tu.RandomString(10), fragmentCount, index)
assert.NoError(t, err)

parts := strings.Split(fragmentKey, "/")
assert.Equal(t, 2, len(parts))
parts = strings.Split(parts[1], "-")
parts := strings.Split(fragmentKey, "-")
assert.Equal(t, 2, len(parts))
indexStr := parts[1]
assert.True(t, strings.HasPrefix(indexStr, fmt.Sprintf("%d", index)))
@@ -126,7 +97,7 @@ func TestKeyPostfix(t *testing.T) {
segmentCount := rand.Intn(10) + 10

for i := 0; i < segmentCount; i++ {
fragmentKey, err := getFragmentKey(tu.RandomString(10), rand.Intn(10), segmentCount, i)
fragmentKey, err := getFragmentKey(tu.RandomString(10), segmentCount, i)
assert.NoError(t, err)

if i == segmentCount-1 {
@@ -139,41 +110,35 @@ func TestKeyPostfix(t *testing.T) {

// TestExampleInGodoc tests the example provided in the documentation for getFragmentKey().
//
// Example: fileKey="abc123", prefixLength=2, fragmentCount=3
// The keys will be "ab/abc123-0", "ab/abc123-1", "ab/abc123-2f"
// Example: fileKey="abc123", fragmentCount=3
// The keys will be "abc123-0", "abc123-1", "abc123-2f"
func TestExampleInGodoc(t *testing.T) {
fileKey := "abc123"
prefixLength := 2
fragmentCount := 3
fragmentKeys, err := getFragmentKeys(fileKey, prefixLength, fragmentCount)
fragmentKeys, err := getFragmentKeys(fileKey, fragmentCount)
assert.NoError(t, err)
assert.Equal(t, 3, len(fragmentKeys))
assert.Equal(t, "ab/abc123-0", fragmentKeys[0])
assert.Equal(t, "ab/abc123-1", fragmentKeys[1])
assert.Equal(t, "ab/abc123-2f", fragmentKeys[2])
assert.Equal(t, "abc123-0", fragmentKeys[0])
assert.Equal(t, "abc123-1", fragmentKeys[1])
assert.Equal(t, "abc123-2f", fragmentKeys[2])
}

func TestGetFragmentKeys(t *testing.T) {
tu.InitializeRandom()

fileKey := tu.RandomString(10)
prefixLength := rand.Intn(3) + 1
fragmentCount := rand.Intn(10) + 10

fragmentKeys, err := getFragmentKeys(fileKey, prefixLength, fragmentCount)
fragmentKeys, err := getFragmentKeys(fileKey, fragmentCount)
assert.NoError(t, err)
assert.Equal(t, fragmentCount, len(fragmentKeys))

for i := 0; i < fragmentCount; i++ {
expectedKey, err := getFragmentKey(fileKey, prefixLength, fragmentCount, i)
expectedKey, err := getFragmentKey(fileKey, fragmentCount, i)
assert.NoError(t, err)
assert.Equal(t, expectedKey, fragmentKeys[i])

parts := strings.Split(fragmentKeys[i], "/")
assert.Equal(t, 2, len(parts))
parsedPrefix := parts[0]
assert.Equal(t, fileKey[:prefixLength], parsedPrefix)
parts = strings.Split(parts[1], "-")
parts := strings.Split(fragmentKeys[i], "-")
assert.Equal(t, 2, len(parts))
parsedKey := parts[0]
assert.Equal(t, fileKey, parsedKey)
@@ -192,17 +157,16 @@ func TestGetFragments(t *testing.T) {

fileKey := tu.RandomString(10)
data := tu.RandomBytes(1000)
prefixLength := rand.Intn(3) + 1
fragmentSize := rand.Intn(100) + 100

fragments, err := breakIntoFragments(fileKey, data, prefixLength, fragmentSize)
fragments, err := breakIntoFragments(fileKey, data, fragmentSize)
assert.NoError(t, err)
assert.Equal(t, getFragmentCount(len(data), fragmentSize), len(fragments))

totalSize := 0

for i, fragment := range fragments {
fragmentKey, err := getFragmentKey(fileKey, prefixLength, len(fragments), i)
fragmentKey, err := getFragmentKey(fileKey, len(fragments), i)
assert.NoError(t, err)
assert.Equal(t, fragmentKey, fragment.FragmentKey)

@@ -224,14 +188,13 @@ func TestGetFragmentsSmallFile(t *testing.T) {

fileKey := tu.RandomString(10)
data := tu.RandomBytes(10)
prefixLength := rand.Intn(3) + 1
fragmentSize := rand.Intn(100) + 100

fragments, err := breakIntoFragments(fileKey, data, prefixLength, fragmentSize)
fragments, err := breakIntoFragments(fileKey, data, fragmentSize)
assert.NoError(t, err)
assert.Equal(t, 1, len(fragments))

fragmentKey, err := getFragmentKey(fileKey, prefixLength, 1, 0)
fragmentKey, err := getFragmentKey(fileKey, 1, 0)
assert.NoError(t, err)
assert.Equal(t, fragmentKey, fragments[0].FragmentKey)
assert.Equal(t, data, fragments[0].Data)
@@ -244,13 +207,12 @@ func TestGetFragmentsExactlyOnePerfectlySizedFile(t *testing.T) {
fileKey := tu.RandomString(10)
fragmentSize := rand.Intn(100) + 100
data := tu.RandomBytes(fragmentSize)
prefixLength := rand.Intn(3) + 1

fragments, err := breakIntoFragments(fileKey, data, prefixLength, fragmentSize)
fragments, err := breakIntoFragments(fileKey, data, fragmentSize)
assert.NoError(t, err)
assert.Equal(t, 1, len(fragments))

fragmentKey, err := getFragmentKey(fileKey, prefixLength, 1, 0)
fragmentKey, err := getFragmentKey(fileKey, 1, 0)
assert.NoError(t, err)
assert.Equal(t, fragmentKey, fragments[0].FragmentKey)
assert.Equal(t, data, fragments[0].Data)
@@ -262,10 +224,9 @@ func TestRecombineFragments(t *testing.T) {

fileKey := tu.RandomString(10)
data := tu.RandomBytes(1000)
prefixLength := rand.Intn(3) + 1
fragmentSize := rand.Intn(100) + 100

fragments, err := breakIntoFragments(fileKey, data, prefixLength, fragmentSize)
fragments, err := breakIntoFragments(fileKey, data, fragmentSize)
assert.NoError(t, err)
recombinedData, err := recombineFragments(fragments)
assert.NoError(t, err)
@@ -287,10 +248,9 @@ func TestRecombineFragmentsSmallFile(t *testing.T) {

fileKey := tu.RandomString(10)
data := tu.RandomBytes(10)
prefixLength := rand.Intn(3) + 1
fragmentSize := rand.Intn(100) + 100

fragments, err := breakIntoFragments(fileKey, data, prefixLength, fragmentSize)
fragments, err := breakIntoFragments(fileKey, data, fragmentSize)
assert.NoError(t, err)
assert.Equal(t, 1, len(fragments))
recombinedData, err := recombineFragments(fragments)
@@ -303,10 +263,9 @@ func TestMissingFragment(t *testing.T) {

fileKey := tu.RandomString(10)
data := tu.RandomBytes(1000)
prefixLength := rand.Intn(3) + 1
fragmentSize := rand.Intn(100) + 100

fragments, err := breakIntoFragments(fileKey, data, prefixLength, fragmentSize)
fragments, err := breakIntoFragments(fileKey, data, fragmentSize)
assert.NoError(t, err)

fragmentIndexToSkip := rand.Intn(len(fragments))
@@ -321,10 +280,9 @@ func TestMissingFinalFragment(t *testing.T) {

fileKey := tu.RandomString(10)
data := tu.RandomBytes(1000)
prefixLength := rand.Intn(3) + 1
fragmentSize := rand.Intn(100) + 100

fragments, err := breakIntoFragments(fileKey, data, prefixLength, fragmentSize)
fragments, err := breakIntoFragments(fileKey, data, fragmentSize)
assert.NoError(t, err)
fragments = fragments[:len(fragments)-1]

55 changes: 55 additions & 0 deletions common/aws/s3/scoped_keys.go
@@ -0,0 +1,55 @@
package s3

import (
    "fmt"
    v2 "github.com/Layr-Labs/eigenda/core/v2"
)

const (
    // prefixLength is the number of characters to use from the base key to form the prefix.
    // Assuming keys take the form of a random hash in hex, 3 will yield 16^3 = 4096 possible prefixes.
    // This is currently hard coded because it is not expected to change; changing it would require
    // a data migration that we have not yet implemented.
    prefixLength = 3

    // blobNamespace is the namespace for a blob key.
    blobNamespace = "blob"

    // chunkNamespace is the namespace for a chunk key.
    chunkNamespace = "chunk"

    // proofNamespace is the namespace for a proof key.
    proofNamespace = "proof"
)

// ScopedKey returns a key that is scoped to a "namespace". Keys take the form of "prefix/namespace/baseKey".
// Although there is no runtime enforcement, neither the base key nor the namespace should contain any
// non-alphanumeric characters.
func ScopedKey(namespace string, baseKey string, prefixLength int) string {
    var prefix string
    if prefixLength > len(baseKey) {
        prefix = baseKey
    } else {
        prefix = baseKey[:prefixLength]
    }

    return fmt.Sprintf("%s/%s/%s", prefix, namespace, baseKey)
}

// ScopedBlobKey returns a key scoped to the blob namespace. Used to name files containing blobs in S3.
// A key scoped for blobs will never collide with a key scoped for chunks or proofs.
func ScopedBlobKey(blobKey v2.BlobKey) string {
    return ScopedKey(blobNamespace, blobKey.Hex(), prefixLength)
}

// ScopedChunkKey returns a key scoped to the chunk namespace. Used to name files containing chunks in S3.
// A key scoped for chunks will never collide with a key scoped for blobs or proofs.
func ScopedChunkKey(blobKey v2.BlobKey) string {
    return ScopedKey(chunkNamespace, blobKey.Hex(), prefixLength)
}

// ScopedProofKey returns a key scoped to the proof namespace. Used to name files containing proofs in S3.
// A key scoped for proofs will never collide with a key scoped for blobs or chunks.
func ScopedProofKey(blobKey v2.BlobKey) string {
    return ScopedKey(proofNamespace, blobKey.Hex(), prefixLength)
}
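To see why keys from the three namespaces cannot collide in a single bucket, here is a self-contained sketch of the scoping scheme. The real helpers take a core v2 BlobKey; the hex key below is a made-up placeholder:

package main

import "fmt"

// scopedKey mirrors ScopedKey above: "prefix/namespace/baseKey", where the
// prefix is the first prefixLength characters of the base key (or the whole
// key when it is shorter than prefixLength).
func scopedKey(namespace string, baseKey string, prefixLength int) string {
    prefix := baseKey
    if prefixLength <= len(baseKey) {
        prefix = baseKey[:prefixLength]
    }
    return fmt.Sprintf("%s/%s/%s", prefix, namespace, baseKey)
}

func main() {
    baseKey := "a1b2c3d4e5f6" // hypothetical hex blob key, for illustration only
    fmt.Println(scopedKey("blob", baseKey, 3))  // a1b/blob/a1b2c3d4e5f6
    fmt.Println(scopedKey("chunk", baseKey, 3)) // a1b/chunk/a1b2c3d4e5f6
    fmt.Println(scopedKey("proof", baseKey, 3)) // a1b/proof/a1b2c3d4e5f6
}

Because the namespace sits between the prefix and the base key, blob, chunk, and proof objects derived from the same blob key always map to distinct S3 keys, while the three-character hex prefix (16^3 = 4096 possible values) keeps objects spread across many key prefixes.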