Commit a34d35b
Added Kafka producer and consumer test suites.
hjoliver committed Oct 29, 2017
1 parent 088fdb0 commit a34d35b
Showing 6 changed files with 262 additions and 0 deletions.
124 changes: 124 additions & 0 deletions dev/suites/xtrigger/kafka/consumer/lib/python/cylc_kafka_consumer.py
@@ -0,0 +1,124 @@
#!/usr/bin/env python
"""
A generic Kafka consumer for use as a Cylc external trigger function.
A NOTE ON OVERHEADS of checking Kafka for an external trigger condition at
intervals, vs a persistent consumer looking for all suite trigger messages:
1) Every unique trigger has to check Kafka separately for its own specific
message, at intervals until the message is found.
2) Every call has connection and authentication overheads.
3) The first call for each unique trigger has to consume messages from the
start of the topic - required in case triggers check for their messages out
of order. Subsequent checks for the same message do not need to start from
the beginning of the topic. This is achieved by giving each trigger a unique
consumer group ID.
"""

import re
import json
from kafka import KafkaConsumer
from cylc.suite_logging import LOG


# Time out after 1 second if we reach the end of the topic.
CONSUMER_TIMEOUT_MS = 1000


def _match_msg(cylc_msg, kafka_msg):
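"""Return a dict of the regex-captured items if every cylc_msg item
matches kafka_msg, else an empty dict."""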
all_msg_items_matched = True
result = {}
for ckey, cval in cylc_msg.items():
if ckey not in kafka_msg.value:
all_msg_items_matched = False
break
elif cval.startswith('<') and cval.endswith('>'):
m = re.match(cval[1:-1], kafka_msg.value[ckey])
# TODO: validate the regex; the number of match groups should be one.
if m:
result[ckey] = m.group(0)
else:
all_msg_items_matched = False
break
elif kafka_msg.value[ckey] != cval:
all_msg_items_matched = False
break
# (else exact match this field; continue to next item)
if all_msg_items_matched:
return result
else:
return {}


def cylc_kafka_consumer(kafka_server, kafka_topic, group_id, message, debug):
"""Look for a matching message in a Kafka topic.
ARGUMENTS:
* kafka_server - Kafka server URL, e.g. "localhost:9092".
* kafka_topic - the Kafka topic to check, e.g. "data-avail".
* group_id - determines Kafka offset ownership (see below).
* message - string-ified dict with optional pattern elements (see below).
* debug - boolean; set by daemon debug mode; prints to suite err log.
The topic is first consumed from the beginning, then from the previously
committed offset. If the message is not found by the end of the topic, commit
the offset and return (the trigger will try again later). If found, return
the result.
Kafka commits offsets per "consumer group" so the group_id argument
must be unique per distinct trigger in the suite - this allows each trigger
to separately consume the topic from the beginning, looking for its own
messages (otherwise, with shared offsets, one trigger could move the offset
beyond the messages of another trigger). This goes for successive instances
of an externally triggered cycling task too, because out-of-order triggering
may sometimes be required. So this argument should typically be, e.g.:
group_id=x%(task_id)s
where "x" is an arbitrary string you can use to change the group name if
you need to re-run the suite and re-consume the messages from the start,
without re-running the producer suite. Note this also serves to make the
function signature cycle-point-specific for Cylc even if the message does
not contain the cycle point (although it probably should).
The "message" argument is a stringified dict: e.g. the dict
{'system': 'prod', 'point': '2025', 'data': '<nwp.*\.nc>'}
is represented as:
"system:prod point:2025 data:<nwp.*\.nc>"
A match occurs if all message dict items match, and the returned result is
the sub-dict of actual values for the items given as angle-bracket-delineated
regex patterns, e.g. {'data': 'nwp-2025.nc'} for the message above.
"""

consumer = KafkaConsumer(kafka_topic, bootstrap_servers=[kafka_server],
value_deserializer=lambda m: json.loads(m),
consumer_timeout_ms=CONSUMER_TIMEOUT_MS,
auto_offset_reset='earliest',
group_id=group_id)

# Construct a dict from the message argument "key1:val1 key2:val2 ...".
cylc_msg = dict(m.split(':') for m in message.split())

result = (False, {})
n_cons = 0
for kafka_msg in consumer:
n_cons += 1
m = _match_msg(cylc_msg, kafka_msg)
if m:
result = (True, m)
break
# (else consume and compare next message)
consumer.commit()
# Unsubscribe before exit, otherwise next call will be slow while
# Kafka times out waiting for this original consumer connection.
consumer.unsubscribe()
if debug:
if result[0]:
res = "\n MATCHED: %s" % result[1]
else:
res = "no match."
LOG.debug('Kafka: "%s" (consumed %d) ... %s' % (message, n_cons, res))
return result
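
To illustrate the matching convention described in the docstring above, here
is a minimal, Kafka-free sketch (the KafkaMsg namedtuple is a hypothetical
stand-in for a kafka-python ConsumerRecord - only its .value field is used):

import re
from collections import namedtuple

KafkaMsg = namedtuple('KafkaMsg', 'value')

# The trigger spec "system:prod point:2025 data:<nwp.*\.nc>" parses to:
cylc_msg = {'system': 'prod', 'point': '2025', 'data': r'<nwp.*\.nc>'}
# A matching message, as deserialized from the topic:
kafka_msg = KafkaMsg(value={
    'system': 'prod', 'point': '2025', 'data': 'nwp-2025.nc'})

result = {}
for ckey, cval in cylc_msg.items():
    kval = kafka_msg.value.get(ckey)
    if kval is None:
        # A required key is missing: no match.
        result = {}
        break
    if cval.startswith('<') and cval.endswith('>'):
        # Angle brackets delimit a regex; capture the matched value.
        m = re.match(cval[1:-1], kval)
        if not m:
            result = {}
            break
        result[ckey] = m.group(0)
    elif kval != cval:
        # An exact-match item differs: no match.
        result = {}
        break

print(result)  # {'data': 'nwp-2025.nc'}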
5 changes: 5 additions & 0 deletions dev/suites/xtrigger/kafka/consumer/rose-suite.conf
@@ -0,0 +1,5 @@
[jinja2:suite.rc]
KAFKA_SERVER="localhost:9092"
KAFKA_TOPIC="data-avail"
SYSTEM="prod"
GROUP_ID_PREFIX=1
47 changes: 47 additions & 0 deletions dev/suites/xtrigger/kafka/consumer/suite.rc
@@ -0,0 +1,47 @@
#!Jinja2

#------------------------------------------------------------------------------
# A suite that uses the cylc_kafka_consumer module as an external trigger
# function, to trigger tasks off of Kafka messages. Jinja2 inputs are defined
# in rose-suite.conf.
#------------------------------------------------------------------------------
# For use with the corresponding PRODUCER example suite.
#------------------------------------------------------------------------------
# NOTE: to re-run the suite, re-run the producer suite to generate new
# triggering messages, or change GROUP_ID_PREFIX in rose-suite.conf to change
# the consumer group ID and thereby re-consume the already-used messages.
#------------------------------------------------------------------------------

[cylc]
cycle point format = %Y
[scheduling]
initial cycle point = 3010
[[xtriggers]]
# Trigger off of availability of "analysis" files:
# Don't quote the function arguments here.
an = cylc_kafka_consumer( \
kafka_server={{KAFKA_SERVER}}, kafka_topic={{KAFKA_TOPIC}}, \
message=system:{{SYSTEM}} point:%(point)s data:<.*analysis.*>, \
group_id=an{{GROUP_ID_PREFIX}}%(task_id)s):PT10S
# Trigger off of availability of "forecast" files:
# Don't quote the function arguments here.
fc = cylc_kafka_consumer( \
kafka_server={{KAFKA_SERVER}}, kafka_topic={{KAFKA_TOPIC}}, \
message=system:{{SYSTEM}} point:%(point)s data:<.*forecast.*>, \
group_id=fc{{GROUP_ID_PREFIX}}%(task_id)s):PT10S
[[dependencies]]
[[[P1Y]]]
graph = """@an => proc_an
@fc => proc_fc
pre => proc_an & proc_fc => publish
# Make sure products are published in correct order:
publish[-P1Y] => publish"""
[runtime]
[[pre]]
#...
[[proc_an]]
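# $an_data below is the "data" item of the @an trigger's result dict
# (result items are assumed to be exported to the task environment as
# <trigger_label>_<key>, hence "an" + "data").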
script = sleep 10; echo "my analysis data is: $an_data"
[[proc_fc]]
script = sleep 10; echo "my forecast data is: $fc_data"
[[publish]]
#...
51 changes: 51 additions & 0 deletions dev/suites/xtrigger/kafka/producer/bin/cylc_kafka_producer.py
@@ -0,0 +1,51 @@
#!/usr/bin/env python
"""
A generic Kafka producer for use as a Cylc event handler.
Hilary Oliver, October 2017.
"""

import sys
import json
from inspect import cleandoc
from kafka import KafkaProducer


def main():
"""
A generic Kafka producer for use as a Cylc event handler.
USAGE:
cylc_kafka_producer.py <HOST:PORT> <TOPIC> key1=val1 key2=val2 ...
serializes {key1: val1, key2: val2, ...} to TOPIC at Kafka on HOST:PORT.
This is generic in that the JSON message schema is defined by the received
command line keyword arguments. To enforce compliance with a particular
schema, copy and modify as needed.
Can be partnered with the generic cylc_kafka_consumer external trigger
function, for triggering downstream suites.
"""

if len(sys.argv) < 2 or 'help' in sys.argv[1]:
    print(cleandoc(main.__doc__))
    sys.exit(0)

# TODO exception handling for bad inputs etc.
kafka_server = sys.argv[1]
kafka_topic = sys.argv[2]
# Construct a message dict from the "key=val" command line arguments
# (split on the first "=" only, so values may themselves contain "=").
dmsg = dict(arg.split('=', 1) for arg in sys.argv[3:])

producer = KafkaProducer(
bootstrap_servers=kafka_server,
value_serializer=lambda msg: json.dumps(msg).encode('utf-8'))

producer.send(kafka_topic, dmsg)
producer.flush()


if __name__ == "__main__":
main()
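
For a quick smoke test of the producer/consumer pair outside Cylc - a hedged
sketch assuming a local broker on localhost:9092, the illustrative topic name
from these suites, and kafka-python installed:

import json
from kafka import KafkaConsumer, KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda msg: json.dumps(msg).encode('utf-8'))
producer.send('data-avail', {'system': 'prod', 'point': '3010',
                             'data': '/data/analysis-3010.nc'})
producer.flush()

# A fresh group_id reads the topic from the beginning (cf. the consumer
# module above); iteration stops at end of topic via consumer_timeout_ms.
consumer = KafkaConsumer('data-avail',
                         bootstrap_servers=['localhost:9092'],
                         value_deserializer=lambda m: json.loads(m),
                         consumer_timeout_ms=1000,
                         auto_offset_reset='earliest',
                         group_id='smoke-test-1')
for msg in consumer:
    print(msg.value)
consumer.unsubscribe()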
4 changes: 4 additions & 0 deletions dev/suites/xtrigger/kafka/producer/rose-suite.conf
@@ -0,0 +1,4 @@
[jinja2:suite.rc]
KAFKA_SERVER="localhost:9092"
KAFKA_TOPIC="data-avail"
SYSTEM="prod"
31 changes: 31 additions & 0 deletions dev/suites/xtrigger/kafka/producer/suite.rc
@@ -0,0 +1,31 @@
#!Jinja2

#------------------------------------------------------------------------------
# A suite that uses the bin/cylc_kafka_producer as custom event handler to send
# messages to Kafka. Jinja2 inputs are defined in rose-suite.conf.
#------------------------------------------------------------------------------
# For use with the corresponding CONSUMER example suite.
#------------------------------------------------------------------------------

[cylc]
cycle point format = %Y
[scheduling]
initial cycle point = 3010
final cycle point = 3015
[[dependencies]]
[[[P1Y]]]
graph = "pre & forecast[-P1Y] => forecast => post"
[runtime]
[[root]]
pre-script = sleep 10
[[[events]]]
# (don't use single quotes here)
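# e.g. for the "pre" task at cycle 3010, the handler below sends the JSON
# message {"system": "prod", "point": "3010", "data": "/data/analysis-3010.nc"}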
custom handler = cylc_kafka_producer.py \
{{KAFKA_SERVER}} {{KAFKA_TOPIC}} "system={{SYSTEM}}" \
"point=%(point)s" "data=%(message)s"
[[pre]]
script = cylc message -p CUSTOM /data/analysis-${CYLC_TASK_CYCLE_POINT}.nc
[[forecast]]
script = cylc message -p CUSTOM /data/forecast-${CYLC_TASK_CYCLE_POINT}.nc
[[post]]
script = cylc message -p CUSTOM /data/products-${CYLC_TASK_CYCLE_POINT}.nc
