Skip to content

Commit

Permalink
Add partitioning strategies for S3 storage source
Browse files Browse the repository at this point in the history
  • Loading branch information
wdbaruni committed Jan 12, 2025
1 parent 00b8981 commit b241601
Show file tree
Hide file tree
Showing 23 changed files with 2,123 additions and 85 deletions.
20 changes: 10 additions & 10 deletions pkg/compute/bidder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewBidder(params BidderParams) Bidder {
}

func (b Bidder) RunBidding(ctx context.Context, execution *models.Execution) error {
bidResult, err := b.doBidding(ctx, execution.Job)
bidResult, err := b.doBidding(ctx, execution)
if err != nil {
return b.handleError(ctx, execution, err)
}
Expand All @@ -62,11 +62,11 @@ func (b Bidder) RunBidding(ctx context.Context, execution *models.Execution) err
// | true | true | true |
// | true | false | false |
// | false | N/A | false |
func (b Bidder) doBidding(ctx context.Context, job *models.Job) (*bidStrategyResponse, error) {
func (b Bidder) doBidding(ctx context.Context, execution *models.Execution) (*bidStrategyResponse, error) {
// NB(forrest): always run semantic bidding before resource bidding since generally there isn't much point in
// calling resource strategies that require DiskUsageCalculator.Calculate (a precursor to checking bidding) if
// semantically the job cannot run.
semanticResponse, err := b.runSemanticBidding(ctx, job)
semanticResponse, err := b.runSemanticBidding(ctx, execution)
if err != nil {
return nil, err
}
Expand All @@ -77,18 +77,18 @@ func (b Bidder) doBidding(ctx context.Context, job *models.Job) (*bidStrategyRes
}

// else the request is semantically biddable, calculate resource usage and check resource-based bidding.
resourceResponse, err := b.runResourceBidding(ctx, job)
resourceResponse, err := b.runResourceBidding(ctx, execution)
if err != nil {
return nil, err
}

return resourceResponse, nil
}

func (b Bidder) runSemanticBidding(ctx context.Context, job *models.Job) (*bidStrategyResponse, error) {
func (b Bidder) runSemanticBidding(ctx context.Context, execution *models.Execution) (*bidStrategyResponse, error) {
// ask the bidding strategy if we should bid on this job
request := bidstrategy.BidStrategyRequest{
Job: *job,
Job: *execution.Job,
}

// assume we are bidding unless a request is rejected
Expand Down Expand Up @@ -125,21 +125,21 @@ func (b Bidder) runSemanticBidding(ctx context.Context, job *models.Job) (*bidSt
}, nil
}

func (b Bidder) runResourceBidding(ctx context.Context, job *models.Job) (*bidStrategyResponse, error) {
func (b Bidder) runResourceBidding(ctx context.Context, execution *models.Execution) (*bidStrategyResponse, error) {
// parse job resource config
parsedUsage, err := job.Task().ResourcesConfig.ToResources()
parsedUsage, err := execution.Job.Task().ResourcesConfig.ToResources()
if err != nil {
return nil, fmt.Errorf("parsing job resource config: %w", err)
}
// calculate resource usage of the job; a failure here represents a compute failure.
resourceUsage, err := b.usageCalculator.Calculate(ctx, *job, *parsedUsage)
resourceUsage, err := b.usageCalculator.Calculate(ctx, execution, *parsedUsage)
if err != nil {
return nil, fmt.Errorf("calculating resource usage of job: %w", err)
}

// ask the bidding strategy if we should bid on this job
request := bidstrategy.BidStrategyRequest{
Job: *job,
Job: *execution.Job,
}

// assume we are bidding unless a request is rejected
Expand Down
6 changes: 3 additions & 3 deletions pkg/compute/capacity/calculators.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func NewDefaultsUsageCalculator(params DefaultsUsageCalculatorParams) *DefaultsU
}

// Calculate returns parsedUsage merged with the calculator's configured
// defaults (c.defaults). It always returns a nil error, and neither ctx nor
// the job/execution argument is read.
// NOTE(review): presumably Merge only fills in values that parsedUsage leaves
// unset — confirm against models.Resources.Merge.
func (c *DefaultsUsageCalculator) Calculate(
ctx context.Context, job models.Job, parsedUsage models.Resources) (*models.Resources, error) {
ctx context.Context, execution *models.Execution, parsedUsage models.Resources) (*models.Resources, error) {
return parsedUsage.Merge(c.defaults), nil
}

Expand All @@ -40,10 +40,10 @@ func NewChainedUsageCalculator(params ChainedUsageCalculatorParams) *ChainedUsag
}

func (c *ChainedUsageCalculator) Calculate(
ctx context.Context, job models.Job, parsedUsage models.Resources) (*models.Resources, error) {
ctx context.Context, execution *models.Execution, parsedUsage models.Resources) (*models.Resources, error) {
aggregatedUsage := &parsedUsage
for _, calculator := range c.calculators {
calculatedUsage, err := calculator.Calculate(ctx, job, parsedUsage)
calculatedUsage, err := calculator.Calculate(ctx, execution, parsedUsage)
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/compute/capacity/disk/calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ func NewDiskUsageCalculator(params DiskUsageCalculatorParams) *DiskUsageCalculat
}
}

func (c *DiskUsageCalculator) Calculate(ctx context.Context, job models.Job, parsedUsage models.Resources) (*models.Resources, error) {
func (c *DiskUsageCalculator) Calculate(
ctx context.Context, execution *models.Execution, parsedUsage models.Resources) (*models.Resources, error) {
requirements := &models.Resources{}

var totalDiskRequirements uint64 = 0
for _, input := range job.Task().InputSources {
for _, input := range execution.Job.Task().InputSources {
strg, err := c.storages.Get(ctx, input.Source.Type)
if err != nil {
return nil, err
}
volumeSize, err := strg.GetVolumeSize(ctx, *input)
volumeSize, err := strg.GetVolumeSize(ctx, execution, *input)
if err != nil {
return nil, bacerrors.Wrap(err, "error getting job disk space requirements")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/compute/capacity/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type UsageTracker interface {
// UsageCalculator calculates the resource usage of a job.
// It can also be used to populate a job's resource usage with default values
// where none are defined.
type UsageCalculator interface {
// Calculate takes the usage already parsed from the job's resource config
// (parsedUsage) and returns the resulting resource usage, or an error if it
// cannot be determined.
Calculate(ctx context.Context, job models.Job, parsedUsage models.Resources) (*models.Resources, error)
Calculate(ctx context.Context, execution *models.Execution, parsedUsage models.Resources) (*models.Resources, error)
}

// Provider returns the available capacity of a compute node.
Expand Down
4 changes: 2 additions & 2 deletions pkg/s3/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func NewS3PublisherError(code bacerrors.ErrorCode, message string) bacerrors.Err
WithComponent(PublisherComponent)
}

// NewS3InputSourceError creates a bacerrors.Error carrying the given error
// code and attributed to InputSourceComponent.
// NOTE(review): in the variadic form the first string is handed to
// bacerrors.New as a format string. Callers holding pre-built text should pass
// "%s" plus the text so that stray '%' characters are not treated as verbs.
// Presumably bacerrors.New follows fmt.Sprintf semantics — confirm.
func NewS3InputSourceError(code bacerrors.ErrorCode, message string) bacerrors.Error {
return bacerrors.New("%s", message).
func NewS3InputSourceError(code bacerrors.ErrorCode, format string, a ...any) bacerrors.Error {
return bacerrors.New(format, a...).
WithCode(code).
WithComponent(InputSourceComponent)
}
Expand Down
Loading

0 comments on commit b241601

Please sign in to comment.