From 8b54913c1d49fe11761f5ca631f11a02c3438663 Mon Sep 17 00:00:00 2001 From: ory-bot <60093411+ory-bot@users.noreply.github.com> Date: Tue, 20 Dec 2022 07:57:48 +0100 Subject: [PATCH] chore: code review --- selfservice/hook/web_hook.go | 50 +++++++++++-------- selfservice/hook/web_hook_integration_test.go | 2 + 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/selfservice/hook/web_hook.go b/selfservice/hook/web_hook.go index 1327ea747de6..bddbd0c54571 100644 --- a/selfservice/hook/web_hook.go +++ b/selfservice/hook/web_hook.go @@ -274,29 +274,36 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error { attribute.String("webhook.identity.nid", data.Identity.NID.String()), ) } + var ( - httpClient = e.deps.HTTPClient(ctx) - async = gjson.GetBytes(e.conf, "response.ignore").Bool() - parseResponse = gjson.GetBytes(e.conf, "can_interrupt").Bool() - tracer = trace.SpanFromContext(ctx).TracerProvider().Tracer("kratos-webhooks") - cancel context.CancelFunc = func() {} - spanOpts = []trace.SpanStartOption{trace.WithAttributes(attrs...)} - errChan = make(chan error, 1) + httpClient = e.deps.HTTPClient(ctx) + ignoreResponse = gjson.GetBytes(e.conf, "response.ignore").Bool() + canInterrupt = gjson.GetBytes(e.conf, "can_interrupt").Bool() + tracer = trace.SpanFromContext(ctx).TracerProvider().Tracer("kratos-webhooks") + spanOpts = []trace.SpanStartOption{trace.WithAttributes(attrs...)} + errChan = make(chan error, 1) ) - if async { - // dissociate the context from the one passed into this function - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute) - spanOpts = append(spanOpts, trace.WithNewRoot()) + + req = req.WithContext(ctx) + if ignoreResponse { + // This is one of the few places where spawning a context.Background() is ok. We need to do this + // because the function runs asynchronously and we don't want to cancel the request if the + // incoming request context is cancelled. + // + // The webhook will still cancel after 30 seconds as that is the configured timeout for the HTTP client. + req = req.WithContext(context.Background()) + // spanOpts = append(spanOpts, trace.WithNewRoot()) } + ctx, span := tracer.Start(ctx, "Webhook", spanOpts...) e.deps.Logger().WithRequest(req.Request).Info("Dispatching webhook") - t0 := time.Now() + + startTime := time.Now() go func() { defer close(errChan) - defer cancel() defer span.End() - resp, err := httpClient.Do(req.WithContext(ctx)) + resp, err := httpClient.Do(req) if err != nil { span.SetStatus(codes.Error, err.Error()) errChan <- errors.WithStack(err) @@ -307,7 +314,7 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error { if resp.StatusCode >= http.StatusBadRequest { span.SetStatus(codes.Error, "HTTP status code >= 400") - if parseResponse { + if canInterrupt { if err := parseWebhookResponse(resp); err != nil { span.SetStatus(codes.Error, err.Error()) errChan <- err @@ -320,16 +327,17 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error { errChan <- nil }() - if async { + if ignoreResponse { traceID, spanID := span.SpanContext().TraceID(), span.SpanContext().SpanID() + logger := e.deps.Logger().WithField("otel", map[string]string{ + "trace_id": traceID.String(), + "span_id": spanID.String(), + }) go func() { if err := <-errChan; err != nil { - e.deps.Logger().WithField("otel", map[string]string{ - "trace_id": traceID.String(), - "span_id": spanID.String(), - }).WithError(err).Warning("Webhook request failed but the error was ignored because the configuration indicated that the upstream response should be ignored.") + logger.WithField("duration", time.Since(startTime)).WithError(err).Warning("Webhook request failed but the error was ignored because the configuration indicated that the upstream response should be ignored.") } else { - e.deps.Logger().WithField("duration", time.Since(t0)).Info("Webhook request succeeded") + logger.WithField("duration", time.Since(startTime)).Info("Webhook request succeeded") } }() return nil diff --git a/selfservice/hook/web_hook_integration_test.go b/selfservice/hook/web_hook_integration_test.go index 7621c7609ce6..90349627bcde 100644 --- a/selfservice/hook/web_hook_integration_test.go +++ b/selfservice/hook/web_hook_integration_test.go @@ -861,6 +861,7 @@ func TestAsyncWebhook(t *testing.T) { URL: &url.URL{Path: "/some_end_point"}, Method: http.MethodPost, } + incomingCtx, incomingCancel := context.WithCancel(context.Background()) if deadline, ok := t.Deadline(); ok { // cancel this context one second before test timeout for clean shutdown @@ -868,6 +869,7 @@ func TestAsyncWebhook(t *testing.T) { incomingCtx, cleanup = context.WithDeadline(incomingCtx, deadline.Add(-time.Second)) defer cleanup() } + req = req.WithContext(incomingCtx) s := &session.Session{ID: x.NewUUID(), Identity: &identity.Identity{ID: x.NewUUID()}} f := &login.Flow{ID: x.NewUUID()}