Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add partitioning strategies for S3 storage source #4805

Merged
merged 1 commit into from
Jan 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions pkg/compute/bidder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewBidder(params BidderParams) Bidder {
}

func (b Bidder) RunBidding(ctx context.Context, execution *models.Execution) error {
bidResult, err := b.doBidding(ctx, execution.Job)
bidResult, err := b.doBidding(ctx, execution)
if err != nil {
return b.handleError(ctx, execution, err)
}
Expand All @@ -62,11 +62,11 @@ func (b Bidder) RunBidding(ctx context.Context, execution *models.Execution) err
// | true | true | true |
// | true | false | false |
// | false | N/A | false |
func (b Bidder) doBidding(ctx context.Context, job *models.Job) (*bidStrategyResponse, error) {
func (b Bidder) doBidding(ctx context.Context, execution *models.Execution) (*bidStrategyResponse, error) {
// NB(forrest): always run semantic bidding before resource bidding since generally there isn't much point in
// calling resource strategies that require DiskUsageCalculator.Calculate (a precursor to checking bidding) if
// semantically the job cannot run.
semanticResponse, err := b.runSemanticBidding(ctx, job)
semanticResponse, err := b.runSemanticBidding(ctx, execution)
if err != nil {
return nil, err
}
Expand All @@ -77,18 +77,18 @@ func (b Bidder) doBidding(ctx context.Context, job *models.Job) (*bidStrategyRes
}

// else the request is semantically biddable, calculate resource usage and check resource-based bidding.
resourceResponse, err := b.runResourceBidding(ctx, job)
resourceResponse, err := b.runResourceBidding(ctx, execution)
if err != nil {
return nil, err
}

return resourceResponse, nil
}

func (b Bidder) runSemanticBidding(ctx context.Context, job *models.Job) (*bidStrategyResponse, error) {
func (b Bidder) runSemanticBidding(ctx context.Context, execution *models.Execution) (*bidStrategyResponse, error) {
// ask the bidding strategy if we should bid on this job
request := bidstrategy.BidStrategyRequest{
Job: *job,
Job: *execution.Job,
}

// assume we are bidding unless a request is rejected
Expand Down Expand Up @@ -125,21 +125,21 @@ func (b Bidder) runSemanticBidding(ctx context.Context, job *models.Job) (*bidSt
}, nil
}

func (b Bidder) runResourceBidding(ctx context.Context, job *models.Job) (*bidStrategyResponse, error) {
func (b Bidder) runResourceBidding(ctx context.Context, execution *models.Execution) (*bidStrategyResponse, error) {
// parse job resource config
parsedUsage, err := job.Task().ResourcesConfig.ToResources()
parsedUsage, err := execution.Job.Task().ResourcesConfig.ToResources()
if err != nil {
return nil, fmt.Errorf("parsing job resource config: %w", err)
}
// calculate resource usage of the job, failure here represents a compute failure.
resourceUsage, err := b.usageCalculator.Calculate(ctx, *job, *parsedUsage)
resourceUsage, err := b.usageCalculator.Calculate(ctx, execution, *parsedUsage)
if err != nil {
return nil, fmt.Errorf("calculating resource usage of job: %w", err)
}

// ask the bidding strategy if we should bid on this job
request := bidstrategy.BidStrategyRequest{
Job: *job,
Job: *execution.Job,
}

// assume we are bidding unless a request is rejected
Expand Down
6 changes: 3 additions & 3 deletions pkg/compute/capacity/calculators.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func NewDefaultsUsageCalculator(params DefaultsUsageCalculatorParams) *DefaultsU
}

func (c *DefaultsUsageCalculator) Calculate(
ctx context.Context, job models.Job, parsedUsage models.Resources) (*models.Resources, error) {
ctx context.Context, execution *models.Execution, parsedUsage models.Resources) (*models.Resources, error) {
return parsedUsage.Merge(c.defaults), nil
}

Expand All @@ -40,10 +40,10 @@ func NewChainedUsageCalculator(params ChainedUsageCalculatorParams) *ChainedUsag
}

func (c *ChainedUsageCalculator) Calculate(
ctx context.Context, job models.Job, parsedUsage models.Resources) (*models.Resources, error) {
ctx context.Context, execution *models.Execution, parsedUsage models.Resources) (*models.Resources, error) {
aggregatedUsage := &parsedUsage
for _, calculator := range c.calculators {
calculatedUsage, err := calculator.Calculate(ctx, job, parsedUsage)
calculatedUsage, err := calculator.Calculate(ctx, execution, parsedUsage)
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/compute/capacity/disk/calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ func NewDiskUsageCalculator(params DiskUsageCalculatorParams) *DiskUsageCalculat
}
}

func (c *DiskUsageCalculator) Calculate(ctx context.Context, job models.Job, parsedUsage models.Resources) (*models.Resources, error) {
func (c *DiskUsageCalculator) Calculate(
ctx context.Context, execution *models.Execution, parsedUsage models.Resources) (*models.Resources, error) {
requirements := &models.Resources{}

var totalDiskRequirements uint64 = 0
for _, input := range job.Task().InputSources {
for _, input := range execution.Job.Task().InputSources {
wdbaruni marked this conversation as resolved.
Show resolved Hide resolved
strg, err := c.storages.Get(ctx, input.Source.Type)
if err != nil {
return nil, err
}
volumeSize, err := strg.GetVolumeSize(ctx, *input)
volumeSize, err := strg.GetVolumeSize(ctx, execution, *input)
if err != nil {
return nil, bacerrors.Wrap(err, "error getting job disk space requirements")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/compute/capacity/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type UsageTracker interface {
// UsageCalculator calculates the resource usage of a job.
// Can also be used to populate the resource usage of a job with default values if not defined
type UsageCalculator interface {
Calculate(ctx context.Context, job models.Job, parsedUsage models.Resources) (*models.Resources, error)
Calculate(ctx context.Context, execution *models.Execution, parsedUsage models.Resources) (*models.Resources, error)
}

// Provider returns the available capacity of a compute node.
Expand Down
4 changes: 2 additions & 2 deletions pkg/s3/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func NewS3PublisherError(code bacerrors.ErrorCode, message string) bacerrors.Err
WithComponent(PublisherComponent)
}

func NewS3InputSourceError(code bacerrors.ErrorCode, message string) bacerrors.Error {
return bacerrors.New("%s", message).
func NewS3InputSourceError(code bacerrors.ErrorCode, format string, a ...any) bacerrors.Error {
return bacerrors.New(format, a...).
WithCode(code).
WithComponent(InputSourceComponent)
}
Expand Down
Loading
Loading