Skip to content

Commit

Permalink
Use unique context for endpoint poller reconcile actions
Browse files Browse the repository at this point in the history
  • Loading branch information
hjkatz committed Nov 7, 2024
1 parent 2e1647f commit 9386161
Showing 1 changed file with 19 additions and 12 deletions.
31 changes: 19 additions & 12 deletions internal/controller/bindings/boundendpoint_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,22 +216,23 @@ func (r *BoundEndpointPoller) reconcileBoundEndpointsFromAPI(ctx context.Context
toCreate, toUpdate, toDelete := r.filterBoundEndpointActions(ctx, existingBoundEndpoints, desiredBoundEndpoints)

// create context + errgroup for managing/closing the future goroutine in the reconcile actions loops
errGroup, ctx := errgroup.WithContext(ctx)
ctx, cancel := context.WithCancel(ctx)
errGroup, reconcileActionCtx := errgroup.WithContext(context.Background())
reconcileActionCtx, cancel := context.WithCancel(reconcileActionCtx)
reconcileActionCtx = ctrl.LoggerInto(reconcileActionCtx, log)
r.reconcilingCancel = cancel

// launch goroutines to reconcile the BoundEndpoints' actions in the background until the next polling loop

r.reconcileBoundEndpointAction(ctx, errGroup, toCreate, "create", func(ctx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
return r.createBinding(ctx, binding)
r.reconcileBoundEndpointAction(reconcileActionCtx, errGroup, toCreate, "create", func(reconcileActionCtx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
return r.createBinding(reconcileActionCtx, binding)
})

r.reconcileBoundEndpointAction(ctx, errGroup, toUpdate, "update", func(ctx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
return r.updateBinding(ctx, binding)
r.reconcileBoundEndpointAction(reconcileActionCtx, errGroup, toUpdate, "update", func(reconcileActionCtx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
return r.updateBinding(reconcileActionCtx, binding)
})

r.reconcileBoundEndpointAction(ctx, errGroup, toDelete, "delete", func(ctx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
return r.deleteBinding(ctx, binding)
r.reconcileBoundEndpointAction(reconcileActionCtx, errGroup, toDelete, "delete", func(reconcileActionCtx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
return r.deleteBinding(reconcileActionCtx, binding)
})

return nil
Expand All @@ -245,6 +246,11 @@ type boundEndpointActionFn func(context.Context, bindingsv1alpha1.BoundEndpoint)
func (r *BoundEndpointPoller) reconcileBoundEndpointAction(ctx context.Context, errGroup *errgroup.Group, boundEndpoints []bindingsv1alpha1.BoundEndpoint, actionMsg string, action boundEndpointActionFn) {
log := ctrl.LoggerFrom(ctx)

if len(boundEndpoints) == 0 {
// nothing to do
return
}

errGroup.Go(func() error {
// attempt reconciliation actions every so often
ticker := time.NewTicker(5 * time.Second)
Expand All @@ -254,16 +260,17 @@ func (r *BoundEndpointPoller) reconcileBoundEndpointAction(ctx context.Context,
remainingBindings := boundEndpoints

for {
if len(remainingBindings) == 0 {
return nil // all bindings have been processed
}

select {
// stop go routine and return, there is a new reconcile poll happening actively
case <-ctx.Done():
log.Error(ctx.Err(), "Reconcile context canceled, stopping BoundEndpoint reconcile loop early", "action", actionMsg)
log.Info("Reconcile Action context canceled, stopping BoundEndpoint reconcile action loop early", "action", actionMsg)
return nil
case <-ticker.C:
log.V(9).Info("Received tick", "action", actionMsg, "remaining", remainingBindings)
if len(remainingBindings) == 0 {
return nil // all bindings have been processed
}

failedBindings := []bindingsv1alpha1.BoundEndpoint{}

Expand Down

0 comments on commit 9386161

Please sign in to comment.