Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add publisher confirms to ingress #543

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions cmd/ingress/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
Copyright 2021 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"log"
"sync"

amqp "github.com/rabbitmq/amqp091-go"
)

type Channel struct {
channel *amqp.Channel
counter uint64
mutex sync.Mutex
confirmsMap sync.Map
pconfirms_chan chan amqp.Confirmation
}

func openChannel(conn *amqp.Connection) *Channel {
a_channel, err := conn.Channel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: not go variable naming style. use aChan or amqpChan for example

if err != nil {
log.Fatalf("failed to open a channel: %s", err)
}

channel := &Channel{channel: a_channel, counter: 1, pconfirms_chan: make(chan amqp.Confirmation)}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we use unbuffered channel for confirmation. docs for confirmation channel in NotifyPublish state it should be buffered to accomodate for multiple in-flights, otherwise risking deadlocks. Could we run into this scenario with an unbuffered channel?


go channel.confirmsHandler()

a_channel.NotifyPublish(channel.pconfirms_chan)
// noWait is false
if err := a_channel.Confirm(false); err != nil {
log.Fatalf("faild to switch connection channel to confirm mode: %s", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return err instead of calling fatal so you get central err handling in main/run

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: typo faild

}

return channel
}

func (channel *Channel) publish(exchange string, headers amqp.Table, bytes []byte) (chan amqp.Confirmation, error) {
channel.mutex.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This basically means that every message published will lock the particular channel, i.e. serialize the logic and constrain parallelism to the number of defined channels, correct?

defer channel.mutex.Unlock()

cchan := make(chan amqp.Confirmation, 1)

channel.confirmsMap.Store(channel.counter, cchan)

if err := channel.channel.Publish(
exchange,
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: headers,
ContentType: "application/json",
Body: bytes,
}); err != nil {
channel.confirmsMap.LoadAndDelete(channel.counter)
return nil, err
}

channel.counter++

return cchan, nil
}

func (channel *Channel) confirmsHandler() func() error {
return func() error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need to make this an anonymous function, just have this be the signature of confirmsHandler

for confirm := range channel.pconfirms_chan {
channel.handleConfirm(confirm)
}
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a log debug line could not harm here to understand that the chan was closed, e.g. due to underlying RMQ channel closed

}
}

func (channel *Channel) handleConfirm(confirm amqp.Confirmation) {
// NOTE: current golang driver resequences confirms and sends them one-by-one
value, present := channel.confirmsMap.LoadAndDelete(confirm.DeliveryTag)

if !present {
// really die?
log.Fatalf("Received confirm for unknown send. DeliveryTag: %d", confirm.DeliveryTag)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would Warn but not Fatal here, this shouldn't be possible unless there's a bug in RabbitMQ

}

ch := value.(chan amqp.Confirmation)

ch <- confirm
}

