Skip to content

Commit

Permalink
Add partitioning strategies for S3 storage source (#4805)
Browse files Browse the repository at this point in the history
This PR introduces configurable partitioning strategies for S3 input
sources, enabling distributed job executions to efficiently process
subsets of S3 objects. When a job is created with multiple executions (N
> 1), each execution is assigned a unique partition index (0 to N-1) and
will only process its designated subset of objects based on the
configured partitioning strategy.

## Motivation
- Enable parallel processing of large S3 datasets across multiple job
executions
- Allow users to control how objects are distributed based on their data
organization patterns
- Provide deterministic object distribution for reproducible results

## Features
- Multiple partitioning strategies:
  - `none`: No partitioning, all objects available to all executions (default)
  - `object`: Partition by complete object key using consistent hashing
  - `regex`: Partition using regex pattern matches from object keys
  - `substring`: Partition based on a specific portion of object keys
  - `date`: Partition based on dates found in object keys

- Hash-based partitioning using FNV-1a ensures:
  - Deterministic assignment of objects to partitions
  - Distribution based on the chosen strategy and input data patterns

- Robust handling of edge cases:
  - Fallback to partition 0 for unmatched objects
  - Proper handling of directories and empty paths
  - Unicode support for substring partitioning

## Example Usage

Basic object partitioning:
```yaml
  source:
      type: s3
      params:
        bucket: mybucket
        key: data/*
        partition:
          type: object
```

Regex partitioning with capture groups:
```yaml

  source:
    type: s3
    params:
        bucket: mybucket
        key: data/*
        partition:
          type: regex
          pattern: 'data/(\d{4})/(\d{2})/.*\.csv'
```

Date-based partitioning:
```yaml
  source:
    type: s3
    params:
        bucket: mybucket
        key: logs/*
        partition:
          type: date
          dateFormat: "2006-01-02"
```


## Testing
- Unit tests covering all partitioning strategies
- Integration tests with actual S3 storage
- Edge case handling and error scenarios
- Distribution analysis with various input patterns


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

Based on the comprehensive summary of changes, here are the release
notes:

## Release Notes

- **New Features**
- Added S3 Object Partitioning system with support for multiple
partitioning strategies (Object, Regex, Substring, Date)
- Enhanced storage and compute modules to support execution-level
context

- **Improvements**
- Refined method signatures across multiple packages to include
execution context
- Updated error handling and message formatting in various storage and
compute modules
  - Improved flexibility in resource calculation and bidding strategies

- **Bug Fixes**
- Updated volume size calculation methods to handle more complex input
scenarios
  - Enhanced validation for storage and partitioning configurations

- **Documentation**
  - Added comprehensive documentation for S3 Object Partitioning system
  - Improved inline documentation for new features and method changes

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
wdbaruni authored Jan 12, 2025
1 parent 00b8981 commit 7bd50cc
Show file tree
Hide file tree
Showing 23 changed files with 2,123 additions and 85 deletions.
20 changes: 10 additions & 10 deletions pkg/compute/bidder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewBidder(params BidderParams) Bidder {
}

func (b Bidder) RunBidding(ctx context.Context, execution *models.Execution) error {
bidResult, err := b.doBidding(ctx, execution.Job)
bidResult, err := b.doBidding(ctx, execution)
if err != nil {
return b.handleError(ctx, execution, err)
}
Expand All @@ -62,11 +62,11 @@ func (b Bidder) RunBidding(ctx context.Context, execution *models.Execution) err
// | true | true | true |
// | true | false | false |
// | false | N/A | false |
func (b Bidder) doBidding(ctx context.Context, job *models.Job) (*bidStrategyResponse, error) {
func (b Bidder) doBidding(ctx context.Context, execution *models.Execution) (*bidStrategyResponse, error) {
// NB(forrest): always run semantic bidding before resource bidding since generally there isn't much point in
// calling resource strategies that require DiskUsageCalculator.Calculate (a precursor to checking bidding) if
// semantically the job cannot run.
semanticResponse, err := b.runSemanticBidding(ctx, job)
semanticResponse, err := b.runSemanticBidding(ctx, execution)
if err != nil {
return nil, err
}
Expand All @@ -77,18 +77,18 @@ func (b Bidder) doBidding(ctx context.Context, job *models.Job) (*bidStrategyRes
}

// else the request is semantically biddable, calculate resource usage and check resource-based bidding.
resourceResponse, err := b.runResourceBidding(ctx, job)
resourceResponse, err := b.runResourceBidding(ctx, execution)
if err != nil {
return nil, err
}

return resourceResponse, nil
}

func (b Bidder) runSemanticBidding(ctx context.Context, job *models.Job) (*bidStrategyResponse, error) {
func (b Bidder) runSemanticBidding(ctx context.Context, execution *models.Execution) (*bidStrategyResponse, error) {
// ask the bidding strategy if we should bid on this job
request := bidstrategy.BidStrategyRequest{
Job: *job,
Job: *execution.Job,
}

// assume we are bidding unless a request is rejected
Expand Down Expand Up @@ -125,21 +125,21 @@ func (b Bidder) runSemanticBidding(ctx context.Context, job *models.Job) (*bidSt
}, nil
}

func (b Bidder) runResourceBidding(ctx context.Context, job *models.Job) (*bidStrategyResponse, error) {
func (b Bidder) runResourceBidding(ctx context.Context, execution *models.Execution) (*bidStrategyResponse, error) {
// parse job resource config
parsedUsage, err := job.Task().ResourcesConfig.ToResources()
parsedUsage, err := execution.Job.Task().ResourcesConfig.ToResources()
if err != nil {
return nil, fmt.Errorf("parsing job resource config: %w", err)
}
// calculate resource usage of the job, failure here represents a compute failure.
resourceUsage, err := b.usageCalculator.Calculate(ctx, *job, *parsedUsage)
resourceUsage, err := b.usageCalculator.Calculate(ctx, execution, *parsedUsage)
if err != nil {
return nil, fmt.Errorf("calculating resource usage of job: %w", err)
}

// ask the bidding strategy if we should bid on this job
request := bidstrategy.BidStrategyRequest{
Job: *job,
Job: *execution.Job,
}

// assume we are bidding unless a request is rejected
Expand Down
6 changes: 3 additions & 3 deletions pkg/compute/capacity/calculators.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func NewDefaultsUsageCalculator(params DefaultsUsageCalculatorParams) *DefaultsU
}

func (c *DefaultsUsageCalculator) Calculate(
ctx context.Context, job models.Job, parsedUsage models.Resources) (*models.Resources, error) {
ctx context.Context, execution *models.Execution, parsedUsage models.Resources) (*models.Resources, error) {
return parsedUsage.Merge(c.defaults), nil
}

Expand All @@ -40,10 +40,10 @@ func NewChainedUsageCalculator(params ChainedUsageCalculatorParams) *ChainedUsag
}

func (c *ChainedUsageCalculator) Calculate(
ctx context.Context, job models.Job, parsedUsage models.Resources) (*models.Resources, error) {
ctx context.Context, execution *models.Execution, parsedUsage models.Resources) (*models.Resources, error) {
aggregatedUsage := &parsedUsage
for _, calculator := range c.calculators {
calculatedUsage, err := calculator.Calculate(ctx, job, parsedUsage)
calculatedUsage, err := calculator.Calculate(ctx, execution, parsedUsage)
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/compute/capacity/disk/calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ func NewDiskUsageCalculator(params DiskUsageCalculatorParams) *DiskUsageCalculat
}
}

func (c *DiskUsageCalculator) Calculate(ctx context.Context, job models.Job, parsedUsage models.Resources) (*models.Resources, error) {
func (c *DiskUsageCalculator) Calculate(
ctx context.Context, execution *models.Execution, parsedUsage models.Resources) (*models.Resources, error) {
requirements := &models.Resources{}

var totalDiskRequirements uint64 = 0
for _, input := range job.Task().InputSources {
for _, input := range execution.Job.Task().InputSources {
strg, err := c.storages.Get(ctx, input.Source.Type)
if err != nil {
return nil, err
}
volumeSize, err := strg.GetVolumeSize(ctx, *input)
volumeSize, err := strg.GetVolumeSize(ctx, execution, *input)
if err != nil {
return nil, bacerrors.Wrap(err, "error getting job disk space requirements")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/compute/capacity/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type UsageTracker interface {
// UsageCalculator calculates the resource usage of a job.
// Can also be used to populate the resource usage of a job with default values if not defined
type UsageCalculator interface {
Calculate(ctx context.Context, job models.Job, parsedUsage models.Resources) (*models.Resources, error)
Calculate(ctx context.Context, execution *models.Execution, parsedUsage models.Resources) (*models.Resources, error)
}

// Provider returns the available capacity of a compute node.
Expand Down
4 changes: 2 additions & 2 deletions pkg/s3/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func NewS3PublisherError(code bacerrors.ErrorCode, message string) bacerrors.Err
WithComponent(PublisherComponent)
}

func NewS3InputSourceError(code bacerrors.ErrorCode, message string) bacerrors.Error {
return bacerrors.New("%s", message).
func NewS3InputSourceError(code bacerrors.ErrorCode, format string, a ...any) bacerrors.Error {
return bacerrors.New(format, a...).
WithCode(code).
WithComponent(InputSourceComponent)
}
Expand Down
Loading

0 comments on commit 7bd50cc

Please sign in to comment.