Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use goroutine instead of errGroup #497

Merged
merged 2 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions internal/controller/bindings/boundendpoint_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,9 +525,9 @@ func (r *BoundEndpointReconciler) tryToBindEndpoint(ctx context.Context, boundEn

retries := 5
attempt := 0
waitDuration := 0 * time.Second // start immediately
backoffDuration := 10 * time.Second // increasing duration to wait between retries
dialTimeout := 5 * time.Second // timeout for dialing the targetService
waitDuration := 0 * time.Second // start immediately
backoffDuration := 3 * time.Second // increasing duration to wait between retries
dialTimeout := 1 * time.Second // timeout for dialing the targetService

// to be filled in
var bindErr error
Expand Down
22 changes: 10 additions & 12 deletions internal/controller/bindings/boundendpoint_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
bindingsv1alpha1 "github.com/ngrok/ngrok-operator/api/bindings/v1alpha1"
ngrokv1alpha1 "github.com/ngrok/ngrok-operator/api/ngrok/v1alpha1"
"github.com/ngrok/ngrok-operator/internal/ngrokapi"
"golang.org/x/sync/errgroup"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -216,22 +215,21 @@ func (r *BoundEndpointPoller) reconcileBoundEndpointsFromAPI(ctx context.Context
toCreate, toUpdate, toDelete := r.filterBoundEndpointActions(ctx, existingBoundEndpoints, desiredBoundEndpoints)

// create context + errgroup for managing/closing the future goroutine in the reconcile actions loops
errGroup, reconcileActionCtx := errgroup.WithContext(context.Background())
reconcileActionCtx, cancel := context.WithCancel(reconcileActionCtx)
reconcileActionCtx, cancel := context.WithCancel(context.Background())
reconcileActionCtx = ctrl.LoggerInto(reconcileActionCtx, log)
r.reconcilingCancel = cancel

// launch goroutines to reconcile the BoundEndpoints' actions in the background until the next polling loop

r.reconcileBoundEndpointAction(reconcileActionCtx, errGroup, toCreate, "create", func(reconcileActionCtx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
r.reconcileBoundEndpointAction(reconcileActionCtx, toCreate, "create", func(reconcileActionCtx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
return r.createBinding(reconcileActionCtx, binding)
})

r.reconcileBoundEndpointAction(reconcileActionCtx, errGroup, toUpdate, "update", func(reconcileActionCtx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
r.reconcileBoundEndpointAction(reconcileActionCtx, toUpdate, "update", func(reconcileActionCtx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
return r.updateBinding(reconcileActionCtx, binding)
})

r.reconcileBoundEndpointAction(reconcileActionCtx, errGroup, toDelete, "delete", func(reconcileActionCtx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
r.reconcileBoundEndpointAction(reconcileActionCtx, toDelete, "delete", func(reconcileActionCtx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
return r.deleteBinding(reconcileActionCtx, binding)
})

Expand All @@ -243,32 +241,32 @@ type boundEndpointActionFn func(context.Context, bindingsv1alpha1.BoundEndpoint)

// reconcileBoundEndpointAction runs a goroutine to try and process a list of BoundEndpoints
// for their desired action over and over again until stopChan is closed or receives a value
func (r *BoundEndpointPoller) reconcileBoundEndpointAction(ctx context.Context, errGroup *errgroup.Group, boundEndpoints []bindingsv1alpha1.BoundEndpoint, actionMsg string, action boundEndpointActionFn) {
func (r *BoundEndpointPoller) reconcileBoundEndpointAction(ctx context.Context, boundEndpoints []bindingsv1alpha1.BoundEndpoint, actionMsg string, action boundEndpointActionFn) {
log := ctrl.LoggerFrom(ctx)

if len(boundEndpoints) == 0 {
// nothing to do
return
}

errGroup.Go(func() error {
go func() {
// attempt reconciliation actions every so often
ticker := time.NewTicker(5 * time.Second)
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()

// remainingBindings is the list of BoundEndpoints that still need to be actioned upon
remainingBindings := boundEndpoints

for {
if len(remainingBindings) == 0 {
return nil // all bindings have been processed
return
}

select {
// stop go routine and return, there is a new reconcile poll happening actively
case <-ctx.Done():
log.Info("Reconcile Action context canceled, stopping BoundEndpoint reconcile action loop early", "action", actionMsg)
return nil
return
case <-ticker.C:
log.V(9).Info("Received tick", "action", actionMsg, "remaining", remainingBindings)

Expand All @@ -287,7 +285,7 @@ func (r *BoundEndpointPoller) reconcileBoundEndpointAction(ctx context.Context,
remainingBindings = failedBindings
}
}
})
}()
}

// filterBoundEndpointActions takse 2 sets of existing and desired BoundEndpoints
Expand Down
Loading