Skip to content

Commit

Permalink
ES memory write race fix and performance improvement (#67)
Browse files Browse the repository at this point in the history
* Fix Loki readiness check
* Allow multiple API calls to ES
  - This patch adds possibility to submit multiple requests to ES in paralel
     and hence improving performance.
* Revert "Fix concurrent conflicts in elasticsearch (#53)"
  - Unfortunately concurrent.Map is not ready for concurrency,
     so we need to avoid using it.
  - This reverts commit e61141a.
* Fix concurrency in ES client message buffer
  -  "fatal error: concurrent map writes"
      /go/src/github.com/infrawatch/sg-core/plugins/application/elasticsearch/main.go:95
     /go/src/github.com/infrawatch/sg-core/pkg/bus.go:65 calls Application callbacks as a goroutine
     which results in concurrency in ReceiveEvent(). I've added a mutex to protect access.
  - Co-authored-by: Chris Sibbitt <csibbitt@redhat.com>
* Remove unused package
  - concurrent.Map is unused throughout the project and it did not work as expected,
     because values could be overwritten when unlocked after returning from method.
* Add amqp1 transport
* Don't transfer whole message through channels
   - This patch makes channel synchronization use pointers instead of whole messages,
     which improves performance of the elasticsearch app plugin. It also adds mechanism
     to flush index buffer in case of remnants of some batch messages are kept
      for longer time.
* Integration CI fixes
  - simplify rsyslog configuration to work with newer rsyslog
  - stop using ELN repo because of rsyslog-omamqp1
     (plugin is available in appstream)
  - add proper wait time between steps instead of simple sleep
* Update .github/workflows/tests.yml
  - Co-authored-by: Leif Madsen <lmadsen@redhat.com>
* Move golint exclude-rules to proper place

(cherry picked from commit 7d46ba5)
  • Loading branch information
paramite committed Feb 9, 2022
1 parent a2a2ee9 commit 44cb460
Show file tree
Hide file tree
Showing 16 changed files with 384 additions and 164 deletions.
73 changes: 55 additions & 18 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ env:
BRIDGE_SOCKET: /tmp/sg-bridge/test-socket

TEST_IMAGE: registry.access.redhat.com/ubi8
TEST_PORT: "-p 3000:3000"

on: push

Expand Down Expand Up @@ -60,51 +61,87 @@ jobs:
docker run --name=sgbridge $BRIDGE_VOLUME -d -uroot --network host -e OPSTOOLS_REPO \
-e GITHUB_REF -e BRIDGE_SOCKET --workdir=$(dirname $BRIDGE_SOCKET) \
$TEST_IMAGE bash $PROJECT_ROOT/ci/integration/logging/run_bridge.sh
- name: Wait for services to start successfuly and print logs
- name: Run rsyslog to produce log messages
run: |
docker run --name=rsyslog -d -uroot --network host $RSYSLOG_VOLUME \
--volume ${{ github.workspace }}:$PROJECT_ROOT:z --workdir $PROJECT_ROOT \
$RSYSLOG_IMAGE bash $PROJECT_ROOT/ci/integration/logging/run_rsyslog.sh
- name: Wait for services to start successfuly
run: |
timeout=180
echo "======================= rsyslog ======================="
rsyslog_wait=0
while [[ $(docker exec qdr qdstat -b 127.0.0.1:5666 -a | grep rsyslog/logs | awk '{print $8}') -le 0 ]]
do
sleep 1
rsyslog_wait=$(($rsyslog_wait+1))
if [[ $rsyslog_wait -gt $timeout ]]; then
echo "ERROR: timeout for rsyslog startup"
break
fi
done
echo "INFO: rsyslog startup took ${rsyslog_wait}s"
echo "===================== elasticsearch ====================="
elastic_wait=0
while ! curl -sX GET "http://127.0.0.1:9200/_cluster/health"
do
sleep 1
elastic_wait=$(($elastic_wait+1))
if [[ $elastic_wait -gt $timeout ]]; then
echo "\nERROR: timeout for elasticsearch startup"
break
fi
done
docker logs elastic
echo "INFO: elasticsearch startup took ${elastic_wait}s"
echo "========================== loki ========================="
while ! curl -sX GET "http://127.0.0.1:3100/loki/api/v1/ready"
loki_wait=0
while ! curl -sX GET "http://127.0.0.1:3100/ready" | grep -q "^ready$"
do
sleep 1
loki_wait=$(($loki_wait+1))
if [[ $loki_wait -gt $timeout ]]; then
echo "ERROR: timeout for loki startup"
break
fi
done
echo "INFO: loki startup took ${loki_wait}s"
- name: Print container logs
run: |
echo "======================= rsyslog ======================="
docker logs rsyslog
echo "===================== elasticsearch ====================="
docker logs elastic
echo "========================== loki ========================="
docker logs loki
echo "========================== qdr =========================="
docker logs qdr
echo "======================= sg-bridge ======================="
docker logs sgbridge
# produce logs
- name: Run rsyslog to produce log messages
run: |
docker run --name=rsyslog -d -uroot --network host $RSYSLOG_VOLUME \
--volume ${{ github.workspace }}:$PROJECT_ROOT:z --workdir $PROJECT_ROOT \
$RSYSLOG_IMAGE bash $PROJECT_ROOT/ci/integration/logging/run_rsyslog.sh
- name: debug
run: |
sleep 20
docker logs rsyslog
docker exec qdr qdstat -b 127.0.0.1:5666 -c
docker exec qdr qdstat -b 127.0.0.1:5666 -a
# run integration tests
- name: Run sg-core to process log messages
run: |
docker run --name=sgcore -d -uroot --network host $BRIDGE_VOLUME -e OPSTOOLS_REPO \
--volume ${{ github.workspace }}:$PROJECT_ROOT:z --workdir $PROJECT_ROOT \
$TEST_IMAGE bash $PROJECT_ROOT/ci/integration/logging/run_sg.sh
- name: debug
- name: sg-core debug output
run: |
sleep 60
timeout=360
sg_wait=0
while [[ $(curl -sX GET "http://127.0.0.1:3000/metrics" | grep 'sg_total_logs_received{source="SG"}' | awk '{print $2}') -le 0 ]]
do
sleep 1
sg_wait=$(($sg_wait+1))
if [[ $sg_wait -gt $timeout ]]; then
echo "ERROR: timeout for sg-core startup"
break
fi
done
echo "INFO: sg-core startup took ${sg_wait}s"
docker logs sgcore
docker exec qdr qdstat -b 127.0.0.1:5666 -c
docker exec qdr qdstat -b 127.0.0.1:5666 -a
- name: Validate log message processing
run: |
sleep 60
docker run --name=validate -uroot --network host \
--volume ${{ github.workspace }}:$PROJECT_ROOT:z --workdir $PROJECT_ROOT \
$TEST_IMAGE bash $PROJECT_ROOT/ci/integration/logging/run_validation.sh
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ jobs:
done
docker logs elastic
echo "========================== loki ========================="
while ! curl -sX GET "http://127.0.0.1:3100/loki/api/v1/ready"
while ! curl -sX GET "http://127.0.0.1:3100/ready" | grep -q "^ready$"
do
sleep 1
done
Expand Down
5 changes: 4 additions & 1 deletion .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ issues:
- text: New
linters:
- deadcode
- linters:
- staticcheck
# https://staticcheck.io/docs/checks#SA4008 (The variable in the loop condition never changes, are you incrementing the wrong variable?)
text: "SA4008:"

linters:
disable-all: true
Expand Down Expand Up @@ -55,4 +59,3 @@ linters:
- unparam
- varcheck
# - whitespace

15 changes: 7 additions & 8 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
# CONTAINER_BUILD=true ./build.sh
#
# Production build (omits test plugin binaries to minimize image size and builds for container)
# PRODUCTION_BUILD=true ./build.sh
# PRODUCTION_BUILD=true ./build.sh

base=$(pwd)

Expand All @@ -24,17 +24,16 @@ fi
# to keep the image size as small as possible
if $PRODUCTION_BUILD; then
OMIT_TRANSPORTS=(
"amqp1"
"dummy-alertmanager"
"dummy-events"
"dummy-metrics"
"dummy-logs"
)

OMIT_HANDLERS=(

)

OMIT_APPLICATIONS=(
)
fi
Expand All @@ -56,26 +55,26 @@ search_list() {
build_plugins() {
# build transports
cd "$base"
for i in plugins/transport/*; do
for i in plugins/transport/*; do
cd "$base/$i"
search_list "$(basename $i)" OMIT_TRANSPORTS
if [ $? -ne 1 ]; then
echo "building $(basename $i).so"
$GOCMD build -o "$PLUGIN_DIR$(basename $i).so" -buildmode=plugin
fi
done

# build handlers
cd "$base"
for i in plugins/handler/*; do
for i in plugins/handler/*; do
cd "$base/$i"
search_list "$(basename $i)" OMIT_HANDLERS
if [ $? -ne 1 ]; then
echo "building $(basename $i).so"
$GOCMD build -o "$PLUGIN_DIR$(basename $i).so" -buildmode=plugin
fi
done

# build applications
cd "$base"
for i in plugins/application/*; do
Expand Down
17 changes: 3 additions & 14 deletions ci/integration/logging/run_rsyslog.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,8 @@ set -ex

EXIT_CODE=0

# enable ELN repo
cat > /etc/yum.repos.d/fedora-eln.repo <<EOF
[eln-baseos]
name=Fedora - ELN BaseOS - Developmental packages for the next Enterprise Linux release
baseurl=https://odcs.fedoraproject.org/composes/production/latest-Fedora-ELN/compose/BaseOS/\$basearch/os/
#metalink=https://mirrors.fedoraproject.org/metalink?repo=eln&arch=\$basearch
enabled=1
gpgcheck=0
skip_if_unavailable=False
EOF

# Locale setting in CentOS8 is broken
dnf install -y glibc-langpack-en rsyslog-omamqp1
dnf install -q -y glibc-langpack-en rsyslog-omamqp1

# Generate log records for verification
touch /tmp/test.log
Expand All @@ -27,9 +16,9 @@ do
echo "[$(date +'%Y-%m-%d %H:%M')] WARNING Something bad might happen" >> /tmp/test.log
echo "[$(date +'%Y-%m-%d %H:%M')] :ERROR: Something bad happened" >> /tmp/test.log
echo "[$(date +'%Y-%m-%d %H:%M')] [DEBUG] Wubba lubba dub dub" >> /tmp/test.log
sleep 10
sleep 10
done &

# launch rsyslog
cat /etc/rsyslog.d/*
echo "Launching rsyslog"
rsyslogd -n
2 changes: 1 addition & 1 deletion ci/integration/logging/run_validation.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ LOKI_URL=http://127.0.0.1:3100
# debug output of cluster status
curl -sX GET "$ELASTIC_URL/_cluster/health?pretty"
# verify expected index
#TODO(mmagr): adapt elasticseatch plugin to create index templates avoinding unnecessary prefix and suffix
curl -sX GET "$ELASTIC_URL/_cat/indices"
expected_index="sglogs-$(echo $HOST | tr - _).$TS"
curl -sX GET "$ELASTIC_URL/_cat/indices/sglogs-*?h=index"
found_index=$(curl -sX GET "$ELASTIC_URL/_cat/indices/sglogs-*?h=index")
Expand Down
4 changes: 4 additions & 0 deletions ci/integration/logging/sg_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ applications:
connection: http://127.0.0.1:3100
batchsize: 3
maxwaittime: 1
- name: prometheus
config:
host: 0.0.0.0
port: 3000
23 changes: 9 additions & 14 deletions ci/service_configs/rsyslog/rsyslog_config.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@

module(load="imfile" PollingInterval="5")
module(load="omamqp1")
module(load="imfile")
input(type="imfile"
File="/tmp/test.log"
Tag="ci.integration.test"
reopenOnTruncate="on")

template (name="rsyslog-record" type="list" option.jsonf="on")
{
Expand All @@ -16,15 +18,8 @@ template (name="rsyslog-record" type="list" option.jsonf="on")
constant(format="jsonf" value="<region-name>" outname="region")
}

input(type="imfile"
file="/tmp/test.log"
tag="ci.integration.test"
)

module(load="omamqp1")
action(type="omamqp1"
host="localhost:5666"
target="rsyslog/logs"
template="rsyslog-record"
reconnectDelay="3"
maxRetries="5"
)
host="localhost:5666"
target="rsyslog/logs"
template="rsyslog-record")
1 change: 1 addition & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func main() {
logger.Info("loaded application plugin")
}

// NOTE(mmagr): so if err will be just the warning from above, do we still need to end execution?
if err != nil {
return
}
Expand Down
77 changes: 0 additions & 77 deletions pkg/concurrent/concurrent.go

This file was deleted.

Loading

0 comments on commit 44cb460

Please sign in to comment.