In-order containerd events #1677
Conversation
Force-pushed from ec3df39 to 0faad3f
@@ -66,7 +99,7 @@ func EnableEventListener() error {
		return err
	}

-	go listenForDockerEvents(r)
+	go listenForDockerEvents(&eventQueue, r)
Given that we only enqueue now, we want to run this in sync; otherwise the enqueue itself could get out of order.
I'm not sure I follow you. We still have a race between syncWithRuntime and listenForDockerEvents. listenForDockerEvents is annoying to call from syncWithRuntime because it spends its time reading a Reader. As you suggested elsewhere, though, we can pass a "since" option to the queries for both events and containerList. I haven't touched that here, but it's the next thing I was going to do.
I'm happy to have it all in one PR.
Forget what I said, I misread the code. I thought that was invoked per event. This is fine.
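To make the ordering argument concrete, here is a minimal sketch of what an enqueue-only reader could look like. Only listenForDockerEvents, eventQueue, eventQueueBufferSize, processContainerEvents, and dTypesEvents.Message appear in the diffs of this PR; the eventQueues type, its fields, and the logging below are illustrative assumptions, not the actual watcher.go code.

```go
package containerd // assumed from the pkg/workloads/containerd path

import (
	"encoding/json"
	"io"
	"log"
	"sync"

	dTypesEvents "github.com/docker/docker/api/types/events"
)

const eventQueueBufferSize = 10 // per-container channel buffer, as in the hunk below

// eventQueues is an illustrative stand-in for the eventQueue in the diff:
// one buffered channel per container, each drained by its own goroutine.
type eventQueues struct {
	mu           sync.Mutex
	perContainer map[string]chan dTypesEvents.Message
}

// enqueue routes an event to its container's channel, creating the channel
// and its worker goroutine on first use. Sending while holding the lock keeps
// enqueue mutually exclusive with any reaping that closes channels.
func (q *eventQueues) enqueue(containerID string, m dTypesEvents.Message) {
	q.mu.Lock()
	defer q.mu.Unlock()
	ch, ok := q.perContainer[containerID]
	if !ok {
		ch = make(chan dTypesEvents.Message, eventQueueBufferSize)
		q.perContainer[containerID] = ch
		go processContainerEvents(containerID, ch)
	}
	ch <- m
}

// listenForDockerEvents only decodes and enqueues. Because a single goroutine
// performs every enqueue, events for a given container enter its channel in
// the order they were read from the stream.
func listenForDockerEvents(q *eventQueues, reader io.ReadCloser) {
	defer reader.Close()
	dec := json.NewDecoder(reader)
	for {
		var e dTypesEvents.Message
		if err := dec.Decode(&e); err != nil {
			if err == io.EOF {
				return
			}
			log.Printf("error decoding docker event: %v", err)
			continue
		}
		q.enqueue(e.Actor.ID, e)
	}
}

// processContainerEvents drains one container's channel; stubbed here so the
// sketch compiles on its own.
func processContainerEvents(containerID string, events chan dTypesEvents.Message) {
	for m := range events {
		_ = m // handle start/die for this container, in order
	}
}
```

Keeping all enqueues on the single reader goroutine is what preserves per-container ordering; the per-container workers can then run concurrently with each other.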
pkg/workloads/containerd/watcher.go (Outdated)
@@ -139,6 +173,24 @@ func listenForDockerEvents(reader io.ReadCloser) {
	}
}

+func processContainerEvents(containerID string, events chan dTypesEvents.Message) {
+	for {
How do we stop these goroutines?
They exit on channel close, which is done by the reapEmpty call. I was trying to avoid more coordination with them, but we can have the termination go the other way.
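A rough sketch of that reaping, continuing the illustrative eventQueues type from the sketch above; only the name reapEmpty comes from the comment, and how often it runs and what counts as "empty" are assumptions.

```go
// reapEmpty is assumed to run periodically (for example from the sync loop).
// It closes the channels of containers with no pending events, which makes
// their worker goroutines fall out of their range loops and exit.
func (q *eventQueues) reapEmpty() {
	q.mu.Lock()
	defer q.mu.Unlock()
	for id, ch := range q.perContainer {
		if len(ch) == 0 { // nothing queued for this container right now
			close(ch)
			delete(q.perContainer, id)
		}
	}
}
```

Because enqueue sends while holding the same lock, reaping never races with a send on an already-closed channel; a later event for a reaped container simply creates a fresh channel and worker.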
pkg/workloads/containerd/watcher.go (Outdated)
@@ -48,15 +48,48 @@ const (
	syncRateDocker = 30 * time.Second

	maxRetries = 3

+	eventQueueBufferSize = 10
If we enqueue in sync, we want a bigger buffer size.
Besides @tgraf's comments and a small nit, overall this looks awesome!
pkg/workloads/containerd/watcher.go (Outdated)
func processContainerEvents(containerID string, events chan dTypesEvents.Message) {
	for {
		select {
		case m, ok := <-events:
This for { select { case ... } } can be made into a single for m := range events { loop.
Excellent point. I originally had more things to handle in there. This will be much cleaner.
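For illustration, the two forms look like this (continuing the assumptions of the earlier sketch; the functions are renamed here so both variants can sit side by side, and handleEvent is a placeholder for the real start/die handling):

```go
// selectVariant watches for channel close explicitly.
func selectVariant(containerID string, events chan dTypesEvents.Message) {
	for {
		select {
		case m, ok := <-events:
			if !ok {
				return // channel closed by the reaper; worker exits
			}
			handleEvent(containerID, m)
		}
	}
}

// rangeVariant does the same; the loop ends implicitly when the channel is closed.
func rangeVariant(containerID string, events chan dTypesEvents.Message) {
	for m := range events {
		handleEvent(containerID, m)
	}
}

// handleEvent stands in for the start/die handling in the PR.
func handleEvent(containerID string, m dTypesEvents.Message) {}
```

Both exit when the reaper closes the channel; the range form just makes that implicit.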
This creates a worker per container. It reaps empty queues periodically. This does not address the race with syncWithRuntime.

Fixes GH-1594

Signed-off-by: Ray Bejjani <ray@covalent.io>
Force-pushed from 0faad3f to 5ad5988
Creates a goroutine per container to handle events, reaping empty ones periodically. This should mean fewer goroutines overall, but they live longer. I'm not sure how many events we expect to see, since we seem to only handle start/die. If it's low enough that a single goroutine could do it, I'd structure this differently, but I stuck with the model we have now. I chatted with @tgraf about how this relates to #1662 and we decided to keep this PR restricted to the in-order part.
Since this is my first PR, I probably got style things wrong too, so let me know!
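As a side note on the deferred race: the "since" idea mentioned in the review, querying events from a recorded boundary so the container listing and the event stream agree on a starting point, might look roughly like the following with the Docker Go client. This is not part of this PR, and the shape of the program is only a sketch of the concept; the real watcher reads a raw io.ReadCloser instead of using the client's Events channel.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
	"time"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/client"
)

func main() {
	cli, err := client.NewEnvClient()
	if err != nil {
		panic(err)
	}

	// Record a boundary, list containers at that boundary, then ask only for
	// events after it, so the sync and the event stream cannot silently miss
	// each other.
	boundary := time.Now()
	containers, err := cli.ContainerList(context.Background(), types.ContainerListOptions{All: true})
	if err != nil {
		panic(err)
	}
	fmt.Println("containers at boundary:", len(containers))

	msgs, errs := cli.Events(context.Background(), types.EventsOptions{
		Since: strconv.FormatInt(boundary.Unix(), 10),
	})
	for {
		select {
		case m := <-msgs:
			fmt.Println("event:", m.Action, m.Actor.ID)
		case err := <-errs:
			fmt.Println("event stream error:", err)
			return
		}
	}
}
```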