Skip to content

Commit

Permalink
Revert "Fix concurrent conflicts in elasticsearch (#53)"
Browse files Browse the repository at this point in the history
Unfortunately concurrent.Map is not ready for concurrency,
so we need to avoid using it.

This reverts commit e61141a.
  • Loading branch information
paramite committed Jul 12, 2021
1 parent b44402a commit 6d5319d
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 13 deletions.
18 changes: 8 additions & 10 deletions plugins/application/elasticsearch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"

"github.com/infrawatch/sg-core/pkg/concurrent"
"github.com/infrawatch/sg-core/plugins/application/elasticsearch/pkg/lib"
)

Expand Down Expand Up @@ -52,15 +51,15 @@ type Elasticsearch struct {
configuration *lib.AppConfig
logger *logging.Logger
client *lib.Client
buffer *concurrent.Map
buffer map[string][]string
dump chan esIndex
}

// New constructor
func New(logger *logging.Logger) application.Application {
return &Elasticsearch{
logger: logger,
buffer: concurrent.NewMap(),
buffer: make(map[string][]string),
dump: make(chan esIndex, 100),
}
}
Expand Down Expand Up @@ -89,20 +88,19 @@ func (es *Elasticsearch) ReceiveEvent(event data.Event) {
// buffer or index record
var recordList []string
if es.configuration.BufferSize > 1 {
if !es.buffer.Contains(event.Index) {
es.buffer.Set(event.Index, make([]string, 0, es.configuration.BufferSize))
if _, ok := es.buffer[event.Index]; !ok {
es.buffer[event.Index] = make([]string, 0, es.configuration.BufferSize)
}

recordList = (es.buffer.Get(event.Index)).([]string)
recordList = append(recordList, record)
if len(recordList) < es.configuration.BufferSize {
es.buffer[event.Index] = append(es.buffer[event.Index], record)
if len(es.buffer[event.Index]) < es.configuration.BufferSize {
// buffer is not full, don't send
es.logger.Metadata(logging.Metadata{"plugin": appname, "record": record})
es.logger.Debug("buffering record")
es.buffer.Set(event.Index, recordList)
return
}
es.buffer.Delete(event.Index)
recordList = es.buffer[event.Index]
delete(es.buffer, event.Index)
} else {
recordList = []string{record}
}
Expand Down
5 changes: 2 additions & 3 deletions plugins/application/elasticsearch/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"testing"

"github.com/infrawatch/apputils/logging"
"github.com/infrawatch/sg-core/pkg/concurrent"
"github.com/infrawatch/sg-core/pkg/data"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -249,7 +248,7 @@ func TestElasticsearchApp(t *testing.T) {
results := make(chan esIndex, len(eventCases))
app := &Elasticsearch{
logger: logger,
buffer: concurrent.NewMap(),
buffer: make(map[string][]string),
dump: results,
}
err := app.Config([]byte(testConf))
Expand All @@ -271,7 +270,7 @@ func TestElasticsearchApp(t *testing.T) {
results := make(chan esIndex, len(logCases))
app := &Elasticsearch{
logger: logger,
buffer: concurrent.NewMap(),
buffer: make(map[string][]string),
dump: results,
}
err := app.Config([]byte(testConf))
Expand Down

0 comments on commit 6d5319d

Please sign in to comment.