Skip to content

Commit

Permalink
feat: add support for only promoting Freight that has been verified i…
Browse files Browse the repository at this point in the history
…n all upstream Stages
  • Loading branch information
aidan-canva committed Jan 11, 2025
1 parent 759a417 commit c6645f0
Show file tree
Hide file tree
Showing 11 changed files with 585 additions and 254 deletions.
13 changes: 13 additions & 0 deletions api/v1alpha1/freight_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,19 @@ func (f *Freight) IsVerifiedIn(stage string) bool {
return verified
}

// IsVerifiedInAll returns whether the Freight has been verified in all
// Stages.
func (f *Freight) IsVerifiedInAll(stages []string) bool {
// NB: This method exists for convenience. It doesn't require the caller to
// know anything about the Freight status' internal data structure.
for _, s := range stages {
if !f.IsVerifiedIn(s) {
return false
}
}
return true
}

// IsApprovedFor returns whether the Freight has been approved for the specified
// Stage.
func (f *Freight) IsApprovedFor(stage string) bool {
Expand Down
63 changes: 63 additions & 0 deletions api/v1alpha1/freight_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,69 @@ func TestFreight_IsVerifiedIn(t *testing.T) {
require.True(t, freight.IsVerifiedIn(testStage))
}

func TestFreight_IsVerifiedInAll(t *testing.T) {
testCases := []struct {
name string
freight Freight
stages []string
expected bool
}{
{
name: "Freight is not verified in any stage",
freight: Freight{
Status: FreightStatus{
VerifiedIn: map[string]VerifiedStage{},
},
},
stages: []string{"stage1", "stage2"},
expected: false,
},
{
name: "Freight is verified in all stages",
freight: Freight{
Status: FreightStatus{
VerifiedIn: map[string]VerifiedStage{
"stage1": {},
"stage2": {},
},
},
},
stages: []string{"stage1", "stage2"},
expected: true,
},
{
name: "Freight is verified in some stages",
freight: Freight{
Status: FreightStatus{
VerifiedIn: map[string]VerifiedStage{
"stage1": {},
},
},
},
stages: []string{"stage1", "stage2"},
expected: false,
},
{
name: "Freight is verified in all stages but not in the same order",
freight: Freight{
Status: FreightStatus{
VerifiedIn: map[string]VerifiedStage{
"stage2": {},
"stage1": {},
},
},
},
stages: []string{"stage1", "stage2"},
expected: true,
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
require.Equal(t, testCase.expected, testCase.freight.IsVerifiedInAll(testCase.stages))
})
}
}

