diff --git a/go.mod b/go.mod index 792a796091..1cf6191863 100644 --- a/go.mod +++ b/go.mod @@ -21,10 +21,10 @@ require ( k8s.io/client-go v0.29.2 k8s.io/code-generator v0.29.2 k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 - knative.dev/eventing v0.41.1-0.20240523103154-74253dd1c57a - knative.dev/hack v0.0.0-20240507013718-68e3bfb39d11 - knative.dev/pkg v0.0.0-20240521083825-99e1685a7997 - knative.dev/reconciler-test v0.0.0-20240523103149-1351b601972d + knative.dev/eventing v0.41.1-0.20240531140411-c53b44d67c71 + knative.dev/hack v0.0.0-20240529131459-3b6d6441e7ea + knative.dev/pkg v0.0.0-20240602234151-229e527a1366 + knative.dev/reconciler-test v0.0.0-20240531150205-2723f53aa8ed sigs.k8s.io/controller-runtime v0.15.2 ) diff --git a/go.sum b/go.sum index 3328d3314d..23425b53f8 100644 --- a/go.sum +++ b/go.sum @@ -849,14 +849,14 @@ k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/A k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ= k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -knative.dev/eventing v0.41.1-0.20240523103154-74253dd1c57a h1:FTDl2yRQgnp/ZmBmU66igmMspkoN7Lf2WxDijIWyOqs= -knative.dev/eventing v0.41.1-0.20240523103154-74253dd1c57a/go.mod h1:IUBpMsDIfyV5836R9PN0/VyADgoDa9ykgz7gpvTseLw= -knative.dev/hack v0.0.0-20240507013718-68e3bfb39d11 h1:CYoD72R8/R35REjeY2nnWfBak+Q3f+NxXwEfwcID1eU= -knative.dev/hack v0.0.0-20240507013718-68e3bfb39d11/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q= -knative.dev/pkg v0.0.0-20240521083825-99e1685a7997 h1:MN/u8ivos957WdihUjKfFw1Qv9HZ3+pe4TOwdYQiB2Q= -knative.dev/pkg v0.0.0-20240521083825-99e1685a7997/go.mod h1:GHFUP1wtD/bR/c02QADqaAK3odDJh1ddBMvXhq/goy8= -knative.dev/reconciler-test v0.0.0-20240523103149-1351b601972d h1:MhO1bvabteY0VjkS2MR0S6+VHCT+DByTIGTLodRBV48= -knative.dev/reconciler-test v0.0.0-20240523103149-1351b601972d/go.mod h1:IhLACQFqZDDTj34bY3Nte9tWWqc8KyahohdRGyeayRM= +knative.dev/eventing v0.41.1-0.20240531140411-c53b44d67c71 h1:viivu+pEwuYFcnElQ9/DB3uXbmZAbI9N+pj2twc1pGw= +knative.dev/eventing v0.41.1-0.20240531140411-c53b44d67c71/go.mod h1:toAuiWQiMPHgWcXNYocPhMNx8xYFbjQsupEiQdPT83o= +knative.dev/hack v0.0.0-20240529131459-3b6d6441e7ea h1:iWW6SNMrVd2hI5Y+ltKIEzXVedoQLL86b23dS5fkvXs= +knative.dev/hack v0.0.0-20240529131459-3b6d6441e7ea/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q= +knative.dev/pkg v0.0.0-20240602234151-229e527a1366 h1:dUkGourt1Xva2lS+ffO/mpoIPGEOkzOk5ALF3OtEK1U= +knative.dev/pkg v0.0.0-20240602234151-229e527a1366/go.mod h1:GHFUP1wtD/bR/c02QADqaAK3odDJh1ddBMvXhq/goy8= +knative.dev/reconciler-test v0.0.0-20240531150205-2723f53aa8ed h1:H9kEW1TgWsbx+dQKWyv7HA0LVUnCOXMeHmTerplGGCk= +knative.dev/reconciler-test v0.0.0-20240531150205-2723f53aa8ed/go.mod h1:uxJT+sJfxS+oZiC7PGWX5YFEUiQkRTUPg5YoyJrvNqs= pgregory.net/rapid v1.1.0 h1:CMa0sjHSru3puNx+J0MIAuiiEV4N0qj8/cMWGBBCsjw= pgregory.net/rapid v1.1.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= diff --git a/vendor/knative.dev/eventing/pkg/apis/eventing/v1beta2/eventtype_conversion.go b/vendor/knative.dev/eventing/pkg/apis/eventing/v1beta2/eventtype_conversion.go index 2bdd421a5c..6212e245df 100644 --- a/vendor/knative.dev/eventing/pkg/apis/eventing/v1beta2/eventtype_conversion.go +++ b/vendor/knative.dev/eventing/pkg/apis/eventing/v1beta2/eventtype_conversion.go @@ -18,17 +18,86 @@ package v1beta2 import ( "context" - "fmt" "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" + + eventing "knative.dev/eventing/pkg/apis/eventing/v1" + "knative.dev/eventing/pkg/apis/eventing/v1beta3" ) -// ConvertTo implements apis.Convertible +// ConvertTo converts the receiver into `to`. func (source *EventType) ConvertTo(ctx context.Context, to apis.Convertible) error { - return fmt.Errorf("v1beta2 is the highest known version, got: %T", to) + switch sink := to.(type) { + case *v1beta3.EventType: + + source.ObjectMeta.DeepCopyInto(&sink.ObjectMeta) + source.Status.Status.DeepCopyInto(&sink.Status.Status) + + sink.Spec.Reference = source.Spec.Reference.DeepCopy() + sink.Spec.Description = source.Spec.Description + + if source.Spec.Reference == nil && source.Spec.Broker != "" { + source.Spec.Reference = &duckv1.KReference{ + Kind: "Broker", + Name: source.Spec.Broker, + APIVersion: eventing.SchemeGroupVersion.String(), + } + } + + sink.Spec.Attributes = []v1beta3.EventAttributeDefinition{} + if source.Spec.Type != "" { + sink.Spec.Attributes = append(sink.Spec.Attributes, v1beta3.EventAttributeDefinition{ + Name: "type", + Required: true, + Value: source.Spec.Type, + }) + } + if source.Spec.Schema != nil { + sink.Spec.Attributes = append(sink.Spec.Attributes, v1beta3.EventAttributeDefinition{ + Name: "schemadata", + Required: false, + Value: source.Spec.Schema.String(), + }) + } + if source.Spec.Source != nil { + sink.Spec.Attributes = append(sink.Spec.Attributes, v1beta3.EventAttributeDefinition{ + Name: "source", + Required: true, + Value: source.Spec.Source.String(), + }) + } + return nil + default: + return apis.ConvertToViaProxy(ctx, source, &v1beta3.EventType{}, to) + } + } // ConvertFrom implements apis.Convertible func (sink *EventType) ConvertFrom(ctx context.Context, from apis.Convertible) error { - return fmt.Errorf("v1beta2 is the highest known version, got: %T", from) + switch source := from.(type) { + case *v1beta3.EventType: + + source.ObjectMeta.DeepCopyInto(&sink.ObjectMeta) + source.Status.Status.DeepCopyInto(&sink.Status.Status) + + sink.Spec.Reference = source.Spec.Reference.DeepCopy() + sink.Spec.Description = source.Spec.Description + + for _, at := range source.Spec.Attributes { + switch at.Name { + case "source": + sink.Spec.Source, _ = apis.ParseURL(at.Value) + case "type": + sink.Spec.Type = at.Value + case "schemadata": + sink.Spec.Schema, _ = apis.ParseURL(at.Value) + } + } + + return nil + default: + return apis.ConvertFromViaProxy(ctx, from, &v1beta3.EventType{}, sink) + } } diff --git a/vendor/knative.dev/eventing/pkg/kncloudevents/event_dispatcher.go b/vendor/knative.dev/eventing/pkg/kncloudevents/event_dispatcher.go index 877a206757..a9c3bd5cd4 100644 --- a/vendor/knative.dev/eventing/pkg/kncloudevents/event_dispatcher.go +++ b/vendor/knative.dev/eventing/pkg/kncloudevents/event_dispatcher.go @@ -289,11 +289,18 @@ func (d *Dispatcher) send(ctx context.Context, message binding.Message, destinat } func (d *Dispatcher) executeRequest(ctx context.Context, target duckv1.Addressable, message cloudevents.Message, additionalHeaders http.Header, retryConfig *RetryConfig, oidcServiceAccount *types.NamespacedName, transformers ...binding.Transformer) (context.Context, cloudevents.Message, *DispatchInfo, error) { + var scheme string + if target.URL != nil { + scheme = target.URL.Scheme + } else { + // assume that the scheme is http by default + scheme = "http" + } dispatchInfo := DispatchInfo{ Duration: NoDuration, ResponseCode: NoResponse, ResponseHeader: make(http.Header), - Scheme: target.URL.Scheme, + Scheme: scheme, } ctx, span := trace.StartSpan(ctx, "knative.dev", trace.WithSpanKind(trace.SpanKindClient)) diff --git a/vendor/knative.dev/pkg/network/network.go b/vendor/knative.dev/pkg/network/network.go index cf5feaa8fa..87b2b7acd8 100644 --- a/vendor/knative.dev/pkg/network/network.go +++ b/vendor/knative.dev/pkg/network/network.go @@ -59,11 +59,12 @@ const ( // KubeletProbeHeaderName is the header name to augment the probes, because // Istio with mTLS rewrites probes, but their probes pass a different // user-agent. + // + // Deprecated: use knative.dev/networking/pkg/http/header.UserAgentKey KubeletProbeHeaderName = "K-Kubelet-Probe" ) // IsKubeletProbe returns true if the request is a Kubernetes probe. func IsKubeletProbe(r *http.Request) bool { - return strings.HasPrefix(r.Header.Get(UserAgentKey), KubeProbeUAPrefix) || - r.Header.Get(KubeletProbeHeaderName) != "" + return strings.HasPrefix(r.Header.Get(UserAgentKey), KubeProbeUAPrefix) } diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/forwarder/forwarder.go b/vendor/knative.dev/reconciler-test/pkg/eventshub/forwarder/forwarder.go index 3c0cfc5b96..4be51a17b4 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/forwarder/forwarder.go +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/forwarder/forwarder.go @@ -19,11 +19,13 @@ package forwarder import ( "bytes" "context" + "encoding/json" "fmt" "io" "io/ioutil" "net/http" "net/url" + "os" "strings" "time" @@ -51,6 +53,9 @@ type Forwarder struct { // Sink Sink string + // FromFiles allows forwarding JSON-formatted events that are present on specified files (comma separated list of file paths) + FromFiles string + // EventLogs is the list of EventLogger implementors to vent observed events. EventLogs *eventshub.EventLogs @@ -69,6 +74,9 @@ type envConfig struct { // Sink url for the message destination Sink string `envconfig:"SINK" required:"true"` + + // FromFiles allows forwarding JSON-formatted events that are present on specified files (comma separated list of file paths) + FromFiles string `envconfig:"FROM_FILES" required:"false"` } func NewFromEnv(ctx context.Context, eventLogs *eventshub.EventLogs, handlerFuncs []eventshub.HandlerFunc, clientOpts []eventshub.ClientOption) *Forwarder { @@ -83,6 +91,7 @@ func NewFromEnv(ctx context.Context, eventLogs *eventshub.EventLogs, handlerFunc Name: env.Name, Namespace: env.Namespace, Sink: env.Sink, + FromFiles: env.FromFiles, EventLogs: eventLogs, ctx: ctx, handlerFuncs: handlerFuncs, @@ -106,6 +115,11 @@ func (o *Forwarder) Start(ctx context.Context) error { handler = dec(handler) } + if o.FromFiles != "" { + o.forwardFromFiles() + return nil + } + server := &http.Server{Addr: ":8080", Handler: handler} var err error @@ -277,3 +291,67 @@ func (o *Forwarder) responseInfo(res *http.Response, event *cloudevents.Event) e } return responseInfo } + +func (o *Forwarder) forwardFromFile(f string) { + b, err := os.ReadFile(f) + if err != nil { + logging.FromContext(o.ctx).Fatalw("Failed to read the file", + zap.String("file", f), + zap.Error(err)) + } + + event := &cloudevents.Event{} + if err := json.Unmarshal(b, event); err != nil { + logging.FromContext(o.ctx).Fatalw("Failed to unmarshal the event", + zap.String("file", f), + zap.Error(err)) + } + + eventInfo := eventshub.EventInfo{ + Error: err.Error(), + Event: event, + Observer: o.Name, + Origin: f, + Time: time.Now(), + Kind: eventshub.EventSent, + } + + // Log the event that is being forwarded + if err := o.EventLogs.Vent(eventInfo); err != nil { + logging.FromContext(o.ctx).Fatalw("Failed to vent the event", + zap.String("file", f), + zap.Error(err)) + } + + req, err := http.NewRequest("POST", o.Sink, nil) + if err != nil { + logging.FromContext(o.ctx).Fatalw("Failed to create the client request", + zap.String("file", f), + zap.Error(err)) + } + if err := cehttp.WriteRequest(context.Background(), cloudevents.ToMessage(event), req); err != nil { + logging.FromContext(o.ctx).Fatalw("Failed to write the client request", + zap.String("file", f), + zap.Error(err)) + } + + res, err := o.httpClient.Do(req) + if err != nil { + logging.FromContext(o.ctx).Fatalw("Failed to forward the event", + zap.String("file", f), + zap.Error(err)) + } + + // Vent the response info + if err := o.EventLogs.Vent(o.responseInfo(res, event)); err != nil { + logging.FromContext(o.ctx).Errorw("Failed to log response for forwarded event", + zap.String("file", f), + zap.Error(err)) + } +} + +func (o *Forwarder) forwardFromFiles() { + for _, f := range strings.Split(o.FromFiles, ",") { + o.forwardFromFile(f) + } +} diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/options.go b/vendor/knative.dev/reconciler-test/pkg/eventshub/options.go index 6693aa0ea5..ec641f7403 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/options.go +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/options.go @@ -81,7 +81,16 @@ func StartSenderTLS(sinkSvc string, caCerts *string) EventsHubOption { // This can be used together with InputEvent, AddTracing, EnableIncrementalId, InputEncoding and InputHeader options func StartSenderToResource(gvr schema.GroupVersionResource, name string) EventsHubOption { return func(ctx context.Context, envs map[string]string) error { - u, err := k8s.Address(ctx, gvr, name) + env := environment.FromContext(ctx) + return StartSenderToNamespacedResource(gvr, name, env.Namespace())(ctx, envs) + } +} + +// StartSenderToNamespacedResource starts the sender in the eventshub pointing to the provided resource +// This can be used together with InputEvent, AddTracing, EnableIncrementalId, InputEncoding and InputHeader options +func StartSenderToNamespacedResource(gvr schema.GroupVersionResource, name, namespace string) EventsHubOption { + return func(ctx context.Context, envs map[string]string) error { + u, err := k8s.NamespacedAddress(ctx, gvr, name, namespace) if err != nil { return err } @@ -102,7 +111,17 @@ func StartSenderToResource(gvr schema.GroupVersionResource, name string) EventsH // This can be used together with InputEvent, AddTracing, EnableIncrementalId, InputEncoding and InputHeader options func StartSenderToResourceTLS(gvr schema.GroupVersionResource, name string, caCerts *string) EventsHubOption { return func(ctx context.Context, m map[string]string) error { - u, err := k8s.Address(ctx, gvr, name) + env := environment.FromContext(ctx) + return StartSenderToNamespacedResourceTLS(gvr, name, env.Namespace(), caCerts)(ctx, m) + } +} + +// StartSenderToNamespacedResourceTLS starts the sender in the eventshub pointing to the provided namespaced resource. +// `caCerts` parameter is optional, if nil, it will fall back to use the addressable CA certs. +// This can be used together with InputEvent, AddTracing, EnableIncrementalId, InputEncoding and InputHeader options +func StartSenderToNamespacedResourceTLS(gvr schema.GroupVersionResource, name, namespace string, caCerts *string) EventsHubOption { + return func(ctx context.Context, m map[string]string) error { + u, err := k8s.NamespacedAddress(ctx, gvr, name, namespace) if err != nil { return err } diff --git a/vendor/knative.dev/reconciler-test/pkg/k8s/steps.go b/vendor/knative.dev/reconciler-test/pkg/k8s/steps.go index 86d27ed065..42f1f776c1 100644 --- a/vendor/knative.dev/reconciler-test/pkg/k8s/steps.go +++ b/vendor/knative.dev/reconciler-test/pkg/k8s/steps.go @@ -95,16 +95,21 @@ func IsAddressable(gvr schema.GroupVersionResource, name string, timing ...time. // resource is found but not Addressable, Address will return (nil, nil). func Address(ctx context.Context, gvr schema.GroupVersionResource, name string) (*duckv1.Addressable, error) { env := environment.FromContext(ctx) + return NamespacedAddress(ctx, gvr, name, env.Namespace()) +} +// NamespacedAddress attempts to resolve an Addressable address in a specific namespace into a URL. If the +// resource is found but not Addressable, Address will return (nil, nil). +func NamespacedAddress(ctx context.Context, gvr schema.GroupVersionResource, name, namespace string) (*duckv1.Addressable, error) { // Special case Service. if gvr.Group == "" && gvr.Version == "v1" && gvr.Resource == "services" { - u := "http://" + network.GetServiceHostname(name, env.Namespace()) + u := "http://" + network.GetServiceHostname(name, namespace) url, err := apis.ParseURL(u) return &duckv1.Addressable{URL: url}, err } like := &duckv1.AddressableType{} - us, err := dynamicclient.Get(ctx).Resource(gvr).Namespace(env.Namespace()).Get(ctx, name, metav1.GetOptions{}) + us, err := dynamicclient.Get(ctx).Resource(gvr).Namespace(namespace).Get(ctx, name, metav1.GetOptions{}) if err != nil { return nil, err } diff --git a/vendor/modules.txt b/vendor/modules.txt index a97b6e0c60..085c132922 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1073,7 +1073,7 @@ k8s.io/utils/pointer k8s.io/utils/ptr k8s.io/utils/strings/slices k8s.io/utils/trace -# knative.dev/eventing v0.41.1-0.20240523103154-74253dd1c57a +# knative.dev/eventing v0.41.1-0.20240531140411-c53b44d67c71 ## explicit; go 1.21 knative.dev/eventing/cmd/heartbeats knative.dev/eventing/pkg/adapter/v2 @@ -1192,10 +1192,10 @@ knative.dev/eventing/test/test_images knative.dev/eventing/test/test_images/event-sender knative.dev/eventing/test/test_images/performance knative.dev/eventing/test/test_images/print -# knative.dev/hack v0.0.0-20240507013718-68e3bfb39d11 +# knative.dev/hack v0.0.0-20240529131459-3b6d6441e7ea ## explicit; go 1.18 knative.dev/hack -# knative.dev/pkg v0.0.0-20240521083825-99e1685a7997 +# knative.dev/pkg v0.0.0-20240602234151-229e527a1366 ## explicit; go 1.21 knative.dev/pkg/apis knative.dev/pkg/apis/duck @@ -1302,7 +1302,7 @@ knative.dev/pkg/webhook/json knative.dev/pkg/webhook/resourcesemantics knative.dev/pkg/webhook/resourcesemantics/defaulting knative.dev/pkg/webhook/resourcesemantics/validation -# knative.dev/reconciler-test v0.0.0-20240523103149-1351b601972d +# knative.dev/reconciler-test v0.0.0-20240531150205-2723f53aa8ed ## explicit; go 1.21 knative.dev/reconciler-test/cmd/eventshub knative.dev/reconciler-test/pkg/environment