Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add ability to control semantics for Freight being made available to a Stage #3257

Merged
555 changes: 299 additions & 256 deletions api/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions api/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions api/v1alpha1/stage_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,10 @@ func GetStage(
// ListAvailableFreight lists all Freight available to the Stage for any reason.
// This includes:
//
// 1. Any Freight from a Warehouse that the Stage subscribes to directly
// 2. Any Freight that is verified in any upstream Stages (with any applicable soak time elapsed)
// 3. Any Freight that is approved for the Stage
// 1. Any Freight from a Warehouse that the Stage subscribes to directly
// 2. Any Freight that is verified in upstream Stages matching configured AvailabilityStrategy
// (with any applicable soak time elapsed)
// 3. Any Freight that is approved for the Stage
func (s *Stage) ListAvailableFreight(
ctx context.Context,
c client.Client,
Expand Down Expand Up @@ -135,8 +136,9 @@ func (s *Stage) ListAvailableFreight(
var listOpts *ListWarehouseFreightOptions
if !req.Sources.Direct {
listOpts = &ListWarehouseFreightOptions{
ApprovedFor: s.Name,
VerifiedIn: req.Sources.Stages,
ApprovedFor: s.Name,
VerifiedIn: req.Sources.Stages,
AvailabilityStrategy: req.Sources.AvailabilityStrategy,
}
if requiredSoak := req.Sources.RequiredSoakTime; requiredSoak != nil {
listOpts.VerifiedBefore = &metav1.Time{Time: time.Now().Add(-requiredSoak.Duration)}
Expand Down
14 changes: 14 additions & 0 deletions api/v1alpha1/stage_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ type FreightOriginKind string

const FreightOriginKindWarehouse FreightOriginKind = "Warehouse"

// +kubebuilder:validation:Enum={All,OneOf}
type FreightAvailabilityStrategy string

const (
FreightAvailabilityStrategyAll FreightAvailabilityStrategy = "All"
FreightAvailabilityStrategyOneOf FreightAvailabilityStrategy = "OneOf"
)

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name=Shard,type=string,JSONPath=`.spec.shard`
Expand Down Expand Up @@ -264,6 +272,12 @@ type FreightSources struct {
// +kubebuilder:validation:Type=string
// +kubebuilder:validation:Pattern="^([0-9]+(\\.[0-9]+)?(s|m|h))+$"
RequiredSoakTime *metav1.Duration `json:"requiredSoakTime,omitempty" protobuf:"bytes,3,opt,name=requiredSoakTime"`
// AvailabilityStrategy specifies the semantics for how requested Freight is
// made available to the Stage. This field is optional. When left unspecified,
// the field is implicitly treated as if its value were "OneOf".
//
// +kubebuilder:default=OneOf
AvailabilityStrategy FreightAvailabilityStrategy `json:"availabilityStrategy" protobuf:"bytes,4,opt,name=availabilityStrategy"`
}

// PromotionTemplate defines a template for a Promotion that can be used to
Expand Down
80 changes: 68 additions & 12 deletions api/v1alpha1/warehouse_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -86,6 +87,12 @@ type ListWarehouseFreightOptions struct {
// This is useful for filtering out Freight whose soak time has not yet
// elapsed.
VerifiedBefore *metav1.Time
// AvailabilityStrategy specifies the semantics for how Freight is determined
// to be available. If not set, the default is to consider Freight available
// if it has been verified in any of the provided VerifiedIn stages.
// IMPORTANT: This is also applied to Freight matched using the VerifiedBefore
// condition.
AvailabilityStrategy FreightAvailabilityStrategy
}

// ListFreight returns a list of all Freight resources that originated from the
Expand All @@ -101,7 +108,7 @@ func (w *Warehouse) ListFreight(

// Build a list of list options to make multiple queries whose results we will
// merge and de-dupe.
fieldSelectors := make([]fields.Selector, 0, 1+len(opts.VerifiedIn))
fieldSelectors := make([]fields.Selector, 0)
warehouseSelector := fields.OneTermEqualSelector("warehouse", w.Name)
if opts.ApprovedFor == "" && len(opts.VerifiedIn) == 0 {
// Just list all Freight resources that originated from the Warehouse
Expand All @@ -117,14 +124,43 @@ func (w *Warehouse) ListFreight(
),
)
}
for _, stage := range opts.VerifiedIn {
// List all Freight resources that are verified in the specified Stage
fieldSelectors = append(
fieldSelectors,
fields.AndSelectors(
warehouseSelector,

// Construct selectors for listing Freight using the configured AvailabilityStrategy
// semantics.
switch opts.AvailabilityStrategy {
case FreightAvailabilityStrategyAll:
// Query for Freight that is verified in ALL of the VerifiedIn stages.
stageSelectors := make([]fields.Selector, 0, len(opts.VerifiedIn))
for _, stage := range opts.VerifiedIn {
stageSelectors = append(
stageSelectors,
fields.OneTermEqualSelector("verifiedIn", stage),
),
)
}

if len(stageSelectors) > 0 {
fieldSelectors = append(
fieldSelectors,
fields.AndSelectors(
append(stageSelectors, warehouseSelector)...,
),
)
}
case FreightAvailabilityStrategyOneOf, "":
// Query for Freight that is verified in ANY of the VerifiedIn stages.
for _, stage := range opts.VerifiedIn {
fieldSelectors = append(
fieldSelectors,
fields.AndSelectors(
warehouseSelector,
fields.OneTermEqualSelector("verifiedIn", stage),
),
)
}
default:
return nil, fmt.Errorf(
"unsupported AvailabilityStrategy: %s",
opts.AvailabilityStrategy,
)
}

Expand Down Expand Up @@ -164,22 +200,42 @@ func (w *Warehouse) ListFreight(

// Filter out Freight whose soak time has not yet elapsed
filtered := make([]Freight, 0, len(freight))
freightLoop:
for _, f := range freight {
if opts.ApprovedFor != "" {
if f.IsApprovedFor(opts.ApprovedFor) {
filtered = append(filtered, f)
continue
}
}
for _, ver := range f.Status.VerifiedIn {

// Track set of Stages that have passed the verification soak time
// for the Freight.
verifiedStages := sets.New[string]()
for stage, ver := range f.Status.VerifiedIn {
if verifiedAt := ver.VerifiedAt; verifiedAt != nil {
if verifiedAt.Time.Before(opts.VerifiedBefore.Time) {
filtered = append(filtered, f)
continue freightLoop
verifiedStages.Insert(stage)
}
}
}

// Filter out Freight that has passed its verification soak time in ALL
// of the specified VerifiedIn Stages if AvailabilityStrategy is set to All.
// Otherwise, include Freight if it has passed the soak time in a single
// Stage.
if opts.AvailabilityStrategy == FreightAvailabilityStrategyAll {
// If Freight is verified in ALL upstream Stages, then it is
// available.
if verifiedStages.Equal(sets.New(opts.VerifiedIn...)) {
filtered = append(filtered, f)
}
continue
}

// If Freight is verified in ANY upstream Stage, then it is available.
if verifiedStages.Len() > 0 {
filtered = append(filtered, f)
}
}
return filtered, nil
}
120 changes: 119 additions & 1 deletion api/v1alpha1/warehouse_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func TestWarehouse_ListFreight(t *testing.T) {
const testWarehouse = "fake-warehouse"
const testStage = "fake-stage"
const testUpstreamStage = "fake-upstream-stage"
const testUpstreamStage2 = "fake-upstream-stage2"

testCases := []struct {
name string
Expand Down Expand Up @@ -171,7 +172,7 @@ func TestWarehouse_ListFreight(t *testing.T) {
},
},
{
name: "success with options",
name: "success with VerifiedIn and VerifiedBefore options",
opts: &ListWarehouseFreightOptions{
ApprovedFor: testStage,
VerifiedIn: []string{testUpstreamStage},
Expand Down Expand Up @@ -269,6 +270,123 @@ func TestWarehouse_ListFreight(t *testing.T) {
require.Equal(t, "fake-freight-5", freight[1].Name)
},
},
{
name: "success with AvailabilityStrategy set to FreightAvailabilityStrategyAll",
opts: &ListWarehouseFreightOptions{
AvailabilityStrategy: FreightAvailabilityStrategyAll,
ApprovedFor: testStage,
VerifiedIn: []string{testUpstreamStage, testUpstreamStage2},
VerifiedBefore: &metav1.Time{Time: time.Now().Add(-1 * time.Hour)},
},
objects: []client.Object{
&Freight{ // This should be returned as its approved for the Stage
ObjectMeta: metav1.ObjectMeta{
Namespace: testProject,
Name: "fake-freight-1",
},
Origin: FreightOrigin{
Kind: FreightOriginKindWarehouse,
Name: testWarehouse,
},
Status: FreightStatus{
// This is approved for the Stage
ApprovedFor: map[string]ApprovedStage{testStage: {}},
// This is only verified in a single upstream Stage
VerifiedIn: map[string]VerifiedStage{
testUpstreamStage: {
VerifiedAt: ptr.To(metav1.Now()),
},
},
},
},
&Freight{ // This should be returned because its verified in both upstream Stages and soak time has lapsed
ObjectMeta: metav1.ObjectMeta{
Namespace: testProject,
Name: "fake-freight-2",
},
Origin: FreightOrigin{
Kind: FreightOriginKindWarehouse,
Name: testWarehouse,
},
Status: FreightStatus{
// This is not approved for any Stage
ApprovedFor: map[string]ApprovedStage{},
// This is verified in all of the upstream Stages and the soak time has lapsed
VerifiedIn: map[string]VerifiedStage{
testUpstreamStage: {
VerifiedAt: ptr.To(metav1.NewTime(time.Now().Add(-2 * time.Hour))),
},
testUpstreamStage2: {
VerifiedAt: ptr.To(metav1.NewTime(time.Now().Add(-2 * time.Hour))),
},
},
},
},
&Freight{ // This should not be returned because it's not verified in all of the upstream Stages
ObjectMeta: metav1.ObjectMeta{
Namespace: testProject,
Name: "fake-freight-3",
},
Origin: FreightOrigin{
Kind: FreightOriginKindWarehouse,
Name: testWarehouse,
},
Status: FreightStatus{
// This is not approved for any Stage
ApprovedFor: map[string]ApprovedStage{},
// This is not verified in all of the upstream Stages
VerifiedIn: map[string]VerifiedStage{
testUpstreamStage: {
VerifiedAt: ptr.To(metav1.Now()),
},
},
},
},
&Freight{ // This should not be returned because its not passed the soak time in all Stages
ObjectMeta: metav1.ObjectMeta{
Namespace: testProject,
Name: "fake-freight-4",
},
Origin: FreightOrigin{
Kind: FreightOriginKindWarehouse,
Name: testWarehouse,
},
Status: FreightStatus{
// This is not approved for any Stage
ApprovedFor: map[string]ApprovedStage{},
// This is verified in all of the upstream Stages but only passed the soak time of one
VerifiedIn: map[string]VerifiedStage{
testUpstreamStage: {
VerifiedAt: ptr.To(metav1.NewTime(time.Now().Add(-2 * time.Hour))),
},
testUpstreamStage2: {
VerifiedAt: ptr.To(metav1.Now()),
},
},
},
},
},
assertions: func(t *testing.T, freight []Freight, err error) {
require.NoError(t, err)
require.Len(t, freight, 2)
require.Equal(t, testProject, freight[0].Namespace)
require.Equal(t, "fake-freight-1", freight[0].Name)
require.Equal(t, testProject, freight[1].Namespace)
require.Equal(t, "fake-freight-2", freight[1].Name)
},
},
{
name: "failure with invalid AvailabilityStrategy",
opts: &ListWarehouseFreightOptions{
AvailabilityStrategy: "Invalid",
ApprovedFor: testStage,
VerifiedIn: []string{testUpstreamStage, testUpstreamStage2},
VerifiedBefore: &metav1.Time{Time: time.Now().Add(-1 * time.Hour)},
},
assertions: func(t *testing.T, _ []Freight, err error) {
require.ErrorContains(t, err, "unsupported AvailabilityStrategy")
},
},
}

testScheme := k8sruntime.NewScheme()
Expand Down
12 changes: 12 additions & 0 deletions charts/kargo/resources/crds/kargo.akuity.io_stages.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,16 @@ spec:
Sources describes where the requested Freight may be obtained from. This is
a required field.
properties:
availabilityStrategy:
default: OneOf
description: |-
AvailabilityStrategy specifies the semantics for how requested Freight is
made available to the Stage. This field is optional. When left unspecified,
the field is implicitly treated as if its value were "OneOf".
enum:
- All
- OneOf
type: string
direct:
description: |-
Direct indicates the requested Freight may be obtained directly from the
Expand Down Expand Up @@ -288,6 +298,8 @@ spec:
items:
type: string
type: array
required:
- availabilityStrategy
type: object
required:
- origin
Expand Down
12 changes: 12 additions & 0 deletions ui/src/gen/schema/stages.kargo.akuity.io_v1alpha1.json
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,15 @@
"sources": {
"description": "Sources describes where the requested Freight may be obtained from. This is\na required field.",
"properties": {
"availabilityStrategy": {
"default": "OneOf",
"description": "AvailabilityStrategy specifies the semantics for how requested Freight is\nmade available to the Stage. This field is optional. When left unspecified,\nthe field is implicitly treated as if its value were \"OneOf\".",
"enum": [
"All",
"OneOf"
],
"type": "string"
},
"direct": {
"description": "Direct indicates the requested Freight may be obtained directly from the\nWarehouse from which it originated. If this field's value is false, then\nthe value of the Stages field must be non-empty. i.e. Between the two\nfields, at least one source must be specified.",
"type": "boolean"
Expand All @@ -194,6 +203,9 @@
"type": "array"
}
},
"required": [
"availabilityStrategy"
],
"type": "object"
}
},
Expand Down
13 changes: 12 additions & 1 deletion ui/src/gen/v1alpha1/generated_pb.ts

Large diffs are not rendered by default.

Loading