From 5801ba9ffcea724e1aca3ffbe4b2eebe82c9b585 Mon Sep 17 00:00:00 2001 From: Matheus Pimenta Date: Tue, 6 Jun 2023 16:40:39 +0100 Subject: [PATCH] Add Provider for Google Pub/Sub Topic Signed-off-by: Matheus Pimenta Co-authored-by: Somtochi Onyekwere Co-authored-by: Max Jonas Werner --- api/v1beta2/provider_types.go | 3 +- ...ification.toolkit.fluxcd.io_providers.yaml | 1 + docs/spec/v1beta2/providers.md | 49 ++++++ go.mod | 16 ++ go.sum | 34 +++++ internal/controller/provider_controller.go | 9 +- internal/notifier/factory.go | 5 +- internal/notifier/google_pubsub_topic.go | 135 +++++++++++++++++ internal/notifier/google_pubsub_topic_test.go | 142 ++++++++++++++++++ internal/server/event_handlers.go | 11 +- internal/server/event_server.go | 29 +++- 11 files changed, 423 insertions(+), 11 deletions(-) create mode 100644 internal/notifier/google_pubsub_topic.go create mode 100644 internal/notifier/google_pubsub_topic_test.go diff --git a/api/v1beta2/provider_types.go b/api/v1beta2/provider_types.go index ba08b1e39..b8bee7515 100644 --- a/api/v1beta2/provider_types.go +++ b/api/v1beta2/provider_types.go @@ -39,6 +39,7 @@ const ( BitbucketProvider string = "bitbucket" AzureDevOpsProvider string = "azuredevops" GoogleChatProvider string = "googlechat" + GooglePubSubProvider string = "googlepubsub" WebexProvider string = "webex" SentryProvider string = "sentry" AzureEventHubProvider string = "azureeventhub" @@ -52,7 +53,7 @@ const ( // ProviderSpec defines the desired state of the Provider. type ProviderSpec struct { // Type specifies which Provider implementation to use. - // +kubebuilder:validation:Enum=slack;discord;msteams;rocket;generic;generic-hmac;github;gitlab;gitea;bitbucket;azuredevops;googlechat;webex;sentry;azureeventhub;telegram;lark;matrix;opsgenie;alertmanager;grafana;githubdispatch; + // +kubebuilder:validation:Enum=slack;discord;msteams;rocket;generic;generic-hmac;github;gitlab;gitea;bitbucket;azuredevops;googlechat;googlepubsub;webex;sentry;azureeventhub;telegram;lark;matrix;opsgenie;alertmanager;grafana;githubdispatch; // +required Type string `json:"type"` diff --git a/config/crd/bases/notification.toolkit.fluxcd.io_providers.yaml b/config/crd/bases/notification.toolkit.fluxcd.io_providers.yaml index 580aff1d4..31d6314ec 100644 --- a/config/crd/bases/notification.toolkit.fluxcd.io_providers.yaml +++ b/config/crd/bases/notification.toolkit.fluxcd.io_providers.yaml @@ -291,6 +291,7 @@ spec: - bitbucket - azuredevops - googlechat + - googlepubsub - webex - sentry - azureeventhub diff --git a/docs/spec/v1beta2/providers.md b/docs/spec/v1beta2/providers.md index 0b0f052ab..cce1d60f5 100644 --- a/docs/spec/v1beta2/providers.md +++ b/docs/spec/v1beta2/providers.md @@ -112,6 +112,7 @@ The supported alerting providers are: | [Discord](#discord) | `discord` | | [GitHub dispatch](#github-dispatch) | `githubdispatch` | | [Google Chat](#google-chat) | `googlechat` | +| [Google Pub/Sub](#google-pubsub) | `googlepubsub` | | [Grafana](#grafana) | `grafana` | | [Lark](#lark) | `lark` | | [Matrix](#matrix) | `matrix` | @@ -671,6 +672,54 @@ stringData: address: https://chat.googleapis.com/v1/spaces/... ``` +##### Google Pub/Sub + +When `.spec.type` is set to `googlepubsub`, the controller will publish the payload of +an [Event](events.md#event-structure) on the Google Pub/Sub Topic provided on the +[Channel](#channel) field of the Provider (the format must be +`projects//topics/`). + +This Provider type can optionally use the [Secret reference](#secret-reference) to +authenticate on the Google Pub/Sub API by using [JSON credentials](https://cloud.google.com/iam/docs/service-account-creds#key-types). +The credentials must be specified on [the `token`](#token-example) field of the Secret. + +If the Secret reference is not specified, then the automatic authentication methods of +the Google libraries will take place, and therefore methods like Workload Identity will +be automatically attempted. + +Either way, the identity effectively used for publishing messages on the Google +Pub/Sub Topic must have [the required permissions](https://cloud.google.com/iam/docs/understanding-roles#pubsub.publisher) on the topic. + +This Provider type does not support the configuration of a [proxy URL](#https-proxy) +or [TLS certificates](#tls-certificates). + +###### Google Pub/Sub JSON Credentials Example + +To configure a Provider for Google Pub/Sub authenticating with JSON credentials, create +a Secret with [the `token`](#token-example) set to the necessary JSON credentials, and a +`googlepubsub` Provider with the associated [Secret reference](#secret-reference). + +```yaml +--- +apiVersion: notification.toolkit.fluxcd.io/v1beta2 +kind: Provider +metadata: + name: googlepubsub-provider + namespace: desired-namespace +spec: + type: googlepubsub + secretRef: + name: googlepubsub-provider-creds +--- +apiVersion: v1 +kind: Secret +metadata: + name: googlepubsub-provider-creds + namespace: desired-namespace +stringData: + token: +``` + ##### Opsgenie When `.spec.type` is set to `opsgenie`, the controller will send a payload for diff --git a/go.mod b/go.mod index 488587f3d..8b43866a3 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.18 replace github.com/fluxcd/notification-controller/api => ./api require ( + cloud.google.com/go/pubsub v1.30.0 code.gitea.io/sdk/gitea v0.15.1 github.com/AdaLogics/go-fuzz-headers v0.0.0-20230106234847-43070de90fa1 github.com/Azure/azure-amqp-common-go/v4 v4.1.0 @@ -31,6 +32,7 @@ require ( github.com/whilp/git-urls v1.0.0 github.com/xanzy/go-gitlab v0.83.0 golang.org/x/oauth2 v0.8.0 + google.golang.org/api v0.125.0 k8s.io/api v0.27.2 k8s.io/apimachinery v0.27.2 k8s.io/client-go v0.27.2 @@ -44,6 +46,11 @@ require ( replace gopkg.in/yaml.v3 => gopkg.in/yaml.v3 v3.0.1 require ( + cloud.google.com/go v0.110.0 // indirect + cloud.google.com/go/compute v1.19.3 // indirect + cloud.google.com/go/compute/metadata v0.2.3 // indirect + cloud.google.com/go/iam v1.0.1 // indirect + cloud.google.com/go/kms v1.11.0 // indirect github.com/Azure/azure-sdk-for-go v68.0.0+incompatible // indirect github.com/Azure/go-amqp v1.0.0 // indirect github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect @@ -85,8 +92,11 @@ require ( github.com/google/go-cmp v0.5.9 // indirect github.com/google/go-querystring v1.1.0 // indirect github.com/google/gofuzz v1.2.0 // indirect + github.com/google/s2a-go v0.1.4 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/google/uuid v1.3.0 // indirect + github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect + github.com/googleapis/gax-go/v2 v2.10.0 // indirect github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-version v1.6.0 // indirect @@ -118,18 +128,24 @@ require ( github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/spf13/cobra v1.7.0 // indirect github.com/xlab/treeprint v1.2.0 // indirect + go.opencensus.io v0.24.0 // indirect go.starlark.net v0.0.0-20230302034142-4b1e35fe2254 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.24.0 // indirect golang.org/x/crypto v0.9.0 // indirect golang.org/x/net v0.10.0 // indirect + golang.org/x/sync v0.2.0 // indirect golang.org/x/sys v0.8.0 // indirect golang.org/x/term v0.8.0 // indirect golang.org/x/text v0.9.0 // indirect golang.org/x/time v0.3.0 // indirect gomodules.xyz/jsonpatch/v2 v2.3.0 // indirect google.golang.org/appengine v1.6.7 // indirect + google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect + google.golang.org/grpc v1.55.0 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index f78d6add2..1ad0ecf1e 100644 --- a/go.sum +++ b/go.sum @@ -32,6 +32,8 @@ cloud.google.com/go v0.100.2/go.mod h1:4Xra9TjzAeYHrl5+oeLlzbM2k3mjVhZh4UqTZ//w9 cloud.google.com/go v0.102.0/go.mod h1:oWcCzKlqJ5zgHQt9YsaeTY9KzIvjyy0ArmiBUgpQ+nc= cloud.google.com/go v0.102.1/go.mod h1:XZ77E9qnTEnrgEOvr4xzfdX5TRo7fB4T2F4O6+34hIU= cloud.google.com/go v0.104.0/go.mod h1:OO6xxXdJyvuJPcEPBLN9BJPD+jep5G1+2U5B5gkRYtA= +cloud.google.com/go v0.110.0 h1:Zc8gqp3+a9/Eyph2KDmcGaPtbKRIoqq4YTlL4NMD0Ys= +cloud.google.com/go v0.110.0/go.mod h1:SJnCLqQ0FCFGSZMUNUf84MV3Aia54kn7pi8st7tMzaY= cloud.google.com/go/aiplatform v1.22.0/go.mod h1:ig5Nct50bZlzV6NvKaTwmplLLddFx0YReh9WfTO5jKw= cloud.google.com/go/aiplatform v1.24.0/go.mod h1:67UUvRBKG6GTayHKV8DBv2RtR1t93YRu5B1P3x99mYY= cloud.google.com/go/analytics v0.11.0/go.mod h1:DjEWCu41bVbYcKyvlws9Er60YE4a//bK6mnhWvQeFNI= @@ -70,9 +72,13 @@ cloud.google.com/go/compute v1.7.0/go.mod h1:435lt8av5oL9P3fv1OEzSbSUe+ybHXGMPQH cloud.google.com/go/compute v1.10.0/go.mod h1:ER5CLbMxl90o2jtNbGSbtfOpQKR0t15FOtRsugnLrlU= cloud.google.com/go/compute v1.12.0/go.mod h1:e8yNOBcBONZU1vJKCvCoDw/4JQsA0dpM4x/6PIIOocU= cloud.google.com/go/compute v1.12.1/go.mod h1:e8yNOBcBONZU1vJKCvCoDw/4JQsA0dpM4x/6PIIOocU= +cloud.google.com/go/compute v1.19.3 h1:DcTwsFgGev/wV5+q8o2fzgcHOaac+DKGC91ZlvpsQds= +cloud.google.com/go/compute v1.19.3/go.mod h1:qxvISKp/gYnXkSAD1ppcSOveRAmzxicEv/JlizULFrI= cloud.google.com/go/compute/metadata v0.1.0/go.mod h1:Z1VN+bulIf6bt4P/C37K4DyZYZEXYonfTBHHFPO/4UU= cloud.google.com/go/compute/metadata v0.2.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= cloud.google.com/go/compute/metadata v0.2.1/go.mod h1:jgHgmJd2RKBGzXqF5LR2EZMGxBkeanZ9wwa75XHJgOM= +cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= +cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/containeranalysis v0.5.1/go.mod h1:1D92jd8gRR/c0fGMlymRgxWD3Qw9C1ff6/T7mLgVL8I= cloud.google.com/go/containeranalysis v0.6.0/go.mod h1:HEJoiEIu+lEXM+k7+qLCci0h33lX3ZqoYFdmPcoO7s4= cloud.google.com/go/datacatalog v1.3.0/go.mod h1:g9svFY6tuR+j+hrTw3J2dNcmI0dzmSiyOzm8kpLq0a0= @@ -110,6 +116,10 @@ cloud.google.com/go/gkehub v0.9.0/go.mod h1:WYHN6WG8w9bXU0hqNxt8rm5uxnk8IH+lPY9J cloud.google.com/go/gkehub v0.10.0/go.mod h1:UIPwxI0DsrpsVoWpLB0stwKCP+WFVG9+y977wO+hBH0= cloud.google.com/go/grafeas v0.2.0/go.mod h1:KhxgtF2hb0P191HlY5besjYm6MqTSTj3LSI+M+ByZHc= cloud.google.com/go/iam v0.3.0/go.mod h1:XzJPvDayI+9zsASAFO68Hk07u3z+f+JrT2xXNdp4bnY= +cloud.google.com/go/iam v1.0.1 h1:lyeCAU6jpnVNrE9zGQkTl3WgNgK/X+uWwaw0kynZJMU= +cloud.google.com/go/iam v1.0.1/go.mod h1:yR3tmSL8BcZB4bxByRv2jkSIahVmCtfKZwLYGBalRE8= +cloud.google.com/go/kms v1.11.0 h1:0LPJPKamw3xsVpkel1bDtK0vVJec3EyqdQOLitiD030= +cloud.google.com/go/kms v1.11.0/go.mod h1:hwdiYC0xjnWsKQQCQQmIQnS9asjYVSK6jtXm+zFqXLM= cloud.google.com/go/language v1.4.0/go.mod h1:F9dRpNFQmJbkaop6g0JhSBXCNlO90e1KWx5iDdxbWic= cloud.google.com/go/language v1.6.0/go.mod h1:6dJ8t3B+lUYfStgls25GusK04NLh3eDLQnWM3mdEbhI= cloud.google.com/go/lifesciences v0.5.0/go.mod h1:3oIKy8ycWGPUyZDR/8RNnTOYevhaMLqh5vLUXs9zvT8= @@ -138,6 +148,8 @@ cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2k cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw= cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA= cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU= +cloud.google.com/go/pubsub v1.30.0 h1:vCge8m7aUKBJYOgrZp7EsNDf6QMd2CAlXZqWTn3yq6s= +cloud.google.com/go/pubsub v1.30.0/go.mod h1:qWi1OPS0B+b5L+Sg6Gmc9zD1Y+HaM0MdUr7LsupY1P4= cloud.google.com/go/recaptchaenterprise v1.3.1/go.mod h1:OdD+q+y4XGeAlxRaMn1Y7/GveP6zmq76byL6tjPE7d4= cloud.google.com/go/recaptchaenterprise/v2 v2.1.0/go.mod h1:w9yVqajwroDNTfGuhmOjPDN//rZGySaf6PtFVcSCa7o= cloud.google.com/go/recaptchaenterprise/v2 v2.2.0/go.mod h1:/Zu5jisWGeERrd5HnlS3EUGb/D335f9k51B/FVil0jk= @@ -457,6 +469,8 @@ github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/s2a-go v0.1.4 h1:1kZ/sQM3srePvKs3tXAvQzo66XfcReoqFpIpIccE7Oc= +github.com/google/s2a-go v0.1.4/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -466,6 +480,8 @@ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/googleapis/enterprise-certificate-proxy v0.0.0-20220520183353-fd19c99a87aa/go.mod h1:17drOmN3MwGY7t0e+Ei9b45FFGA3fBs3x36SsCg1hq8= github.com/googleapis/enterprise-certificate-proxy v0.1.0/go.mod h1:17drOmN3MwGY7t0e+Ei9b45FFGA3fBs3x36SsCg1hq8= github.com/googleapis/enterprise-certificate-proxy v0.2.0/go.mod h1:8C0jb7/mgJe/9KK8Lm7X9ctZC2t60YyIpYEI16jx0Qg= +github.com/googleapis/enterprise-certificate-proxy v0.2.3 h1:yk9/cqRKtT9wXZSsRH9aurXEpJX+U6FLtpYTdC3R06k= +github.com/googleapis/enterprise-certificate-proxy v0.2.3/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0= @@ -475,6 +491,8 @@ github.com/googleapis/gax-go/v2 v2.3.0/go.mod h1:b8LNqSzNabLiUpXKkY7HAR5jr6bIT99 github.com/googleapis/gax-go/v2 v2.4.0/go.mod h1:XOTVJ59hdnfJLIP/dh8n5CGryZR2LxK9wbMD5+iXC6c= github.com/googleapis/gax-go/v2 v2.5.1/go.mod h1:h6B0KMMFNtI2ddbGJn3T3ZbwkeT6yqEF02fYlzkUCyo= github.com/googleapis/gax-go/v2 v2.6.0/go.mod h1:1mjbznJAPHFpesgE5ucqfYEscaz5kMdcIDwU/6+DDoY= +github.com/googleapis/gax-go/v2 v2.10.0 h1:ebSgKfMxynOdxw8QQuFOKMgomqeLGPqNLQox2bo42zg= +github.com/googleapis/gax-go/v2 v2.10.0/go.mod h1:4UOEnMCrxsSqQ940WnTiD6qJ63le2ev3xfyagutxiPw= github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4= github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= @@ -767,6 +785,8 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= +go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.starlark.net v0.0.0-20230302034142-4b1e35fe2254 h1:Ss6D3hLXTM0KobyBYEAygXzFfGcjnmfEJOBgSbemCtg= go.starlark.net v0.0.0-20230302034142-4b1e35fe2254/go.mod h1:jxU+3+j+71eXOW14274+SmmuW82qJzl6iZSeqEtTGds= @@ -794,6 +814,7 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= @@ -944,6 +965,8 @@ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI= +golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1060,6 +1083,7 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= @@ -1192,6 +1216,8 @@ google.golang.org/api v0.97.0/go.mod h1:w7wJQLTM+wvQpNf5JyEcBoxK0RH7EDrh/L4qfsuJ google.golang.org/api v0.98.0/go.mod h1:w7wJQLTM+wvQpNf5JyEcBoxK0RH7EDrh/L4qfsuJ13s= google.golang.org/api v0.100.0/go.mod h1:ZE3Z2+ZOr87Rx7dqFsdRQkRBk36kDtp/h+QpHbB7a70= google.golang.org/api v0.102.0/go.mod h1:3VFl6/fzoA+qNuS1N1/VfXY4LjoXN/wzeIp7TweWwGo= +google.golang.org/api v0.125.0 h1:7xGvEY4fyWbhWMHf3R2/4w7L4fXyfpRGE9g6lp8+DCk= +google.golang.org/api v0.125.0/go.mod h1:mBwVAtz+87bEN6CbA1GtZPDOqY2R5ONPqJeIlvyo4Aw= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -1304,6 +1330,12 @@ google.golang.org/genproto v0.0.0-20221010155953-15ba04fc1c0e/go.mod h1:3526vdqw google.golang.org/genproto v0.0.0-20221014173430-6e2ab493f96b/go.mod h1:1vXfmgAz9N9Jx0QA82PqRVauvCz1SGSz739p0f183jM= google.golang.org/genproto v0.0.0-20221014213838-99cd37c6964a/go.mod h1:1vXfmgAz9N9Jx0QA82PqRVauvCz1SGSz739p0f183jM= google.golang.org/genproto v0.0.0-20221024183307-1bc688fe9f3e/go.mod h1:9qHF0xnpdSfF6knlcsnpzUu5y+rpwgbvsyGAZPBMg4s= +google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc h1:8DyZCyvI8mE1IdLy/60bS+52xfymkE72wv1asokgtao= +google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:xZnkP7mREFX5MORlOPEzLMr+90PPZQ2QWzrVTWfAq64= +google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc h1:kVKPf/IiYSBWEWtkIn6wZXwWGCnLKcC8oWfZvXjsGnM= +google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc h1:XSJ8Vk1SWuNr8S18z1NZSziL0CPIXLCCMDOEFtHBOFc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -1340,6 +1372,8 @@ google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACu google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= google.golang.org/grpc v1.50.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= google.golang.org/grpc v1.50.1/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= +google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag= +google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= diff --git a/internal/controller/provider_controller.go b/internal/controller/provider_controller.go index 8c86a2534..3524e5155 100644 --- a/internal/controller/provider_controller.go +++ b/internal/controller/provider_controller.go @@ -20,6 +20,7 @@ import ( "context" "crypto/x509" "fmt" + "io" "net/url" "time" @@ -254,9 +255,15 @@ func (r *ProviderReconciler) validateCredentials(ctx context.Context, provider * } factory := notifier.NewFactory(address, proxy, username, provider.Spec.Channel, token, headers, certPool, password, string(provider.UID)) - if _, err := factory.Notifier(provider.Spec.Type); err != nil { + n, err := factory.Notifier(ctx, provider.Spec.Type) + if err != nil { return fmt.Errorf("failed to initialize provider, error: %w", err) } + if closer, ok := n.(io.Closer); ok { + if err := closer.Close(); err != nil { + return fmt.Errorf("failed to close provider: %w", err) + } + } return nil } diff --git a/internal/notifier/factory.go b/internal/notifier/factory.go index ec0fdd834..059a6b3fc 100644 --- a/internal/notifier/factory.go +++ b/internal/notifier/factory.go @@ -17,6 +17,7 @@ limitations under the License. package notifier import ( + "context" "crypto/x509" "fmt" @@ -57,7 +58,7 @@ func NewFactory(url string, } } -func (f Factory) Notifier(provider string) (Interface, error) { +func (f Factory) Notifier(ctx context.Context, provider string) (Interface, error) { if f.URL == "" { return &NopNotifier{}, nil } @@ -91,6 +92,8 @@ func (f Factory) Notifier(provider string) (Interface, error) { n, err = NewAzureDevOps(f.ProviderUID, f.URL, f.Token, f.CertPool) case apiv1.GoogleChatProvider: n, err = NewGoogleChat(f.URL, f.ProxyURL) + case apiv1.GooglePubSubProvider: + n, err = NewGooglePubSubTopic(ctx, f.Channel, []byte(f.Token)) case apiv1.WebexProvider: n, err = NewWebex(f.URL, f.ProxyURL, f.CertPool, f.Channel, f.Token) case apiv1.SentryProvider: diff --git a/internal/notifier/google_pubsub_topic.go b/internal/notifier/google_pubsub_topic.go new file mode 100644 index 000000000..55937c5d0 --- /dev/null +++ b/internal/notifier/google_pubsub_topic.go @@ -0,0 +1,135 @@ +/* +Copyright 2023 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package notifier + +import ( + "context" + "encoding/json" + "fmt" + "regexp" + + "cloud.google.com/go/pubsub" + "google.golang.org/api/option" + "sigs.k8s.io/controller-runtime/pkg/log" + + eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" +) + +type ( + // GooglePubSubTopic holds a Google Pub/Sub client and target topic. + GooglePubSubTopic struct { + client interface { + Publish(ctx context.Context, topicID string, msg []byte) (serverID string, err error) + Close() error + } + topicName string + topicID string + } + + googlePubSubClient struct { + *pubsub.Client + } +) + +var ( + // ensure *GooglePubSubTopic implements Interface. + _ Interface = &GooglePubSubTopic{} + + googleTopicNameRegex = regexp.MustCompile(`^projects/(?P[^/]+)/topics/(?P[^/]+)$`) + googleTopicNameProjectIDIndex = googleTopicNameRegex.SubexpIndex("projectID") + googleTopicNameTopicIDIndex = googleTopicNameRegex.SubexpIndex("topicID") +) + +// NewGooglePubSubTopic creates a Google Pub/Sub client tied to a specific topic. +// The topicName must follow the format projects/projectID/topics/topicID, which +// can be found on the Google Pub/Sub Console webpage. The jsonCreds parameter +// is optional, and if len(jsonCreds) == 0 then the automatic authentication +// methods of the Google libraries will take place, and therefore methods like +// Workload Identity will be automatically attempted. +func NewGooglePubSubTopic(ctx context.Context, topicName string, jsonCreds []byte) (*GooglePubSubTopic, error) { + projectID, topicID, err := ParseGooglePubSubTopic(topicName) + if err != nil { + return nil, err + } + + var opts []option.ClientOption + if len(jsonCreds) > 0 { + opts = append(opts, option.WithCredentialsJSON(jsonCreds)) + } + client, err := pubsub.NewClient(ctx, projectID, opts...) + if err != nil { + return nil, fmt.Errorf("error creating google pubsub client for project id %s: %w", projectID, err) + } + + return &GooglePubSubTopic{ + client: googlePubSubClient{client}, + topicName: topicName, + topicID: topicID, + }, nil +} + +// Close closes the GooglePubSubTopic. +func (g *GooglePubSubTopic) Close() error { + return g.client.Close() +} + +// ParseGooglePubSubTopic tests whether topicName follows the pattern +// projects/projectID/topics/topicID and returns projectID and topicID. +func ParseGooglePubSubTopic(topicName string) (projectID, topicID string, err error) { + match := googleTopicNameRegex.FindStringSubmatch(topicName) + if match == nil { + return "", "", fmt.Errorf("topic name doesn't match 'projects//topics/': '%s'", topicName) + } + + projectID = match[googleTopicNameProjectIDIndex] + topicID = match[googleTopicNameTopicIDIndex] + return +} + +// Post posts Flux events to a Google Pub/Sub Topic. +func (g *GooglePubSubTopic) Post(ctx context.Context, event eventv1.Event) error { + // Skip Git commit status update event. + if event.HasMetadata(eventv1.MetaCommitStatusKey, eventv1.MetaCommitStatusUpdateValue) { + return nil + } + + eventBytes, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("error json-marshaling event: %w", err) + } + + serverID, err := g.client.Publish(ctx, g.topicID, eventBytes) + if err != nil { + return fmt.Errorf("error publishing event to topic %s: %w", g.topicName, err) + } + + // debug log + log.FromContext(ctx).V(1).Info("Event published to Google Pub/Sub Topic", + "reconciler kind", event.InvolvedObject.Kind, + "name", event.InvolvedObject.Name, + "namespace", event.InvolvedObject.Namespace, + "topic", g.topicName, + "server message id", serverID) + + return nil +} + +func (g googlePubSubClient) Publish(ctx context.Context, topicID string, msg []byte) (serverID string, err error) { + return g.Topic(topicID). + Publish(ctx, &pubsub.Message{Data: msg}). + Get(ctx) +} diff --git a/internal/notifier/google_pubsub_topic_test.go b/internal/notifier/google_pubsub_topic_test.go new file mode 100644 index 000000000..422f0cedd --- /dev/null +++ b/internal/notifier/google_pubsub_topic_test.go @@ -0,0 +1,142 @@ +/* +Copyright 2023 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package notifier + +import ( + "context" + "errors" + "fmt" + "testing" + + . "github.com/onsi/gomega" + + eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" +) + +type googlePubSubTestCase struct { + topicName string + topicID string + event eventv1.Event + err error + expectedMessage string + expectedErr error + + g *WithT +} + +func (tt *googlePubSubTestCase) Publish(ctx context.Context, topicID string, msg []byte) (serverID string, err error) { + tt.g.Expect(topicID).To(Equal(tt.topicID)) + tt.g.Expect(string(msg)).To(Equal(tt.expectedMessage)) + // serverID is only used in a debug log for now, there's no way to assert it + return "", tt.err +} + +func (tt *googlePubSubTestCase) Close() error { + return tt.err +} + +func TestParseGooglePubSubTopic(t *testing.T) { + for name, tt := range map[string]struct { + topicName string + expectedProjectID string + expectedTopicID string + expectedErr error + }{ + "invalid topic names return error": { + topicName: "invalid-topic-name", + expectedProjectID: "", + expectedTopicID: "", + expectedErr: errors.New("topic name doesn't match 'projects//topics/': 'invalid-topic-name'"), + }, + "project and topic ids are parsed from the topic name": { + topicName: "projects/parsedProjectID/topics/parsedTopicID", + expectedProjectID: "parsedProjectID", + expectedTopicID: "parsedTopicID", + expectedErr: nil, + }, + } { + t.Run(name, func(t *testing.T) { + g := NewWithT(t) + + projectID, topicID, err := ParseGooglePubSubTopic(tt.topicName) + g.Expect(projectID).To(Equal(tt.expectedProjectID)) + g.Expect(topicID).To(Equal(tt.expectedTopicID)) + if tt.expectedErr == nil { + g.Expect(err).To(BeNil()) + } else { + g.Expect(err).To(Equal(tt.expectedErr)) + } + }) + } +} + +func TestGooglePubSubTopicPost(t *testing.T) { + for name, tt := range map[string]*googlePubSubTestCase{ + "events are properly marshaled": { + topicID: "topicID", + event: eventv1.Event{ + Metadata: map[string]string{"foo": "bar"}, + }, + expectedMessage: `{"involvedObject":{},"severity":"","timestamp":null,"message":"","reason":"","metadata":{"foo":"bar"},"reportingController":""}`, + }, + "commit status updates are dropped": { + topicID: "topicID", + event: eventv1.Event{ + Metadata: map[string]string{"commit_status": "update"}, + }, + expectedMessage: "no assertion about the message should be made when dropped", + }, + "publish error is wrapped and relayed": { + topicName: "projects/projectID/topics/topicID", + topicID: "topicID", + err: errors.New("publish error"), + expectedMessage: `{"involvedObject":{},"severity":"","timestamp":null,"message":"","reason":"","reportingController":""}`, + expectedErr: fmt.Errorf("error publishing event to topic projects/projectID/topics/topicID: %w", errors.New("publish error")), + }, + } { + t.Run(name, func(t *testing.T) { + g := NewWithT(t) + tt.g = g + + topic := &GooglePubSubTopic{ + client: tt, + topicName: tt.topicName, + topicID: tt.topicID, + } + + err := topic.Post(context.Background(), tt.event) + if tt.expectedErr == nil { + g.Expect(err).To(BeNil()) + } else { + g.Expect(err).To(Equal(tt.expectedErr)) + } + }) + } +} + +func TestGooglePubSubTopicClose(t *testing.T) { + g := NewWithT(t) + + topic := &GooglePubSubTopic{ + client: &googlePubSubTestCase{ + err: errors.New("close error"), + }, + } + + err := topic.Close() + g.Expect(err).To(Equal(errors.New("close error"))) +} diff --git a/internal/server/event_handlers.go b/internal/server/event_handlers.go index c49081221..2341df58e 100644 --- a/internal/server/event_handlers.go +++ b/internal/server/event_handlers.go @@ -21,6 +21,7 @@ import ( "crypto/x509" "errors" "fmt" + "io" "net/http" "regexp" "time" @@ -243,7 +244,7 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request) } factory := notifier.NewFactory(webhook, proxy, username, provider.Spec.Channel, token, headers, certPool, password, string(provider.UID)) - sender, err := factory.Notifier(provider.Spec.Type) + sender, err := factory.Notifier(ctx, provider.Spec.Type) if err != nil { s.logger.Error(err, "failed to initialize provider", "reconciler kind", apiv1beta2.ProviderKind, @@ -270,6 +271,14 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request) "name", event.InvolvedObject.Name, "namespace", event.InvolvedObject.Namespace) } + if closer, ok := n.(io.Closer); ok { + if err := closer.Close(); err != nil { + s.logger.Error(err, "failed to close provider", + "reconciler kind", apiv1beta2.ProviderKind, + "name", providerName.Name, + "namespace", providerName.Namespace) + } + } }(sender, notification) } diff --git a/internal/server/event_server.go b/internal/server/event_server.go index 8f762732f..114d628b1 100644 --- a/internal/server/event_server.go +++ b/internal/server/event_server.go @@ -34,6 +34,7 @@ import ( "github.com/slok/go-http-metrics/middleware" "github.com/slok/go-http-metrics/middleware/std" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" ) @@ -68,8 +69,9 @@ func (s *EventServer) ListenAndServe(stopCh <-chan struct{}, mdlw middleware.Mid var handler http.Handler = http.HandlerFunc(s.handleEvent()) for _, middleware := range []func(http.Handler) http.Handler{ limitMiddleware.Handle, - s.logRateLimitMiddleware, - s.cleanupMetadataMiddleware, + logRateLimitMiddleware, + cleanupMetadataMiddleware, + s.loggerInjectorMiddleware, } { handler = middleware(handler) } @@ -100,14 +102,25 @@ func (s *EventServer) ListenAndServe(stopCh <-chan struct{}, mdlw middleware.Mid } } +// loggerInjectorMiddleware injects the server logger into the request context. +func (s *EventServer) loggerInjectorMiddleware(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctxWithLogger := log.IntoContext(r.Context(), s.logger) + reqWithLogger := r.WithContext(ctxWithLogger) + h.ServeHTTP(w, reqWithLogger) + }) +} + // cleanupMetadataMiddleware cleans up the metadata using cleanupMetadata() and // adds the cleaned event in the request context which can then be queried and // used directly by the other http handlers. -func (s *EventServer) cleanupMetadataMiddleware(h http.Handler) http.Handler { +func cleanupMetadataMiddleware(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + logger := log.FromContext(r.Context()) + body, err := io.ReadAll(r.Body) if err != nil { - s.logger.Error(err, "reading the request body failed") + logger.Error(err, "reading the request body failed") w.WriteHeader(http.StatusBadRequest) return } @@ -117,7 +130,7 @@ func (s *EventServer) cleanupMetadataMiddleware(h http.Handler) http.Handler { event := &eventv1.Event{} err = json.Unmarshal(body, event) if err != nil { - s.logger.Error(err, "decoding the request body failed") + logger.Error(err, "decoding the request body failed") w.WriteHeader(http.StatusBadRequest) return } @@ -172,8 +185,10 @@ func (r *statusRecorder) WriteHeader(status int) { r.ResponseWriter.WriteHeader(status) } -func (s *EventServer) logRateLimitMiddleware(h http.Handler) http.Handler { +func logRateLimitMiddleware(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + logger := log.FromContext(r.Context()) + recorder := &statusRecorder{ ResponseWriter: w, Status: 200, @@ -182,7 +197,7 @@ func (s *EventServer) logRateLimitMiddleware(h http.Handler) http.Handler { if recorder.Status == http.StatusTooManyRequests { event := r.Context().Value(eventContextKey{}).(*eventv1.Event) - s.logger.V(1).Info("Discarding event, rate limiting duplicate events", + logger.V(1).Info("Discarding event, rate limiting duplicate events", "reconciler kind", event.InvolvedObject.Kind, "name", event.InvolvedObject.Name, "namespace", event.InvolvedObject.Namespace)