-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix concurrent conflicts in elasticsearch #53
Conversation
This patch makes use of concurrent package to fix conflicting writes/reads to/from ES buffer when bulk index is configured
I wrote a similar patch this morning. https://github.com/infrawatch/sg-core/compare/csibbitt-205-buffer-concurrency-fix and was in the process of testing it when I saw this. Can you explain why you chose concurrent.Map instead of a Mutex? Should I benchmark both? |
Hey, yes sure. Concurent package is using RWMutex too. It is just a piece of code we have in sg-core already [1], so I just used what we have. [1] https://github.com/infrawatch/sg-core/blob/master/pkg/concurrent/concurrent.go#L7 |
I'm pretty fine with your path though. |
The other interesting one to try would be sync.Map. However, it is only more performant in very specific circumstances (few cache misses, many CPUs) and I don't believe this situation is one of them |
Honestly it wasn't until I read this that I realized this wasn't using sync.Map. I saw it quickly earlier and assumed that's what was going on (I agree that our use-case doesn't fit, I specifically opted not to use it). On re-read I see we've already solved this in a uniform way and I'd rather re-use that code if there is no significant performance penalty. So basically, I'm preferring this patch over #54 if possible. Thanks @paramite |
I tested this and it behaves the same as my patch. The performance differences should be minimal compared to other improvements we are contemplating, so I'll merge this version. |
This patch makes use of concurrent package to fix conflicting writes/reads to/from ES buffer when bulk index is configured Cherry picked from commit e61141a
Unfortunately concurrent.Map is not ready for concurrency, so we need to avoid using it. This reverts commit e61141a.
Unfortunately concurrent.Map is not ready for concurrency, so we need to avoid using it. This reverts commit e61141a.
* Fix Loki readiness check * Allow multiple API calls to ES - This patch adds possibility to submit multiple requests to ES in paralel and hence improving performance. * Revert "Fix concurrent conflicts in elasticsearch (#53)" - Unfortunately concurrent.Map is not ready for concurrency, so we need to avoid using it. - This reverts commit e61141a. * Fix concurrency in ES client message buffer - "fatal error: concurrent map writes" /go/src/github.com/infrawatch/sg-core/plugins/application/elasticsearch/main.go:95 /go/src/github.com/infrawatch/sg-core/pkg/bus.go:65 calls Application callbacks as a goroutine which results in concurrency in ReceiveEvent(). I've added a mutex to protect access. - Co-authored-by: Chris Sibbitt <csibbitt@redhat.com> * Remove unused package - concurrent.Map is unused throughout the project and it did not work as expected, because values could be overwritten when unlocked after returning from method. * Add amqp1 transport * Don't transfer whole message through channels - This patch makes channel synchronization use pointers instead of whole messages, which improves performance of the elasticsearch app plugin. It also adds mechanism to flush index buffer in case of remnants of some batch messages are kept for longer time. * Integration CI fixes - simplify rsyslog configuration to work with newer rsyslog - stop using ELN repo because of rsyslog-omamqp1 (plugin is available in appstream) - add proper wait time between steps instead of simple sleep * Update .github/workflows/tests.yml - Co-authored-by: Leif Madsen <lmadsen@redhat.com> * Move golint exclude-rules to proper place
* Fix Loki readiness check * Allow multiple API calls to ES - This patch adds possibility to submit multiple requests to ES in paralel and hence improving performance. * Revert "Fix concurrent conflicts in elasticsearch (#53)" - Unfortunately concurrent.Map is not ready for concurrency, so we need to avoid using it. - This reverts commit e61141a. * Fix concurrency in ES client message buffer - "fatal error: concurrent map writes" /go/src/github.com/infrawatch/sg-core/plugins/application/elasticsearch/main.go:95 /go/src/github.com/infrawatch/sg-core/pkg/bus.go:65 calls Application callbacks as a goroutine which results in concurrency in ReceiveEvent(). I've added a mutex to protect access. - Co-authored-by: Chris Sibbitt <csibbitt@redhat.com> * Remove unused package - concurrent.Map is unused throughout the project and it did not work as expected, because values could be overwritten when unlocked after returning from method. * Add amqp1 transport * Don't transfer whole message through channels - This patch makes channel synchronization use pointers instead of whole messages, which improves performance of the elasticsearch app plugin. It also adds mechanism to flush index buffer in case of remnants of some batch messages are kept for longer time. * Integration CI fixes - simplify rsyslog configuration to work with newer rsyslog - stop using ELN repo because of rsyslog-omamqp1 (plugin is available in appstream) - add proper wait time between steps instead of simple sleep * Update .github/workflows/tests.yml - Co-authored-by: Leif Madsen <lmadsen@redhat.com> * Move golint exclude-rules to proper place (cherry picked from commit 7d46ba5)
* Fix Loki readiness check * Allow multiple API calls to ES - This patch adds possibility to submit multiple requests to ES in paralel and hence improving performance. * Revert "Fix concurrent conflicts in elasticsearch (#53)" - Unfortunately concurrent.Map is not ready for concurrency, so we need to avoid using it. - This reverts commit e61141a. * Fix concurrency in ES client message buffer - "fatal error: concurrent map writes" /go/src/github.com/infrawatch/sg-core/plugins/application/elasticsearch/main.go:95 /go/src/github.com/infrawatch/sg-core/pkg/bus.go:65 calls Application callbacks as a goroutine which results in concurrency in ReceiveEvent(). I've added a mutex to protect access. - Co-authored-by: Chris Sibbitt <csibbitt@redhat.com> * Remove unused package - concurrent.Map is unused throughout the project and it did not work as expected, because values could be overwritten when unlocked after returning from method. * Add amqp1 transport * Don't transfer whole message through channels - This patch makes channel synchronization use pointers instead of whole messages, which improves performance of the elasticsearch app plugin. It also adds mechanism to flush index buffer in case of remnants of some batch messages are kept for longer time. * Integration CI fixes - simplify rsyslog configuration to work with newer rsyslog - stop using ELN repo because of rsyslog-omamqp1 (plugin is available in appstream) - add proper wait time between steps instead of simple sleep * Update .github/workflows/tests.yml - Co-authored-by: Leif Madsen <lmadsen@redhat.com> * Move golint exclude-rules to proper place (cherry picked from commit 7d46ba5)
* Fix Loki readiness check * Allow multiple API calls to ES - This patch adds possibility to submit multiple requests to ES in paralel and hence improving performance. * Revert "Fix concurrent conflicts in elasticsearch (#53)" - Unfortunately concurrent.Map is not ready for concurrency, so we need to avoid using it. - This reverts commit e61141a. * Fix concurrency in ES client message buffer - "fatal error: concurrent map writes" /go/src/github.com/infrawatch/sg-core/plugins/application/elasticsearch/main.go:95 /go/src/github.com/infrawatch/sg-core/pkg/bus.go:65 calls Application callbacks as a goroutine which results in concurrency in ReceiveEvent(). I've added a mutex to protect access. - Co-authored-by: Chris Sibbitt <csibbitt@redhat.com> * Remove unused package - concurrent.Map is unused throughout the project and it did not work as expected, because values could be overwritten when unlocked after returning from method. * Add amqp1 transport * Don't transfer whole message through channels - This patch makes channel synchronization use pointers instead of whole messages, which improves performance of the elasticsearch app plugin. It also adds mechanism to flush index buffer in case of remnants of some batch messages are kept for longer time. * Integration CI fixes - simplify rsyslog configuration to work with newer rsyslog - stop using ELN repo because of rsyslog-omamqp1 (plugin is available in appstream) - add proper wait time between steps instead of simple sleep * Update .github/workflows/tests.yml - Co-authored-by: Leif Madsen <lmadsen@redhat.com> * Move golint exclude-rules to proper place (cherry picked from commit 7d46ba5)
This patch makes use of concurrent package to fix conflicting writes/reads
to/from ES buffer when bulk index is configured