Skip to content

Commit

Permalink
Use goroutine instead of errGroup (#497)
Browse files Browse the repository at this point in the history
Use goroutine instead of errGroup

Adjust timings

<!-- Thank you for contributing! Please make sure that your code changes
are covered with tests. In case of new features or big changes remember
to adjust the documentation.

In case of an existing issue, reference it using one of the following:

closes: #ISSUE
related: #ISSUE

How to write a good git commit message:
http://chris.beams.io/posts/git-commit/
-->

## What
*Describe what the change is solving*

## How
*Describe the solution*

## Breaking Changes
*Are there any breaking changes in this PR?*
  • Loading branch information
hjkatz authored Nov 8, 2024
2 parents 6e2c6e7 + 17bf76f commit d474e62
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 15 deletions.
6 changes: 3 additions & 3 deletions internal/controller/bindings/boundendpoint_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,9 +525,9 @@ func (r *BoundEndpointReconciler) tryToBindEndpoint(ctx context.Context, boundEn

retries := 5
attempt := 0
waitDuration := 0 * time.Second // start immediately
backoffDuration := 10 * time.Second // increasing duration to wait between retries
dialTimeout := 5 * time.Second // timeout for dialing the targetService
waitDuration := 0 * time.Second // start immediately
backoffDuration := 3 * time.Second // increasing duration to wait between retries
dialTimeout := 1 * time.Second // timeout for dialing the targetService

// to be filled in
var bindErr error
Expand Down
22 changes: 10 additions & 12 deletions internal/controller/bindings/boundendpoint_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
bindingsv1alpha1 "github.com/ngrok/ngrok-operator/api/bindings/v1alpha1"
ngrokv1alpha1 "github.com/ngrok/ngrok-operator/api/ngrok/v1alpha1"
"github.com/ngrok/ngrok-operator/internal/ngrokapi"
"golang.org/x/sync/errgroup"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -216,22 +215,21 @@ func (r *BoundEndpointPoller) reconcileBoundEndpointsFromAPI(ctx context.Context
toCreate, toUpdate, toDelete := r.filterBoundEndpointActions(ctx, existingBoundEndpoints, desiredBoundEndpoints)

// create context + errgroup for managing/closing the future goroutine in the reconcile actions loops
errGroup, reconcileActionCtx := errgroup.WithContext(context.Background())
reconcileActionCtx, cancel := context.WithCancel(reconcileActionCtx)
reconcileActionCtx, cancel := context.WithCancel(context.Background())
reconcileActionCtx = ctrl.LoggerInto(reconcileActionCtx, log)
r.reconcilingCancel = cancel

// launch goroutines to reconcile the BoundEndpoints' actions in the background until the next polling loop

r.reconcileBoundEndpointAction(reconcileActionCtx, errGroup, toCreate, "create", func(reconcileActionCtx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
r.reconcileBoundEndpointAction(reconcileActionCtx, toCreate, "create", func(reconcileActionCtx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
return r.createBinding(reconcileActionCtx, binding)
})

r.reconcileBoundEndpointAction(reconcileActionCtx, errGroup, toUpdate, "update", func(reconcileActionCtx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
r.reconcileBoundEndpointAction(reconcileActionCtx, toUpdate, "update", func(reconcileActionCtx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
return r.updateBinding(reconcileActionCtx, binding)
})

r.reconcileBoundEndpointAction(reconcileActionCtx, errGroup, toDelete, "delete", func(reconcileActionCtx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
r.reconcileBoundEndpointAction(reconcileActionCtx, toDelete, "delete", func(reconcileActionCtx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
return r.deleteBinding(reconcileActionCtx, binding)
})

Expand All @@ -243,32 +241,32 @@ type boundEndpointActionFn func(context.Context, bindingsv1alpha1.BoundEndpoint)

// reconcileBoundEndpointAction runs a goroutine to try and process a list of BoundEndpoints
// for their desired action over and over again until stopChan is closed or receives a value
func (r *BoundEndpointPoller) reconcileBoundEndpointAction(ctx context.Context, errGroup *errgroup.Group, boundEndpoints []bindingsv1alpha1.BoundEndpoint, actionMsg string, action boundEndpointActionFn) {
func (r *BoundEndpointPoller) reconcileBoundEndpointAction(ctx context.Context, boundEndpoints []bindingsv1alpha1.BoundEndpoint, actionMsg string, action boundEndpointActionFn) {
log := ctrl.LoggerFrom(ctx)

if len(boundEndpoints) == 0 {
// nothing to do
return
}

errGroup.Go(func() error {
go func() {
// attempt reconciliation actions every so often
ticker := time.NewTicker(5 * time.Second)
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()

// remainingBindings is the list of BoundEndpoints that still need to be actioned upon
remainingBindings := boundEndpoints

for {
if len(remainingBindings) == 0 {
return nil // all bindings have been processed
return
}

select {
// stop go routine and return, there is a new reconcile poll happening actively
case <-ctx.Done():
log.Info("Reconcile Action context canceled, stopping BoundEndpoint reconcile action loop early", "action", actionMsg)
return nil
return
case <-ticker.C:
log.V(9).Info("Received tick", "action", actionMsg, "remaining", remainingBindings)

Expand All @@ -287,7 +285,7 @@ func (r *BoundEndpointPoller) reconcileBoundEndpointAction(ctx context.Context,
remainingBindings = failedBindings
}
}
})
}()
}

// filterBoundEndpointActions takse 2 sets of existing and desired BoundEndpoints
Expand Down

0 comments on commit d474e62

Please sign in to comment.