-
Notifications
You must be signed in to change notification settings - Fork 90
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add partitioning strategies for S3 storage source #4805
Conversation
WalkthroughThe pull request introduces a comprehensive refactoring across multiple packages, primarily focusing on modifying method signatures to include an Changes
Sequence DiagramsequenceDiagram
participant Client
participant Bidder
participant ExecutionContext
participant ResourceCalculator
participant StorageProvider
Client->>Bidder: RunBidding(context, execution)
Bidder->>ExecutionContext: Extract Job Details
Bidder->>ResourceCalculator: Calculate(context, execution)
ResourceCalculator-->>Bidder: Calculated Resources
Bidder->>StorageProvider: GetVolumeSize(context, execution)
StorageProvider-->>Bidder: Volume Size
Bidder->>Client: Bidding Result
Poem
Finishing Touches
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (17)
pkg/storage/s3/storage.go (1)
65-65
: Update Function Documentation for New ParameterThe
GetVolumeSize
method now includes a new parameterexecution *models.Execution
. Please update the function's documentation to reflect the addition of this parameter and explain its purpose.pkg/s3/partitioning.go (1)
276-279
: Redundant Check fortotalPartitions
The check
if totalPartitions <= 0
ingetPartitionIndex
might be redundant sincetotalPartitions
is already validated to be greater than zero inPartitionObjects
. Consider removing this check to simplify the code.pkg/compute/capacity/disk/calculator.go (2)
36-38
: Error Handling for Volume Size CalculationWhen calculating
volumeSize
, if an error occurs, it's wrapped with a generic message. Consider providing more context about which input source caused the error to aid in debugging.Modify the error handling:
- return nil, bacerrors.Wrap(err, "error getting job disk space requirements") + return nil, bacerrors.Wrap(err, fmt.Sprintf("error getting disk space requirements for input source: %s", input.Source))
31-31
: Optimize Disk Requirements CalculationInitialize
totalDiskRequirements
asparsedUsage.Disk
to include any pre-parsed disk usage. Then, accumulate the sizes of input sources to provide a complete disk requirement estimation.Apply this change:
- var totalDiskRequirements uint64 = 0 + totalDiskRequirements := parsedUsage.Diskpkg/storage/inline/storage_test.go (1)
31-31
: Consider adding partition-specific test casesWhile the tests are correctly updated to use mock.Execution(), consider adding test cases that verify behavior with different partition configurations, especially for S3 storage implementation.
func TestPlaintextInlineStorage(t *testing.T) { // ... existing test code ... + t.Run("with_partition_config", func(t *testing.T) { + execution := mock.ExecutionWithPartition(0, 2) // Mock with partition index + size, err := storage.GetVolumeSize(context.Background(), execution, inputSource) + require.NoError(t, err) + // Add assertions for partitioned behavior + }) }Also applies to: 58-58
pkg/storage/ipfs/storage_test.go (1)
Line range hint
55-64
: Consider adding test cases for execution-specific behaviorWhile the test has been updated to include the execution context, it doesn't verify if the execution context affects the volume size calculation.
Consider adding test cases that validate execution-specific scenarios, such as:
- Different execution configurations
- Edge cases with nil execution
pkg/storage/local_directory/storage.go (1)
Line range hint
51-64
: Consider utilizing execution context for volume size calculationThe execution context is currently ignored (
_
), but it could be valuable for:
- Implementing execution-specific path resolution
- Adding execution-based access controls
- Supporting volume size quotas per execution
pkg/s3/errors.go (1)
31-32
: LGTM! Consider adding examples in comments.The change to use variadic parameters for error message formatting is a good improvement. It allows for more flexible error messages, which will be useful for the new partitioning feature.
Consider adding examples in comments to show how to use the new format:
// Example usage: // NewS3InputSourceError(BadRequestErrorCode, "invalid partition strategy: %s", strategy)pkg/s3/types.go (1)
12-18
: Consider using non-pointer types for required fields.The
ObjectSummary
struct uses pointer types for string fields. While this allows for nil values, consider:
- Are these fields truly optional? If not, using non-pointer types would be safer.
- Document why pointer types are used (e.g., for JSON null handling).
pkg/storage/noop/noop.go (1)
72-72
: LGTM! Consider documenting the unused parameter.The execution parameter is marked as unused with
_
. Consider adding a comment explaining why this parameter is needed for interface compatibility but not used in the noop implementation.// _ *models.Execution is unused in noop implementation but required for interface compatibility
pkg/storage/ipfs/storage.go (2)
52-52
: LGTM! Consider documenting the unused parameter.The execution parameter is marked as unused with
_
. Consider adding a comment explaining why this parameter is needed for interface compatibility but not used in the IPFS implementation.// _ *models.Execution is unused in IPFS implementation but required for interface compatibility
Line range hint
55-56
: Consider creating an issue for the TODO comment.The TODO comment about timeout handling has been present for a while and should be addressed.
Would you like me to create a GitHub issue to track this TODO? The issue would focus on improving the timeout handling by:
- Making the timeout configurable via parameters
- Using context timeouts set by the caller
pkg/storage/s3/storage_test.go (1)
Line range hint
63-438
: Consider improving test organization for better maintainability.The test cases comprehensively cover various partitioning scenarios, but the test structure could be improved for better maintainability:
- Consider grouping test cases by partitioning strategy using subtests
- Extract common test setup into helper functions
- Add comments explaining the expected behavior for complex test cases
Example structure:
func (s *StorageTestSuite) TestStorage() { s.Run("No Partitioning", func() { // Test cases for no partitioning }) s.Run("Object Partitioning", func() { // Test cases for object partitioning }) s.Run("Regex Partitioning", func() { // Test cases for regex partitioning }) // ... other strategies }pkg/s3/partitioning_test.go (2)
1251-1268
: Consider splitting the verification logic for better maintainability.The
verifyPartitioning
function handles multiple verification tasks. Consider splitting it into smaller, focused functions:
- Extract the partition generation loop into a separate function
- Create a dedicated function for running all verifications
- Add error context to verification failures
Example refactor:
func (s *PartitionTestSuite) generatePartitions(spec SourceSpec, objects []ObjectSummary, totalPartitions int) ([][]ObjectSummary, error) { partitions := make([][]ObjectSummary, totalPartitions) for i := 0; i < totalPartitions; i++ { partition, err := PartitionObjects(objects, totalPartitions, i, spec) if err != nil { return nil, fmt.Errorf("failed to generate partition %d: %w", i, err) } partitions[i] = partition } return partitions, nil } func (s *PartitionTestSuite) runVerifications(spec SourceSpec, objects []ObjectSummary, partitions [][]ObjectSummary, totalPartitions int, expected [][]string) { s.verifyNoDirectories(partitions) s.verifyComplete(objects, partitions) s.verifyConsistency(spec, objects, totalPartitions) if expected != nil { s.verifyPartitionContents(partitions, expected) } } func (s *PartitionTestSuite) verifyPartitioning(spec SourceSpec, objects []ObjectSummary, totalPartitions int, expected [][]string) { s.Require().NotNil(expected, "expected partition contents must not be nil") s.Require().Equal(totalPartitions, len(expected), "expected partition count must match totalPartitions") partitions, err := s.generatePartitions(spec, objects, totalPartitions) s.Require().NoError(err, "failed to generate partitions") s.runVerifications(spec, objects, partitions, totalPartitions, expected) }
252-420
: Enhance test case documentation for better clarity.While the test cases are comprehensive, consider these improvements:
- Add comments explaining the purpose of each test case
- Use more descriptive test names that indicate the scenario being tested
- Document the rationale behind expected distributions
Example improvements:
func (s *PartitionTestSuite) TestPartitionByObject() { tests := []struct { name string paths []string prefix string totalPartitions int expected [][]string }{ { name: "evenly distributes files across partitions", // More descriptive name paths: []string{ "file1.txt", "file2.txt", "dir/", // directory "file3.txt", "file4.txt", }, prefix: "", totalPartitions: 2, expected: [][]string{ {"file1.txt", "file3.txt"}, // Explain: Files distributed based on hash {"file2.txt", "file4.txt"}, // Explain: Even distribution achieved }, }, // ... other test cases with similar improvements } // ... rest of the function }pkg/s3/partitioning.md (2)
165-182
: Fix markdown code block formatting.The code blocks are missing language specifiers. Add appropriate language specifiers for better syntax highlighting:
- ``` + ```text Original Key | Trimmed Key (used for partitioning) -------------------------------|-------------------------------- ...Also applies to: 175-182, 185-191, 200-204, 209-213, 218-222
🧰 Tools
🪛 Markdownlint (0.37.0)
165-165: null
Fenced code blocks should have a language specified(MD040, fenced-code-language)
175-175: null
Fenced code blocks should have a language specified(MD040, fenced-code-language)
1-258
: Enhance documentation with additional sections.The documentation is comprehensive but could benefit from these additions:
- Add a troubleshooting section covering common issues
- Include performance benchmarks or guidelines
- Add examples of combining multiple partitioning strategies
- Include a section on monitoring and metrics
Example additions:
## Troubleshooting ### Common Issues 1. Uneven Distribution - Check if prefix trimming is working as expected - Verify that the partitioning strategy matches your data pattern 2. Performance Issues - Consider using simpler strategies for large datasets - Monitor regex pattern complexity ## Performance Guidelines Strategy | Overhead | Use Case ------------|----------|---------- Object | Low | Random distribution Regex | High | Complex patterns Substring | Low | Fixed formats Date | Medium | Time series ## Monitoring - Monitor partition sizes for evenness - Track fallback occurrences - Measure processing time per partition🧰 Tools
🪛 LanguageTool
[uncategorized] ~18-~18: A period might be missing here.
Context: ...ng is not needed or when handling small datasets ### 2. Object (`PartitionKeyTypeObject...(AI_EN_LECTOR_MISSING_PUNCTUATION_PERIOD)
[grammar] ~111-~111: This phrase is duplicated. You should probably use “Regex Partitioning” only once.
Context: ... configurations ## Usage Examples ### Regex Partitioning #### Regex Partitioning with Capture Groups ```go config := Par...(PHRASE_REPETITION)
[uncategorized] ~138-~138: Possible missing comma found.
Context: ...rtition objects with keys starting with dates like "2024-01-15-data.csv" ### Substri...(AI_HYDRA_LEO_MISSING_COMMA)
🪛 Markdownlint (0.37.0)
165-165: null
Fenced code blocks should have a language specified(MD040, fenced-code-language)
175-175: null
Fenced code blocks should have a language specified(MD040, fenced-code-language)
185-185: null
Fenced code blocks should have a language specified(MD040, fenced-code-language)
200-200: null
Fenced code blocks should have a language specified(MD040, fenced-code-language)
209-209: null
Fenced code blocks should have a language specified(MD040, fenced-code-language)
218-218: null
Fenced code blocks should have a language specified(MD040, fenced-code-language)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (23)
pkg/compute/bidder.go
(4 hunks)pkg/compute/capacity/calculators.go
(2 hunks)pkg/compute/capacity/disk/calculator.go
(1 hunks)pkg/compute/capacity/types.go
(1 hunks)pkg/s3/errors.go
(1 hunks)pkg/s3/partitioning.go
(1 hunks)pkg/s3/partitioning.md
(1 hunks)pkg/s3/partitioning_test.go
(1 hunks)pkg/s3/types.go
(4 hunks)pkg/storage/inline/storage.go
(1 hunks)pkg/storage/inline/storage_test.go
(2 hunks)pkg/storage/ipfs/storage.go
(1 hunks)pkg/storage/ipfs/storage_test.go
(2 hunks)pkg/storage/local_directory/storage.go
(1 hunks)pkg/storage/local_directory/storage_test.go
(1 hunks)pkg/storage/noop/noop.go
(1 hunks)pkg/storage/s3/storage.go
(9 hunks)pkg/storage/s3/storage_test.go
(10 hunks)pkg/storage/s3/types.go
(2 hunks)pkg/storage/tracing/tracing.go
(1 hunks)pkg/storage/types.go
(1 hunks)pkg/storage/url/urldownload/storage.go
(1 hunks)pkg/storage/url/urldownload/storage_test.go
(2 hunks)
🧰 Additional context used
🪛 LanguageTool
pkg/s3/partitioning.md
[uncategorized] ~18-~18: A period might be missing here.
Context: ...ng is not needed or when handling small datasets ### 2. Object (`PartitionKeyTypeObject...
(AI_EN_LECTOR_MISSING_PUNCTUATION_PERIOD)
[grammar] ~111-~111: This phrase is duplicated. You should probably use “Regex Partitioning” only once.
Context: ... configurations ## Usage Examples ### Regex Partitioning #### Regex Partitioning with Capture Groups ```go config := Par...
(PHRASE_REPETITION)
[uncategorized] ~138-~138: Possible missing comma found.
Context: ...rtition objects with keys starting with dates like "2024-01-15-data.csv" ### Substri...
(AI_HYDRA_LEO_MISSING_COMMA)
🪛 Markdownlint (0.37.0)
pkg/s3/partitioning.md
165-165: null
Fenced code blocks should have a language specified
(MD040, fenced-code-language)
175-175: null
Fenced code blocks should have a language specified
(MD040, fenced-code-language)
185-185: null
Fenced code blocks should have a language specified
(MD040, fenced-code-language)
200-200: null
Fenced code blocks should have a language specified
(MD040, fenced-code-language)
209-209: null
Fenced code blocks should have a language specified
(MD040, fenced-code-language)
218-218: null
Fenced code blocks should have a language specified
(MD040, fenced-code-language)
🔇 Additional comments (25)
pkg/storage/s3/storage.go (7)
80-84
: Proper Error Handling for Object PartitioningThe integration of
s3helper.PartitionObjects
enhances object partitioning based on execution parameters. Error handling is appropriately managed.
88-97
: Robust Checks for Negative Size and OverflowGood implementation of checks for negative object sizes and potential uint64 overflow. This ensures reliability when calculating the total volume size.
100-100
: Type Conversion ConsistencyThe conversion of
object.Size
touint64
is consistent after the negative size check. This maintains data integrity in the size calculation.
131-135
: Effective Partitioning in PrepareStorageThe addition of object partitioning in the
PrepareStorage
method ensures that only relevant objects are processed for the current execution partition. Error handling is correctly implemented.
158-158
: Updated Parameter Type for Object SummaryThe
downloadObject
function now acceptsobject s3helper.ObjectSummary
, aligning with the new object summary structure. This increases code maintainability by using a unified type.
189-194
: Ensure Non-Nil Fields in GetObjectInputWhile constructing
s3.GetObjectInput
, confirm thatobject.Key
,object.VersionID
, andobject.ETag
are not nil to avoid potential issues with the S3 API call.
162-162
:⚠️ Potential issuePotential Nil Pointer Dereference
Before dereferencing
object.Key
with*object.Key
, ensure that it is not nil to prevent a runtime panic.Consider adding a nil check:
if object.Key == nil { return fmt.Errorf("object key is nil") } objectTokens := strings.Split(*object.Key, "/")pkg/s3/partitioning.go (2)
70-75
: Validate Substring Partitioning IndicesIn the
Validate
method, ensure thatStartIndex
is less thanEndIndex
, and both are within acceptable bounds. The error messages are clear and help users correct their configurations.
270-272
: SanitizeKeyForPatternMatching Function LogicThe
sanitizeKeyForPatternMatching
function trims the prefix and leading slash fromobjectKey
. Ensure that this logic correctly handles keys where the prefix might not match or multiple slashes are present.pkg/compute/capacity/calculators.go (2)
24-24
: LGTM: Parameter change aligns with execution-centric designThe change from
job
toexecution
parameter provides necessary context for S3 partitioning while maintaining backward compatibility through the execution's job field.
43-46
: Verify execution context propagationThe execution context is correctly propagated to child calculators. However, we should verify that all calculator implementations handle the new parameter correctly.
✅ Verification successful
Context propagation verified successfully
All calculator implementations (
DefaultsUsageCalculator
,ChainedUsageCalculator
, andDiskUsageCalculator
) correctly handle the context parameter with proper method signatures and context propagation.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Find all implementations of Calculate method to verify parameter changes ast-grep --pattern 'func $_($$) Calculate(ctx context.Context, execution *models.Execution, $$$) ($$$)'Length of output: 105
Script:
#!/bin/bash # Find calculator interface definition and implementations echo "=== Looking for Calculator interface ===" rg "type.*Calculator.*interface" -A 5 echo -e "\n=== Looking for Calculate method implementations ===" rg "func.*Calculate.*\(ctx context\.Context.*\*models\.Execution"Length of output: 833
Script:
#!/bin/bash # List all .go files in capacity directory echo "=== Go files in capacity directory ===" fd -e go . pkg/compute/capacity/ echo -e "\n=== Looking for calculator struct types ===" rg "type.*Calculator.*struct" -A 2 pkg/compute/capacity/Length of output: 2018
Script:
#!/bin/bash echo "=== Calculate implementations in calculators.go ===" rg "func \(.*\) Calculate\(" -A 5 pkg/compute/capacity/calculators.go echo -e "\n=== Calculate implementations in disk/calculator.go ===" rg "func \(.*\) Calculate\(" -A 5 pkg/compute/capacity/disk/calculator.goLength of output: 1245
pkg/storage/types.go (1)
19-19
: Interface change impacts all storage implementationsThe addition of the execution parameter to
GetVolumeSize
is necessary for S3 partitioning but requires careful verification of all implementations.✅ Verification successful
All storage implementations updated correctly
All storage providers have been updated to match the new
GetVolumeSize
signature. Most implementations ignore the execution parameter (using_
) since it's only needed for S3 partitioning, while the S3 and tracing implementations properly utilize it.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Find all Storage interface implementations to verify compliance ast-grep --pattern 'type $_ struct { $$$ } func ($$) GetVolumeSize(context.Context, *models.Execution, models.InputSource) (uint64, error)'Length of output: 144
Script:
#!/bin/bash # Find all GetVolumeSize implementations with any signature echo "=== GetVolumeSize implementations ===" ast-grep --pattern 'func $_GetVolumeSize($$$) (uint64, error)' echo -e "\n=== Files potentially containing Storage implementations ===" fd -e go | rg -l "Storage" echo -e "\n=== Detailed view of GetVolumeSize implementations ===" rg "func.*GetVolumeSize.*\(" -A 1Length of output: 3120
pkg/storage/s3/types.go (2)
38-39
: LGTM: Consistent error message formattingThe error message format is clear and consistent with the rest of the codebase.
75-76
: LGTM: Error handling matches DecodeSourceSpecThe error handling is consistent between both decode functions, maintaining code symmetry.
pkg/compute/capacity/types.go (1)
42-42
: Verify all implementers of UsageCalculator interfaceThe interface change requires updates to all implementing types. Let's verify the changes are consistent across the codebase.
✅ Verification successful
All UsageCalculator implementations are properly updated
Found three implementations (DefaultsUsageCalculator, ChainedUsageCalculator, and DiskUsageCalculator) all using the new signature with
*models.Execution
parameter. No outdated implementations detected.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Find all implementations of UsageCalculator interface ast-grep --pattern 'type $_ struct { $$$ } func ($_ $_) Calculate(ctx context.Context, $_, $_ models.Resources) (*models.Resources, error) { $$$ }'Length of output: 154
Script:
#!/bin/bash # Find implementations of UsageCalculator with updated signature ast-grep --pattern 'type $_ struct { $$$ } func ($_ $_) Calculate(ctx context.Context, execution *models.Execution, parsedUsage models.Resources) (*models.Resources, error) { $$$ }' # Also search for any potentially outdated implementations ast-grep --pattern 'func ($_ $_) Calculate(ctx context.Context, $_, models.Resources) (*models.Resources, error)'Length of output: 304
Script:
#!/bin/bash # Find files containing Calculate method implementations rg -l "Calculate" --type go # Show Calculate method implementations with context rg "func.*Calculate" --type go -B 5 -A 5Length of output: 2645
pkg/storage/tracing/tracing.go (1)
41-45
: LGTM! Clean implementation of the decorator patternThe changes correctly propagate the execution context to the delegate storage implementation while maintaining the tracing functionality.
pkg/storage/ipfs/storage_test.go (1)
Line range hint
108-117
: LGTM! Timeout test properly updatedThe timeout test correctly includes the execution context while maintaining the original timeout verification logic.
pkg/storage/local_directory/storage.go (1)
Line range hint
66-82
: LGTM! Clean implementation of PrepareStorageThe method signature has been updated while maintaining the existing functionality. The ignored execution context (
_
) aligns with the current implementation needs.pkg/s3/types.go (1)
62-62
: LGTM! Error message formatting is consistent.The error message formatting in
DecodeSourceSpec
andDecodePreSignedResultSpec
follows the new variadic parameter pattern introduced inNewS3InputSourceError
.Also applies to: 80-81
pkg/storage/inline/storage.go (1)
67-67
: LGTM! Method signature updated to support execution context.The change aligns with the broader refactoring to support S3 partitioning strategies. The execution parameter is correctly ignored as inline storage doesn't require execution context.
pkg/compute/bidder.go (1)
49-49
: LGTM! Consistent refactoring to execution-centric approach.The changes properly transition from job-centric to execution-centric approach while maintaining the original bidding logic. All job references are correctly accessed via the execution object.
Also applies to: 65-65, 69-69, 80-80, 88-88, 91-91, 128-128, 130-130, 135-135, 142-142
pkg/storage/local_directory/storage_test.go (1)
209-209
: LGTM! Test updated to provide mock execution context.The test correctly uses
mock.Execution()
while maintaining the original test coverage.pkg/storage/url/urldownload/storage.go (1)
83-83
: LGTM! Method signature updated while preserving functionality.The change correctly adds the execution parameter to align with the interface changes. The parameter is properly ignored as URL download doesn't require execution context.
pkg/storage/url/urldownload/storage_test.go (2)
390-390
: LGTM!The test has been correctly updated to include the execution context parameter.
431-431
: LGTM!The error test case has been properly updated to include the execution context parameter.
This PR introduces configurable partitioning strategies for S3 input sources, enabling distributed job executions to efficiently process subsets of S3 objects. When a job is created with multiple executions (N > 1), each execution is assigned a unique partition index (0 to N-1) and will only process its designated subset of objects based on the configured partitioning strategy.
Motivation
Features
Multiple partitioning strategies:
none
: No partitioning, all objects available to all executions (default)object
: Partition by complete object key using consistent hashingregex
: Partition using regex pattern matches from object keyssubstring
: Partition based on a specific portion of object keysdate
: Partition based on dates found in object keysHash-based partitioning using FNV-1a ensures:
Robust handling of edge cases:
Example Usage
Basic object partitioning:
Regex partitioning with capture groups:
Date-based partitioning:
Testing
Summary by CodeRabbit
Based on the comprehensive summary of changes, here are the release notes:
Release Notes
New Features
Improvements
Bug Fixes
Documentation