-
Notifications
You must be signed in to change notification settings - Fork 969
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: spurious cancelation of async webhooks, better tracing #2969
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,10 +8,13 @@ import ( | |
"encoding/json" | ||
"fmt" | ||
"net/http" | ||
"time" | ||
|
||
"github.com/pkg/errors" | ||
"github.com/tidwall/gjson" | ||
"go.opentelemetry.io/otel/attribute" | ||
"go.opentelemetry.io/otel/codes" | ||
semconv "go.opentelemetry.io/otel/semconv/v1.11.0" | ||
"go.opentelemetry.io/otel/trace" | ||
|
||
"github.com/ory/kratos/ui/node" | ||
|
@@ -29,7 +32,6 @@ import ( | |
"github.com/ory/kratos/session" | ||
"github.com/ory/kratos/text" | ||
"github.com/ory/kratos/x" | ||
"github.com/ory/x/otelx" | ||
) | ||
|
||
var ( | ||
|
@@ -253,22 +255,6 @@ func (e *WebHook) ExecuteSettingsPrePersistHook(_ http.ResponseWriter, req *http | |
} | ||
|
||
func (e *WebHook) execute(ctx context.Context, data *templateContext) error { | ||
span := trace.SpanFromContext(ctx) | ||
attrs := map[string]string{ | ||
"webhook.http.method": data.RequestMethod, | ||
"webhook.http.url": data.RequestURL, | ||
"webhook.http.headers": fmt.Sprintf("%#v", data.RequestHeaders), | ||
} | ||
|
||
if data.Identity != nil { | ||
attrs["webhook.identity.id"] = data.Identity.ID.String() | ||
} else { | ||
attrs["webhook.identity.id"] = "" | ||
} | ||
|
||
span.SetAttributes(otelx.StringAttrs(attrs)...) | ||
defer span.End() | ||
|
||
builder, err := request.NewBuilder(e.conf, e.deps) | ||
if err != nil { | ||
return err | ||
|
@@ -281,35 +267,70 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error { | |
return err | ||
} | ||
|
||
errChan := make(chan error, 1) | ||
attrs := semconv.HTTPClientAttributesFromHTTPRequest(req.Request) | ||
if data.Identity != nil { | ||
attrs = append(attrs, | ||
attribute.String("webhook.identity.id", data.Identity.ID.String()), | ||
attribute.String("webhook.identity.nid", data.Identity.NID.String()), | ||
) | ||
} | ||
var ( | ||
httpClient = e.deps.HTTPClient(ctx) | ||
async = gjson.GetBytes(e.conf, "response.ignore").Bool() | ||
parseResponse = gjson.GetBytes(e.conf, "can_interrupt").Bool() | ||
tracer = trace.SpanFromContext(ctx).TracerProvider().Tracer("kratos-webhooks") | ||
cancel context.CancelFunc = func() {} | ||
spanOpts = []trace.SpanStartOption{trace.WithAttributes(attrs...)} | ||
errChan = make(chan error, 1) | ||
) | ||
if async { | ||
// dissociate the context from the one passed into this function | ||
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute) | ||
spanOpts = append(spanOpts, trace.WithNewRoot()) | ||
} | ||
ctx, span := tracer.Start(ctx, "Webhook", spanOpts...) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here for example is the first instance where the incorrect context is used if async is true. The trace would be lost and without context! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code is correct. For async webhooks, we start a new root span which is not associated with the incoming span. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you think of doing something like this:
that way, we ensure that the context is never leaving the async execution path :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
But then the span would not appear as part of the request, and it would not be possible to correlate it to an incoming HTTP request? Is that really intentional? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, the span would be orphaned in that sense. But still easy enough to find. If you made the async webhook a child span of the incoming Kratos request span, the child will outlive the parent span. That will trip up many tools and generally violate OpenTelemetry semantics. OpenTracing used to have a special trace-trace relationship ( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh I see, that is a very good explanation - thank you! So I did not include this in the merge, would you be open to make a follow-up PR with the span fixed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The merged patch is fine as-is. The OpenTelemetry people themselves are not sure how they want to models this: open-telemetry/opentelemetry-specification#65 I'd suggest looking at some traces in Jaeger and Tempo, and deciding whether we're OK with how those tools interpret the data in practice. If there's a major hiccup, we can then change the instrumentation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perfect, that sounds like a plan!
|
||
e.deps.Logger().WithRequest(req.Request).Info("Dispatching webhook") | ||
t0 := time.Now() | ||
go func() { | ||
defer close(errChan) | ||
defer cancel() | ||
defer span.End() | ||
|
||
resp, err := e.deps.HTTPClient(ctx).Do(req.WithContext(ctx)) | ||
resp, err := httpClient.Do(req.WithContext(ctx)) | ||
if err != nil { | ||
span.SetStatus(codes.Error, err.Error()) | ||
errChan <- errors.WithStack(err) | ||
return | ||
} | ||
defer resp.Body.Close() | ||
span.SetAttributes(semconv.HTTPAttributesFromHTTPStatusCode(resp.StatusCode)...) | ||
|
||
if resp.StatusCode >= http.StatusBadRequest { | ||
if gjson.GetBytes(e.conf, "can_interrupt").Bool() { | ||
span.SetStatus(codes.Error, "HTTP status code >= 400") | ||
if parseResponse { | ||
if err := parseWebhookResponse(resp); err != nil { | ||
span.SetStatus(codes.Error, err.Error()) | ||
errChan <- err | ||
} | ||
} | ||
errChan <- fmt.Errorf("web hook failed with status code %v", resp.StatusCode) | ||
span.SetStatus(codes.Error, fmt.Sprintf("web hook failed with status code %v", resp.StatusCode)) | ||
errChan <- fmt.Errorf("webhook failed with status code %v", resp.StatusCode) | ||
return | ||
} | ||
|
||
errChan <- nil | ||
}() | ||
|
||
if gjson.GetBytes(e.conf, "response.ignore").Bool() { | ||
if async { | ||
traceID, spanID := span.SpanContext().TraceID(), span.SpanContext().SpanID() | ||
go func() { | ||
err := <-errChan | ||
e.deps.Logger().WithError(err).Warning("A web hook request failed but the error was ignored because the configuration indicated that the upstream response should be ignored.") | ||
if err := <-errChan; err != nil { | ||
e.deps.Logger().WithField("otel", map[string]string{ | ||
"trace_id": traceID.String(), | ||
"span_id": spanID.String(), | ||
}).WithError(err).Warning("Webhook request failed but the error was ignored because the configuration indicated that the upstream response should be ignored.") | ||
} else { | ||
e.deps.Logger().WithField("duration", time.Since(t0)).Info("Webhook request succeeded") | ||
} | ||
}() | ||
return nil | ||
} | ||
|
@@ -323,7 +344,7 @@ func parseWebhookResponse(resp *http.Response) (err error) { | |
} | ||
var hookResponse rawHookResponse | ||
if err := json.NewDecoder(resp.Body).Decode(&hookResponse); err != nil { | ||
return errors.Wrap(err, "hook response could not be unmarshalled properly from JSON") | ||
return errors.Wrap(err, "webhook response could not be unmarshalled properly from JSON") | ||
} | ||
|
||
var validationErrs []*schema.ValidationError | ||
|
@@ -343,11 +364,11 @@ func parseWebhookResponse(resp *http.Response) (err error) { | |
Context: detail.Context, | ||
}) | ||
} | ||
validationErrs = append(validationErrs, schema.NewHookValidationError(msg.InstancePtr, "a web-hook target returned an error", messages)) | ||
validationErrs = append(validationErrs, schema.NewHookValidationError(msg.InstancePtr, "a webhook target returned an error", messages)) | ||
} | ||
|
||
if len(validationErrs) == 0 { | ||
return errors.New("error while parsing hook response: got no validation errors") | ||
return errors.New("error while parsing webhook response: got no validation errors") | ||
} | ||
|
||
return schema.NewValidationListError(validationErrs) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ import ( | |
"testing" | ||
"time" | ||
|
||
"github.com/sirupsen/logrus/hooks/test" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/ory/kratos/schema" | ||
|
@@ -365,7 +366,7 @@ func TestWebHooks(t *testing.T) { | |
}`, | ||
) | ||
|
||
webhookError := schema.NewValidationListError([]*schema.ValidationError{schema.NewHookValidationError("#/traits/username", "a web-hook target returned an error", text.Messages{{ID: 1234, Type: "info", Text: "error message"}})}) | ||
webhookError := schema.NewValidationListError([]*schema.ValidationError{schema.NewHookValidationError("#/traits/username", "a webhook target returned an error", text.Messages{{ID: 1234, Type: "info", Text: "error message"}})}) | ||
for _, tc := range []struct { | ||
uc string | ||
callWebHook func(wh *hook.WebHook, req *http.Request, f flow.Flow, s *session.Session) error | ||
|
@@ -839,3 +840,95 @@ func TestDisallowPrivateIPRanges(t *testing.T) { | |
require.Contains(t, err.Error(), "192.168.178.0 is not a public IP address") | ||
}) | ||
} | ||
|
||
func TestAsyncWebhook(t *testing.T) { | ||
_, reg := internal.NewFastRegistryWithMocks(t) | ||
logger := logrusx.New("kratos", "test") | ||
logHook := new(test.Hook) | ||
logger.Logger.Hooks.Add(logHook) | ||
whDeps := struct { | ||
x.SimpleLoggerWithClient | ||
*jsonnetsecure.TestProvider | ||
}{ | ||
x.SimpleLoggerWithClient{L: logger, C: reg.HTTPClient(context.Background()), T: otelx.NewNoop(logger, &otelx.Config{ServiceName: "kratos"})}, | ||
jsonnetsecure.NewTestProvider(t), | ||
} | ||
|
||
req := &http.Request{ | ||
Header: map[string][]string{"Some-Header": {"Some-Value"}}, | ||
Host: "www.ory.sh", | ||
TLS: new(tls.ConnectionState), | ||
URL: &url.URL{Path: "/some_end_point"}, | ||
Method: http.MethodPost, | ||
} | ||
incomingCtx, incomingCancel := context.WithCancel(context.Background()) | ||
if deadline, ok := t.Deadline(); ok { | ||
// cancel this context one second before test timeout for clean shutdown | ||
var cleanup context.CancelFunc | ||
incomingCtx, cleanup = context.WithDeadline(incomingCtx, deadline.Add(-time.Second)) | ||
defer cleanup() | ||
} | ||
req = req.WithContext(incomingCtx) | ||
s := &session.Session{ID: x.NewUUID(), Identity: &identity.Identity{ID: x.NewUUID()}} | ||
f := &login.Flow{ID: x.NewUUID()} | ||
|
||
handlerEntered, blockHandlerOnExit := make(chan struct{}), make(chan struct{}) | ||
webhookReceiver := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
close(handlerEntered) | ||
<-blockHandlerOnExit | ||
w.Write([]byte("ok")) | ||
})) | ||
t.Cleanup(webhookReceiver.Close) | ||
|
||
wh := hook.NewWebHook(&whDeps, json.RawMessage(fmt.Sprintf(` | ||
{ | ||
"url": %q, | ||
"method": "GET", | ||
"body": "file://stub/test_body.jsonnet", | ||
"response": { | ||
"ignore": true | ||
} | ||
}`, webhookReceiver.URL))) | ||
err := wh.ExecuteLoginPostHook(nil, req, node.DefaultGroup, f, s) | ||
require.NoError(t, err) // execution returns immediately for async webhook | ||
select { | ||
case <-time.After(200 * time.Millisecond): | ||
t.Fatal("timed out waiting for webhook request to reach test handler") | ||
case <-handlerEntered: | ||
// ok | ||
} | ||
// at this point, a goroutine is in the middle of the call to our test handler and waiting for a response | ||
incomingCancel() // simulate the incoming Kratos request having finished | ||
timeout := time.After(200 * time.Millisecond) | ||
for done := false; !done; { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
if last := logHook.LastEntry(); last != nil { | ||
msg, err := last.String() | ||
require.NoError(t, err) | ||
assert.Contains(t, msg, "Dispatching webhook") | ||
} | ||
|
||
select { | ||
case <-timeout: | ||
done = true | ||
case <-time.After(50 * time.Millisecond): | ||
// continue loop | ||
} | ||
} | ||
logHook.Reset() | ||
close(blockHandlerOnExit) | ||
timeout = time.After(200 * time.Millisecond) | ||
for { | ||
if last := logHook.LastEntry(); last != nil { | ||
msg, err := last.String() | ||
require.NoError(t, err) | ||
assert.Contains(t, msg, "Webhook request succeeded") | ||
break | ||
} | ||
select { | ||
case <-timeout: | ||
t.Fatal("timed out waiting for successful webhook completion") | ||
case <-time.After(50 * time.Millisecond): | ||
// continue loop | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this would fix the issue, the underlying HTTP client may be canceling the request, not the context? It has a timeout of I think 10 seconds.
Using a background context is also a possible security issue. Please make sure that we're not confusing the context and name this, for example,
asyncContext
, and only use it in the correct execution branchThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem is the context being passed to this function (
execute
) being canceled/released while the async webhook is still in flight.How is a background context a security issue?