func TestFreight_IsApprovedFor(t *testing.T) {
const testStage = "fake-stage"
freight := &Freight{}
Expand Down
533 changes: 288 additions & 245 deletions api/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions api/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions api/v1alpha1/stage_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func GetStage(
// This includes:
//
// 1. Any Freight from a Warehouse that the Stage subscribes to directly
// 2. Any Freight that is verified in any upstream Stages (with any applicable soak time elapsed)
// 2. Any Freight that is verified in upstream Stages matching configured AvailabilityStrategy (with any applicable soak time elapsed)
// 3. Any Freight that is approved for the Stage
func (s *Stage) ListAvailableFreight(
ctx context.Context,
Expand Down Expand Up @@ -135,8 +135,9 @@ func (s *Stage) ListAvailableFreight(
var listOpts *ListWarehouseFreightOptions
if !req.Sources.Direct {
listOpts = &ListWarehouseFreightOptions{
ApprovedFor: s.Name,
VerifiedIn: req.Sources.Stages,
ApprovedFor: s.Name,
VerifiedIn: req.Sources.Stages,
RequireAllVerifiedIn: req.AvailabilityStrategy == FreightAvailabilityStrategyAllUpstream,
}
if requiredSoak := req.Sources.RequiredSoakTime; requiredSoak != nil {
listOpts.VerifiedBefore = &metav1.Time{Time: time.Now().Add(-requiredSoak.Duration)}
Expand Down
14 changes: 14 additions & 0 deletions api/v1alpha1/stage_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ type FreightOriginKind string

const FreightOriginKindWarehouse FreightOriginKind = "Warehouse"

// +kubebuilder:validation:Enum={AllUpstream,AnyUpstream}
type FreightAvailabilityStrategy string

const (
FreightAvailabilityStrategyAllUpstream FreightAvailabilityStrategy = "AllUpstream"
FreightAvailabilityStrategyAnyUpstream FreightAvailabilityStrategy = "AnyUpstream"
)

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name=Shard,type=string,JSONPath=`.spec.shard`
Expand Down Expand Up @@ -205,6 +213,12 @@ type FreightRequest struct {
// Sources describes where the requested Freight may be obtained from. This is
// a required field.
Sources FreightSources `json:"sources" protobuf:"bytes,2,opt,name=sources"`
// AvailabilityStrategy specifies the semantics for how requested Freight is
// made available to the Stage. This field is optional. When left unspecified,
// the field is implicitly treated as if its value were "AnyUpstream".
//
// +kubebuilder:default=AnyUpstream
AvailabilityStrategy FreightAvailabilityStrategy `json:"availabilityStrategy" protobuf:"bytes,3,opt,name=availabilityStrategy"`
}

// FreightOrigin describes a kind of Freight in terms of where it may have
Expand Down
60 changes: 56 additions & 4 deletions api/v1alpha1/warehouse_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -86,6 +87,11 @@ type ListWarehouseFreightOptions struct {
// This is useful for filtering out Freight whose soak time has not yet
// elapsed.
VerifiedBefore *metav1.Time
// RequireAllVerifiedIn specifies whether ALL of the Stages defined in VerifiedIn
// must have verified a given Freight for it to match.
// IMPORTANT: This is also applied to Freight matched using the VerifiedBefore
// condition.
RequireAllVerifiedIn bool
}

// ListFreight returns a list of all Freight resources that originated from the
Expand Down Expand Up @@ -149,6 +155,32 @@ func (w *Warehouse) ListFreight(
freight = append(freight, res.Items...)
}

// Filter out identified Freight that has not been verified in all of the
// specified VerifiedIn Stages if RequireAllVerifiedIn is true.
// Default behavior is to return Freight that is verified in any of the
// specified VerifiedIn Stages.
if opts.RequireAllVerifiedIn && len(opts.VerifiedIn) > 0 {
// Reduce Freight to only items that are verified in ALL upstream
// Stages. Freight that has been approved for a Stage is considered
// verified in this check.
filtered := make([]Freight, 0, len(freight))

for _, f := range freight {
if opts.ApprovedFor != "" {
if f.IsApprovedFor(opts.ApprovedFor) {
filtered = append(filtered, f)
continue
}
}
if f.IsVerifiedInAll(opts.VerifiedIn) {
filtered = append(filtered, f)
continue
}
}

freight = filtered
}

// Sort and de-dupe
slices.SortFunc(freight, func(lhs, rhs Freight) int {
return strings.Compare(lhs.Name, rhs.Name)
Expand All @@ -164,22 +196,42 @@ func (w *Warehouse) ListFreight(

// Filter out Freight whose soak time has not yet elapsed
filtered := make([]Freight, 0, len(freight))
freightLoop:
for _, f := range freight {
if opts.ApprovedFor != "" {
if f.IsApprovedFor(opts.ApprovedFor) {
filtered = append(filtered, f)
continue
}
}
for _, ver := range f.Status.VerifiedIn {

// Track set of Stages that have passed the verification soak time
// for the Freight.
verifiedStages := sets.New[string]()
for stage, ver := range f.Status.VerifiedIn {
if verifiedAt := ver.VerifiedAt; verifiedAt != nil {
if verifiedAt.Time.Before(opts.VerifiedBefore.Time) {
filtered = append(filtered, f)
continue freightLoop
verifiedStages.Insert(stage)
}
}
}

// Filter out Freight that has not been verified in all of the specified
// VerifiedIn Stages if RequireAllVerifiedIn is true.
// Otherwise, include Freight if it has passed the soak time in a single
// Stage.
if opts.RequireAllVerifiedIn {
// If Freight is verified in ALL upstream Stages, then it is
// available.
if verifiedStages.Equal(sets.New(opts.VerifiedIn...)) {
filtered = append(filtered, f)
}
continue
}

// If Freight is verified in ANY upstream Stage, then it is available.
if verifiedStages.Len() > 0 {
filtered = append(filtered, f)
}
}
return filtered, nil
}
108 changes: 107 additions & 1 deletion api/v1alpha1/warehouse_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func TestWarehouse_ListFreight(t *testing.T) {
const testWarehouse = "fake-warehouse"
const testStage = "fake-stage"
const testUpstreamStage = "fake-upstream-stage"
const testUpstreamStage2 = "fake-upstream-stage2"

testCases := []struct {
name string
Expand Down Expand Up @@ -171,7 +172,7 @@ func TestWarehouse_ListFreight(t *testing.T) {
},
},
{
name: "success with options",
name: "success with VerifiedIn and VerifiedBefore options",
opts: &ListWarehouseFreightOptions{
ApprovedFor: testStage,
VerifiedIn: []string{testUpstreamStage},
Expand Down Expand Up @@ -269,6 +270,111 @@ func TestWarehouse_ListFreight(t *testing.T) {
require.Equal(t, "fake-freight-5", freight[1].Name)
},
},
{
name: "success with RequireAllVerifiedIn set to true",
opts: &ListWarehouseFreightOptions{
RequireAllVerifiedIn: true,
ApprovedFor: testStage,
VerifiedIn: []string{testUpstreamStage, testUpstreamStage2},
VerifiedBefore: &metav1.Time{Time: time.Now().Add(-1 * time.Hour)},
},
objects: []client.Object{
&Freight{ // This should be returned as its approved for the Stage
ObjectMeta: metav1.ObjectMeta{
Namespace: testProject,
Name: "fake-freight-1",
},
Origin: FreightOrigin{
Kind: FreightOriginKindWarehouse,
Name: testWarehouse,
},
Status: FreightStatus{
// This is approved for the Stage
ApprovedFor: map[string]ApprovedStage{testStage: {}},
// This is only verified in a single upstream Stage
VerifiedIn: map[string]VerifiedStage{
testUpstreamStage: {
VerifiedAt: ptr.To(metav1.Now()),
},
},
},
},
&Freight{ // This should be returned because its verified in both upstream Stages and soak time has lapsed
ObjectMeta: metav1.ObjectMeta{
Namespace: testProject,
Name: "fake-freight-2",
},
Origin: FreightOrigin{
Kind: FreightOriginKindWarehouse,
Name: testWarehouse,
},
Status: FreightStatus{
// This is not approved for any Stage
ApprovedFor: map[string]ApprovedStage{},
// This is verified in all of the upstream Stages and the soak time has lapsed
VerifiedIn: map[string]VerifiedStage{
testUpstreamStage: {
VerifiedAt: ptr.To(metav1.NewTime(time.Now().Add(-2 * time.Hour))),
},
testUpstreamStage2: {
VerifiedAt: ptr.To(metav1.NewTime(time.Now().Add(-2 * time.Hour))),
},
},
},
},
&Freight{ // This should not be returned because it's not verified in all of the upstream Stages
ObjectMeta: metav1.ObjectMeta{
Namespace: testProject,
Name: "fake-freight-3",
},
Origin: FreightOrigin{
Kind: FreightOriginKindWarehouse,
Name: testWarehouse,
},
Status: FreightStatus{
// This is not approved for any Stage
ApprovedFor: map[string]ApprovedStage{},
// This is not verified in all of the upstream Stages
VerifiedIn: map[string]VerifiedStage{
testUpstreamStage: {
VerifiedAt: ptr.To(metav1.Now()),
},
},
},
},
&Freight{ // This should not be returned because its not passed the soak time in all Stages
ObjectMeta: metav1.ObjectMeta{
Namespace: testProject,
Name: "fake-freight-4",
},
Origin: FreightOrigin{
Kind: FreightOriginKindWarehouse,
Name: testWarehouse,
},
Status: FreightStatus{
// This is not approved for any Stage
ApprovedFor: map[string]ApprovedStage{},
// This is verified in all of the upstream Stages but only passed the soak time of one
VerifiedIn: map[string]VerifiedStage{
testUpstreamStage: {
VerifiedAt: ptr.To(metav1.NewTime(time.Now().Add(-2 * time.Hour))),
},
testUpstreamStage2: {
VerifiedAt: ptr.To(metav1.Now()),
},
},
},
},
},
assertions: func(t *testing.T, freight []Freight, err error) {
require.NoError(t, err)
require.Len(t, freight, 2)
require.Equal(t, testProject, freight[0].Namespace)
require.Equal(t, "fake-freight-1", freight[0].Name)
require.Equal(t, testProject, freight[1].Namespace)
require.Equal(t, "fake-freight-2", freight[1].Name)
},
},
}

testScheme := k8sruntime.NewScheme()
Expand Down
Loading

0 comments on commit c6645f0

Please sign in to comment.