Skip to content

Commit

Permalink
Add support for loading bad events into Elasticsearch (closes #30)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbeemster committed Mar 4, 2016
1 parent b59b391 commit 89b0d5d
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 2 deletions.
86 changes: 86 additions & 0 deletions resources/configs/snowplow-elasticsearch-sink-bad.hocon
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Default configuration for kinesis-elasticsearch-sink
# (bad-rows variant: loads Snowplow bad events into a dedicated
# Elasticsearch index -- see stream-type below)

sink {

# Sources currently supported are:
# 'kinesis' for reading records from a Kinesis stream
# 'stdin' for reading unencoded tab-separated events from stdin
# If set to "stdin", JSON documents will not be sent to Elasticsearch
# but will be written to stdout.
source = "stdin"

# Where to write good and bad records
sink {
# Sinks currently supported are:
# 'elasticsearch' for writing good records to Elasticsearch
# 'stdout' for writing good records to stdout
"good": "elasticsearch"

# Sinks currently supported are:
# 'kinesis' for writing bad records to Kinesis
# 'stderr' for writing bad records to stderr
# 'none' for ignoring bad records
"bad": "none"
}

# "good" for a stream of successfully enriched events
# "bad" for a stream of bad events
# This instance consumes the bad stream, hence "bad".
stream-type: "bad"

# The following are used to authenticate for the Amazon Kinesis sink.
#
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
#
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
#
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
access-key: ""
secret-key: ""
}

kinesis {

in {
stream-name: "" # Kinesis stream name

# LATEST: most recent data.
# TRIM_HORIZON: oldest available data.
# Note: This only affects the first run of this application
# on a stream.
initial-position: "TRIM_HORIZON"
}

out {
# Stream for enriched events which are rejected by Elasticsearch
stream-name: ""
shards: 1
}

# AWS region of the Kinesis streams above, e.g. "us-east-1"
region: ""

# "app-name" is used for a DynamoDB table to maintain stream state.
# You can set it automatically using: "SnowplowElasticsearchSink-$\\{connector.kinesis.in.stream-name\\}"
app-name: ""
}

# Connection details for the Elasticsearch cluster that receives
# the bad rows.
elasticsearch {
cluster-name: "elasticsearch"
endpoint: "localhost"
# Maximum time to wait for a connection, in milliseconds
# (NOTE(review): presumably ms -- confirm against the sink's docs).
max-timeout: "10000"
index: "bad" # Elasticsearch index name
type: "bad" # Elasticsearch type name
}

# Events are accumulated in a buffer before being sent to Elasticsearch.
# The buffer is emptied whenever:
# - the combined size of the stored records exceeds byte-limit or
# - the number of stored records exceeds record-limit or
# - the time in milliseconds since it was last emptied exceeds time-limit
buffer {
byte-limit: 5242880 # 5 MB
record-limit: 10000
time-limit: 60000 # 60 seconds
}
}
12 changes: 10 additions & 2 deletions resources/elasticsearch/bad-mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,16 @@
},
"properties": {
"errors": {
"type": "string",
"analyzer": "standard"
"properties": {
"message" : {
"type": "string",
"analyzer": "standard"
},
"level" : {
"type": "string",
"analyzer": "standard"
}
}
},
"failure_tstamp": {
"type": "date",
Expand Down
98 changes: 98 additions & 0 deletions resources/init/snowplow_elasticsearch_sink_bad_0.5.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#!/bin/sh
### BEGIN INIT INFO
# Provides:          snowplow_elasticsearch_sink_bad
# Required-Start: $remote_fs $syslog
# Required-Stop: $remote_fs $syslog
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Short-Description: Start daemon at boot time
# Description: Enable service provided by daemon.
### END INIT INFO

# Installation directory and the sink command to launch, pointing at
# the bad-events HOCON configuration.
dir="/home/ubuntu/snowplow/bin/"
cmd="./snowplow-elasticsearch-sink-0.5.0 --config /home/ubuntu/snowplow/configs/snowplow-elasticsearch-sink-bad.hocon"
# Non-root user the sink runs as (via sudo -u below).
user="ubuntu"

# Derive the pid file and log file locations from this script's own name,
# so renaming the init script renames its runtime artifacts too.
name=`basename $0`
pid_file="/var/run/$name.pid"
stdout_log="/var/log/$name.log"
stderr_log="/var/log/$name.err"

# Emit the recorded pid of the managed sink process on stdout.
get_pid() { cat "$pid_file"; }

# True when a pid file exists AND a process with that pid is alive.
# Uses 'ps -p' (the POSIX option form); the original bare `ps <pid>`
# relies on BSD-style syntax that not every ps implementation accepts.
is_running() {
    [ -f "$pid_file" ] && ps -p `get_pid` > /dev/null 2>&1
}

# Dispatch on the requested action: start|stop|restart|status.
case "$1" in
    start)
        if is_running; then
            echo "Already started"
        else
            echo "Starting $name"
            cd "$dir"
            # Feed the named pipe of bad events into the sink, dropping
            # privileges to $user when one is configured.
            if [ -z "$user" ]; then
                cat /home/ubuntu/snowplow/pipes/bad-1-pipe | sudo $cmd >> "$stdout_log" 2>> "$stderr_log" &
            else
                cat /home/ubuntu/snowplow/pipes/bad-1-pipe | sudo -u "$user" $cmd >> "$stdout_log" 2>> "$stderr_log" &
            fi
            # $! is the pid of the last command in the background pipeline.
            echo $! > "$pid_file"
            if ! is_running; then
                echo "Unable to start, see $stdout_log and $stderr_log"
                exit 1
            fi
        fi
        ;;
    stop)
        if is_running; then
            # printf instead of 'echo -n': -n is not portable under /bin/sh
            # (dash would print a literal "-n").
            printf '%s' "Stopping $name.."
            kill `get_pid`
            # Wait up to 10 seconds for the process to exit.
            # NB: {1..10} is a bash-only brace expansion; under /bin/sh it
            # is a literal string, so spell the list out explicitly.
            for i in 1 2 3 4 5 6 7 8 9 10
            do
                if ! is_running; then
                    break
                fi

                printf '.'
                sleep 1
            done
            echo

            if is_running; then
                echo "Not stopped; may still be shutting down or shutdown may have failed"
                exit 1
            else
                echo "Stopped"
                if [ -f "$pid_file" ]; then
                    rm "$pid_file"
                fi
            fi
        else
            echo "Not running"
        fi
        ;;
    restart)
        $0 stop
        if is_running; then
            echo "Unable to stop, will not attempt to start"
            exit 1
        fi
        $0 start
        ;;
    status)
        if is_running; then
            echo "Running"
        else
            echo "Stopped"
            exit 1
        fi
        ;;
    *)
        echo "Usage: $0 {start|stop|restart|status}"
        exit 1
        ;;
esac

exit 0

0 comments on commit 89b0d5d

Please sign in to comment.