WIP: Add publisher confirms to ingress #543
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull-request has been approved by: ikvmw. Needs approval from an approver in each of these files.
Codecov Report
@@           Coverage Diff           @@
##             main     #543   +/-   ##
=======================================
  Coverage   75.78%   75.78%
=======================================
  Files          47       47
  Lines        2783     2783
=======================================
  Hits         2109     2109
  Misses        549      549
  Partials      125      125
=======================================

Continue to review full report at Codecov.
@ikvmw: PR needs rebase.
I think we need to step back and improve the way the amqp library handles confirms rather than working around it like this. It's pretty hard to reason about this solution with all the levels of concurrency that we're working with here.
I'm thinking that if channel.Publish returned a promise-type object, we would be able to push a lot of this complexity into the library and hide it from users.
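For illustration, a promise-style wrapper could look roughly like this. This is only a sketch: PublishResult, ConfirmChannel, and PublishWithConfirm are hypothetical names, not part of the streadway/amqp library.

package ingress

import (
	"sync"

	"github.com/streadway/amqp"
)

// PublishResult is a hypothetical one-shot promise for a broker confirm.
type PublishResult struct {
	done chan amqp.Confirmation
}

// Wait blocks until the broker acks or nacks the publish.
func (r *PublishResult) Wait() amqp.Confirmation {
	return <-r.done
}

// ConfirmChannel wraps an *amqp.Channel that has been put into confirm mode.
type ConfirmChannel struct {
	ch      *amqp.Channel
	mu      sync.Mutex
	pending map[uint64]*PublishResult // delivery tag -> promise
	nextTag uint64
}

// PublishWithConfirm publishes and returns a promise that resolves once the
// matching confirmation arrives on the channel registered via NotifyPublish.
func (c *ConfirmChannel) PublishWithConfirm(exchange, key string, msg amqp.Publishing) (*PublishResult, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if err := c.ch.Publish(exchange, key, false, false, msg); err != nil {
		return nil, err
	}
	c.nextTag++ // delivery tags start at 1 and increase per publish
	res := &PublishResult{done: make(chan amqp.Confirmation, 1)}
	c.pending[c.nextTag] = res
	return res, nil
}

// resolve is fed with every confirmation read from the NotifyPublish channel.
func (c *ConfirmChannel) resolve(confirm amqp.Confirmation) {
	c.mu.Lock()
	res, ok := c.pending[confirm.DeliveryTag]
	delete(c.pending, confirm.DeliveryTag)
	c.mu.Unlock()
	if ok {
		res.done <- confirm
	}
}

Callers would then do res, err := ch.PublishWithConfirm(...) followed by res.Wait(), and the bookkeeping of delivery tags stays inside the library.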
@@ -1,27 +1,29 @@
/*
Copyright 2020 The Knative Authors
Copyright 2020 The Knative Authors
Let's remove these whitespace changes
channels []*Channel
logger   *zap.SugaredLogger

rc uint64
These variable names could be a little clearer
crc := atomic.AddUint64(&env.rc, 1)

channel := env.channels[crc%uint64(env.cc)]
We should use a Go channel and have all these Channels select on it rather than trying to do this load balancing logic
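As a sketch of that pattern (the publishRequest type and field names are hypothetical): each AMQP channel runs a goroutine that receives from one shared Go channel, so the balancing comes from the scheduler instead of a counter.

package ingress

import "github.com/streadway/amqp"

// publishRequest is a hypothetical work item: one message plus a channel on
// which the worker reports the broker confirmation.
type publishRequest struct {
	exchange string
	headers  amqp.Table
	body     []byte
	result   chan amqp.Confirmation
}

// publishWorker runs once per AMQP channel. Every worker receives from the
// same work channel, so whichever channel is idle picks up the next message
// and no explicit round-robin counter is needed.
func publishWorker(ch *amqp.Channel, confirms <-chan amqp.Confirmation, work <-chan publishRequest) {
	for req := range work {
		if err := ch.Publish(req.exchange, "", false, false, amqp.Publishing{
			Headers: req.headers,
			Body:    req.body,
		}); err != nil {
			close(req.result) // signal failure: caller sees a closed channel
			continue
		}
		// In confirm mode, confirmations on a single channel arrive in publish
		// order, so the next confirmation belongs to the publish above.
		req.result <- <-confirms
	}
}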
}

func (channel *Channel) confirmsHandler() func() error {
	return func() error {
I don't think you need to make this an anonymous function, just have this be the signature of confirmsHandler
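i.e. roughly (a sketch reusing the Channel fields and handleConfirm from this PR):

// confirmsHandler as a plain method instead of a closure-returning one.
func (channel *Channel) confirmsHandler() error {
	for confirm := range channel.pconfirms_chan {
		channel.handleConfirm(confirm)
	}
	return nil
}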
if !present {
	// really die?
	log.Fatalf("Received confirm for unknown send. DeliveryTag: %d", confirm.DeliveryTag)
I would Warn but not Fatal here; this shouldn't be possible unless there's a bug in RabbitMQ.
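For example, assuming the handler can reach the sugared logger (illustrative only):

if !present {
	// A confirm for an unknown delivery tag shouldn't happen unless the broker
	// misbehaves; log it loudly but keep the ingress running.
	env.logger.Warnf("Received confirm for unknown send. DeliveryTag: %d", confirm.DeliveryTag)
}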
Overall I found this change hard to review for correctness due to the concurrency and non-transactional locking at play. I also don't see how this code is more performant than the sync.Pool take/return Channel approach proposed by @bmoss.
Can you please explain why this complexity is justified? Perhaps I'm not understanding the optimizations at play.
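For reference, the take/return alternative would look roughly like this. It is a sketch under the assumption that a channel is used by one publisher at a time; the pool type and helper names are hypothetical.

package ingress

import (
	"sync"

	"github.com/streadway/amqp"
)

// channelPool hands out *amqp.Channel values one publisher at a time: a
// goroutine takes a channel, publishes, waits for its confirm, and returns it.
type channelPool struct {
	pool sync.Pool
}

func newChannelPool(conn *amqp.Connection) *channelPool {
	return &channelPool{pool: sync.Pool{
		New: func() interface{} {
			ch, err := conn.Channel()
			if err != nil {
				return nil
			}
			// Put the channel into confirm mode so publishes are acked.
			if err := ch.Confirm(false); err != nil {
				return nil
			}
			return ch
		},
	}}
}

func (p *channelPool) take() *amqp.Channel {
	ch, _ := p.pool.Get().(*amqp.Channel)
	return ch // may be nil if channel creation failed
}

func (p *channelPool) put(ch *amqp.Channel) {
	p.pool.Put(ch)
}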
@@ -36,34 +38,44 @@ import (
const (
	defaultMaxIdleConnections        = 1000
	defaultMaxIdleConnectionsPerHost = 1000
	defaultMaxAmqpChannels           = 10
)

type envConfig struct {
nit: can we create a separate ingress server object and use envConfig only for what it actually does? It's a bit confusing in the code and method signatures.
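Something along these lines; the struct split is illustrative rather than what the PR currently defines:

// envConfig carries only values read from the environment.
type envConfig struct {
	BrokerURL string `envconfig:"BROKER_URL" required:"true"`
	// ... other env-derived settings only ...
}

// ingress owns the runtime state and gets envConfig passed in once.
type ingress struct {
	conn     *amqp.Connection
	channels []*Channel
	logger   *zap.SugaredLogger
}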
}

func main() {
	var env envConfig

	var err error
If envConfig is separated from the server implementation, this line is not needed anymore (and it addresses the err shadowing in line 62).
if err != nil {
	log.Fatalf("failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
defer env.conn.Close()
I'd at least log the error here, or use the run pattern where main only does initialization logic (e.g. env vars) and then calls a blocking run(context.Context) error on the ingress server. defer can then be handled inside run and any error easily returned to main if needed.
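A minimal sketch of that run pattern, trimmed to the RabbitMQ connection (the real ingress would also open its channels and start the HTTP listener inside run):

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/kelseyhightower/envconfig"
	"github.com/streadway/amqp"
)

type envConfig struct {
	BrokerURL string `envconfig:"BROKER_URL" required:"true"`
}

func main() {
	var env envConfig
	if err := envconfig.Process("", &env); err != nil {
		log.Fatalf("failed to process env vars: %s", err)
	}
	// main only does setup; run owns the resources and their defers.
	if err := run(context.Background(), env); err != nil {
		log.Fatal(err)
	}
}

func run(ctx context.Context, env envConfig) error {
	conn, err := amqp.Dial(env.BrokerURL)
	if err != nil {
		return fmt.Errorf("failed to connect to RabbitMQ: %w", err)
	}
	defer conn.Close()

	// ... open channels, start the HTTP listener, etc. ...

	<-ctx.Done()
	return nil
}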
env.openAmqpChannels()

defer env.closeAmqpChannels()

env.logger = logging.FromContext(context.Background())
we could use https://pkg.go.dev/knative.dev/pkg/signals to handle SIGTERM etc.
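e.g., continuing the run sketch above (signals.NewContext returns a context that is cancelled on SIGINT/SIGTERM):

// in main(), instead of context.Background():
ctx := signals.NewContext() // from knative.dev/pkg/signals
if err := run(ctx, env); err != nil {
	log.Fatal(err)
}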
if err := envconfig.Process("", &env); err != nil {
	log.Fatal("Failed to process env var", zap.Error(err))
}

conn, err := amqp.Dial(env.BrokerURL)
env.conn, err = amqp.Dial(env.BrokerURL)

if err != nil {
	log.Fatalf("failed to connect to RabbitMQ: %s", err)
I'd use the pkg logger (move its construction up so you can use it here) and thus allow for structured logging (see also the other places where we still use the standard log package).
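For example, if the logger is constructed before dialing (a sketch, using the same knative.dev/pkg/logging helper this file already uses further down):

logger := logging.FromContext(ctx)
env.conn, err = amqp.Dial(env.BrokerURL)
if err != nil {
	logger.Fatalw("failed to connect to RabbitMQ", "error", err)
}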
log.Fatalf("failed to open a channel: %s", err) | ||
} | ||
|
||
channel := &Channel{channel: a_channel, counter: 1, pconfirms_chan: make(chan amqp.Confirmation)} |
We use an unbuffered channel for the confirmations here. The docs for the confirmation channel in NotifyPublish state that it should be buffered to accommodate multiple in-flight publishes, otherwise risking deadlocks. Could we run into that scenario with an unbuffered channel?
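i.e. something like this (the buffer size here is an arbitrary example, not a value from the amqp docs):

// Buffer the confirmation channel so broker confirms are never blocked behind
// a slow reader while publishes are in flight on this channel.
confirms := a_channel.NotifyPublish(make(chan amqp.Confirmation, 100))
channel := &Channel{channel: a_channel, counter: 1, pconfirms_chan: confirms}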
for confirm := range channel.pconfirms_chan {
	channel.handleConfirm(confirm)
}
return nil
A debug log line couldn't hurt here, to make it clear that the channel was closed, e.g. because the underlying RMQ channel was closed.
}

func (env *envConfig) sendMessage(headers amqp.Table, bytes []byte) (chan amqp.Confirmation, error) {
	crc := atomic.AddUint64(&env.rc, 1)
Looks like this means the first message will bump the counter to 1, so env.channels[0] is only hit when the counter reaches a multiple of cc rather than on the first message? If so, you might change your slice indexing in line 175 accordingly.
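e.g. a sketch of the off-by-one adjustment, using the PR's own fields:

crc := atomic.AddUint64(&env.rc, 1)
// Subtract one so the first message lands on channels[0] and the rotation
// starts at the beginning of the slice.
channel := env.channels[(crc-1)%uint64(env.cc)]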
	Body: bytes,
}); err != nil {

ch, err := env.sendMessage(headers, bytes)
nit: confirmCh
}

func (channel *Channel) publish(exchange string, headers amqp.Table, bytes []byte) (chan amqp.Confirmation, error) {
	channel.mutex.Lock()
This basically means that every message published will lock the particular channel, i.e. serialize the logic and constrain parallelism to the number of defined channels, correct?
Changes
/kind
Fixes #334
Release Note
Docs