func (channel *Channel) Close() {
// should I also stop confirmHandler here?
// mb sending kill switch to channel.pconfirms_chan?
channel.channel.Close()
}
96 changes: 66 additions & 30 deletions cmd/ingress/main.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,29 @@
/*
Copyright 2020 The Knative Authors
Copyright 2020 The Knative Authors
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove these whitespace changes


Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"sync/atomic"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
Expand All @@ -36,34 +38,44 @@ import (
const (
defaultMaxIdleConnections = 1000
defaultMaxIdleConnectionsPerHost = 1000
defaultMaxAmqpChannels = 10
)

type envConfig struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we create a separate ingress server object and use envConfig only for what it actually does? It's a bit confusing in the code and method signatures

Port int `envconfig:"PORT" default:"8080"`
BrokerURL string `envconfig:"BROKER_URL" required:"true"`
ExchangeName string `envconfig:"EXCHANGE_NAME" required:"true"`

channel *amqp.Channel
logger *zap.SugaredLogger
conn *amqp.Connection
channels []*Channel
logger *zap.SugaredLogger

rc uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These variable names could be a little more clear

cc int
}

func main() {
var env envConfig

var err error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If envConfig is separated from the server implementation, this line is not needed anymore (and addresses err shadowing in line 62)


if err := envconfig.Process("", &env); err != nil {
log.Fatal("Failed to process env var", zap.Error(err))
}

conn, err := amqp.Dial(env.BrokerURL)
env.conn, err = amqp.Dial(env.BrokerURL)

if err != nil {
log.Fatalf("failed to connect to RabbitMQ: %s", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd use the pkg logger (move its construction up so you can use it) and thus allow for structured logging (see also other places where we still use log.)

}
defer conn.Close()
defer env.conn.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd at least log the error here or use the run pattern where main only does initialization logic, e.g. env vars, and then calls a blocking run(context.Context) error on the ingress server. defer can then be handled inside run and any error easily returned to main if needed.


env.channel, err = conn.Channel()
if err != nil {
log.Fatalf("failed to open a channel: %s", err)
}
defer env.channel.Close()
// TODO: get it from annotation
env.cc = defaultMaxAmqpChannels

env.openAmqpChannels()

defer env.closeAmqpChannels()

env.logger = logging.FromContext(context.Background())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could use https://pkg.go.dev/knative.dev/pkg/signals to handle SIGTERM etc


Expand Down Expand Up @@ -114,6 +126,7 @@ func (env *envConfig) ServeHTTP(writer http.ResponseWriter, request *http.Reques
return
}

// send to RabbitMQ
statusCode, err := env.send(event)
if err != nil {
env.logger.Error("failed to send event,", err)
Expand All @@ -134,17 +147,40 @@ func (env *envConfig) send(event *cloudevents.Event) (int, error) {
for key, val := range event.Extensions() {
headers[key] = val
}
if err := env.channel.Publish(
env.ExchangeName,
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: headers,
ContentType: "application/json",
Body: bytes,
}); err != nil {

ch, err := env.sendMessage(headers, bytes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: confirmCh


if err != nil {
return http.StatusInternalServerError, fmt.Errorf("failed to publish message")
}
return http.StatusAccepted, nil

confirmation := <-ch

if confirmation.Ack {
return http.StatusAccepted, nil
} else {
return http.StatusServiceUnavailable, errors.New("message was not confirmed")
}
}

func (env *envConfig) sendMessage(headers amqp.Table, bytes []byte) (chan amqp.Confirmation, error) {
crc := atomic.AddUint64(&env.rc, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like this means the first message will bump counter to 1 and hence env.channels[0] would never be used? if so, you might change your slice indexing in line 175 accordingly


channel := env.channels[crc%uint64(env.cc)]
Comment on lines +167 to +169
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use a Go channel and have all these Channels select on it rather than trying to do this load balancing logic


return channel.publish(env.ExchangeName, headers, bytes)
}

func (env *envConfig) openAmqpChannels() {
env.channels = make([]*Channel, env.cc)

for i := 0; i < env.cc; i++ {
env.channels[i] = openChannel(env.conn)
}
}

func (env *envConfig) closeAmqpChannels() {
for i := 0; i < len(env.channels); i++ {
env.channels[i].Close()
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/kelseyhightower/envconfig v1.4.0
github.com/michaelklishin/rabbit-hole/v2 v2.11.0
github.com/pkg/errors v0.9.1
github.com/rabbitmq/amqp091-go v1.1.0
github.com/rabbitmq/amqp091-go v1.2.0
github.com/rabbitmq/cluster-operator v1.10.0
github.com/rabbitmq/messaging-topology-operator v1.2.2-0.20211112104201-5a7530880441
github.com/testcontainers/testcontainers-go v0.11.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1197,8 +1197,9 @@ github.com/prometheus/statsd_exporter v0.21.0/go.mod h1:rbT83sZq2V+p73lHhPZfMc3M
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/pseudomuto/protoc-gen-doc v1.4.1/go.mod h1:exDTOVwqpp30eV/EDPFLZy3Pwr2sn6hBC1WIYH/UbIg=
github.com/pseudomuto/protokit v0.2.0/go.mod h1:2PdH30hxVHsup8KpBTOXTBeMVhJZVio3Q8ViKSAXT0Q=
github.com/rabbitmq/amqp091-go v1.1.0 h1:qx8cGMJha71/5t31Z+LdPLdPrkj/BvD38cqC3Bi1pNI=
github.com/rabbitmq/amqp091-go v1.1.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
github.com/rabbitmq/amqp091-go v1.2.0 h1:1pHBxAsQh54R9eX/xo679fUEAfv3loMqi0pvRFOj2nk=
github.com/rabbitmq/amqp091-go v1.2.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
github.com/rabbitmq/cluster-operator v1.8.3/go.mod h1:WSi19XiNTc4rC8FEyfhg8nHKlMZ45VMpA3eAzUaWheY=
github.com/rabbitmq/cluster-operator v1.10.0 h1:n2TxmRvimouxq8JO/XKgIMPAlrokVVtNcnx9/Iz81RY=
github.com/rabbitmq/cluster-operator v1.10.0/go.mod h1:uU/gmMu6rmDc21vGRZNMjMttbCl5UQVEvDO7C6sD1k8=
Expand Down Expand Up @@ -2218,7 +2219,6 @@ knative.dev/pkg v0.0.0-20211101212339-96c0204a70dc/go.mod h1:SkfDk9bWIiNZD7XtILG
knative.dev/pkg v0.0.0-20211129195804-438776b3c87c/go.mod h1:AKPae1Cmj+k0GWXWnF2tKY7q5qPa1mTD7oCP4OeMvEM=
knative.dev/pkg v0.0.0-20211202025645-0c482f157959 h1:MLtwkPFGEM6IUGFFBWX0Nvon494culUqZJ3JLKy4dps=
knative.dev/pkg v0.0.0-20211202025645-0c482f157959/go.mod h1:AKPae1Cmj+k0GWXWnF2tKY7q5qPa1mTD7oCP4OeMvEM=
knative.dev/reconciler-test v0.0.0-20211112132636-ae9e2e21972f h1:bSdsp5av09LqSF4iXo8LCCDEENJ+JEbzzdgHBoJdf2w=
knative.dev/reconciler-test v0.0.0-20211112132636-ae9e2e21972f/go.mod h1:gTsbLk496j/M9xqMpx/liyCQ0X3bwDpRtcs2Zzws364=
knative.dev/reconciler-test v0.0.0-20211206091826-80584b570590 h1:2J8y/1mKLRUgzlqz/1MDqDwcIph6bsPx2+/nMRivCok=
knative.dev/reconciler-test v0.0.0-20211206091826-80584b570590/go.mod h1:gTsbLk496j/M9xqMpx/liyCQ0X3bwDpRtcs2Zzws364=
Expand Down
8 changes: 1 addition & 7 deletions vendor/github.com/rabbitmq/amqp091-go/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions vendor/github.com/rabbitmq/amqp091-go/allocator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions vendor/github.com/rabbitmq/amqp091-go/auth.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions vendor/github.com/rabbitmq/amqp091-go/channel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading