AliECS core Kafka producer #520

Merged
merged 43 commits into from
Mar 22, 2024
f963751
[core] Refactor serverutil in preparation for eventstream
teo Sep 1, 2022
25fdadd
[core] Events protofile
teo Feb 15, 2024
ca1c759
[core] Additional events
teo Feb 15, 2024
0f92acb
[core] Update events.proto+o2control.proto with NewEnvironmentAsync
teo Feb 16, 2023
cf40d75
[common] Use events.proto in o2control.proto
teo Feb 20, 2024
8c5f41f
[coconut] Fix Protobuf generator call
teo Feb 21, 2024
34b7a0d
[core] Kafka wrapper
teo Feb 23, 2024
0a7c61b
[core] Emit environment events
teo Feb 29, 2024
e983b2c
[coconut] Add asynchronous mode (-y) to coconut env create command
teo Feb 16, 2023
fabece2
[coconut] Implement coconut env create -y
teo Feb 29, 2024
d13d12b
[core] Add task traits and CallEvent to events.proto
teo Feb 16, 2023
231d3d9
[common] Add CallEvent
teo Feb 29, 2024
aa4ac3f
[core] Emit call events to inform on plugin calls
teo Feb 16, 2023
95c41fe
[core] Send EnvId with TaskEvents
teo Feb 29, 2024
225a5f9
[core] Rename busEvent in task.go
teo Mar 8, 2024
48499fb
[core] Add IntegratedServiceEvent and rename Envid field
teo Mar 8, 2024
fe2dc90
[core] Push env vars on workflow load
teo Mar 13, 2024
7530576
[common] Allow event creation with specific timestamp
teo Mar 14, 2024
28c3e68
[common] Various additions to events in events.proto
teo Mar 14, 2024
6b7c8fd
[build] Bump dependencies
teo Mar 14, 2024
92d513f
[core] Include parent role path in task events
teo Mar 14, 2024
e82a40e
[core] Improve call information in CallEvents
teo Mar 14, 2024
dc190db
[core] Emit IntegratedServiceEvents from DCS
teo Mar 14, 2024
8c57a45
[core] Make sure we always output ECS detector codes, not DCS ones
teo Mar 14, 2024
274e73f
[core] Don't forget to include error in DCS ERROR events
teo Mar 14, 2024
b3e3461
[core] Better DCS event descriptions
teo Mar 15, 2024
27eec71
[core] Emit ddscheduler events
teo Mar 15, 2024
68b2c46
[core] Remove legacy ODC handlers
teo Mar 15, 2024
16ce849
[core] Emit ODC events
teo Mar 15, 2024
db62a9f
[core] Emit TRG events
teo Mar 18, 2024
ae22193
[common] Enable AllowAutoTopicCreation in Kafka client
teo Mar 19, 2024
831178d
[core] Correct Kafka topic
teo Mar 19, 2024
9c62431
[build] Generate fdset file for decoding Kafka messages with pq
teo Mar 19, 2024
d17a4a1
[core] Emit call events to aliecs.call topic and include envId
teo Mar 19, 2024
8695b2a
[core] Enable IntegratedServiceEvents
teo Mar 20, 2024
31eb36d
[core] Pass IntegratedServiceEvents by ref
teo Mar 20, 2024
e733190
[core] Write to Kafka asynchronously
teo Mar 21, 2024
6ffe6b6
[core] Nullify odc Devices list before emitting events
teo Mar 21, 2024
d8785f0
[core] Trim down ODC events some more
teo Mar 21, 2024
e3a392f
[core] Publish ODC partition state changes
teo Mar 21, 2024
e3d59c1
[core] Document events.proto and change currentRunNumber field
teo Mar 22, 2024
62dcce8
[core] Document currently unused topics
teo Mar 22, 2024
f7c2786
[docs] Document Kafka producer functionality
teo Mar 22, 2024
[docs] Document Kafka producer functionality
teo committed Mar 22, 2024
commit f7c27866e7ad37f06a4bcc23f0af7406f770e3ba
63 changes: 57 additions & 6 deletions docs/kafka.md
@@ -1,9 +1,60 @@
# Kafka plugin
# Kafka producer functionality in AliECS

## Kafka producer functionality in AliECS core

As of 2024 the AliECS core integrates Kafka producer functionality independent of the plugin, with the goal of all consumers eventually migrating to this new interface.

### Making sure that AliECS sends messages

To enable this functionality, one should make sure that the following points are fulfilled.
* The Consul instance includes the addresses of the Kafka brokers.
Navigate to `o2/components/aliecs/ANY/any/settings` and make sure the following key value pairs are there:
```
kafkaEndpoints:
- "my-kafka-broker-1:9092"
- "my-kafka-broker-2:9092"
- "my-kafka-broker-3:9092"
```
Please restart the AliECS core if you modify this file.

No further AliECS configuration is necessary.
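
Before writing the `kafkaEndpoints` list to Consul, it can help to check that every entry has the `host:port` shape shown above. The following is a minimal sketch of such a check, not the validation AliECS itself performs; the function names `parse_endpoint` and `parse_endpoints` are hypothetical.

```python
def parse_endpoint(endpoint: str) -> tuple[str, int]:
    """Split a "host:port" string into (host, port); raise ValueError if malformed."""
    host, sep, port_text = endpoint.rpartition(":")
    if not sep or not host:
        raise ValueError(f"expected host:port, got {endpoint!r}")
    port = int(port_text)  # raises ValueError for a non-numeric port
    if not 0 < port < 65536:
        raise ValueError(f"port out of range in {endpoint!r}")
    return host, port


def parse_endpoints(endpoints: list[str]) -> list[tuple[str, int]]:
    """Validate a whole kafkaEndpoints list (hypothetical helper, not an AliECS API)."""
    if not endpoints:
        raise ValueError("kafkaEndpoints must list at least one broker")
    return [parse_endpoint(e) for e in endpoints]
```

For example, `parse_endpoints(["my-kafka-broker-1:9092"])` yields one `(host, port)` pair, while an entry without a port raises `ValueError`.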

AliECS will create the necessary topics if they don't exist yet; in this case the very first message will be lost.
Once the topics exist, no further messages are lost and no action is necessary.

### Currently available topics

See [events.proto](../common/protos/events.proto) for the protobuf definitions of the messages.

* `aliecs.core` - core events that don't concern a specific environment or task
* `aliecs.environment` - events that concern an environment, e.g. environment state changes
* `aliecs.task` - events emitted by a task, e.g. task state changes
* `aliecs.call` - events emitted before and after the execution of a call
* `aliecs.integrated_service.dcs` - events emitted by the DCS integrated service
* `aliecs.integrated_service.ddsched` - events emitted by the DDSched integrated service
* `aliecs.integrated_service.odc` - events emitted by the ODC integrated service
* `aliecs.integrated_service.trg` - events emitted by the TRG integrated service
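
A consumer that subscribes to several of these topics may want to route messages by topic name. The sketch below assumes only the naming scheme listed above; `topic_category` is a hypothetical helper and not part of AliECS.

```python
INTEGRATED_SERVICE_PREFIX = "aliecs.integrated_service."
PLAIN_TOPICS = {"core", "environment", "task", "call"}


def topic_category(topic: str) -> str:
    """Map a topic name from the list above to a short category label."""
    if topic.startswith(INTEGRATED_SERVICE_PREFIX):
        # e.g. "aliecs.integrated_service.dcs" -> "integrated_service:dcs"
        return "integrated_service:" + topic[len(INTEGRATED_SERVICE_PREFIX):]
    prefix, _, name = topic.partition(".")
    if prefix != "aliecs" or name not in PLAIN_TOPICS:
        raise ValueError(f"not a known AliECS core topic: {topic!r}")
    return name
```

Legacy plugin topics such as `aliecs.env_state.RUNNING` are deliberately rejected here, since they carry a different message type.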

### Decoding the messages

Messages are encoded with protobuf, with the aforementioned [events.proto](../common/protos/events.proto) file defining the schema.
Integrated service messages include a payload portion that is usually JSON-encoded, and has no predefined schema.
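
Because the payload is only usually JSON and has no fixed schema, a consumer should not assume that parsing succeeds. The sketch below assumes the payload has already been extracted from the decoded protobuf message as `bytes` or `str`; the helper name `decode_payload` is hypothetical.

```python
import json
from typing import Any, Union


def decode_payload(payload: Union[bytes, str]) -> Any:
    """Return the parsed JSON value, or the payload unchanged if it is not valid JSON."""
    try:
        return json.loads(payload)
    except (ValueError, TypeError):
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses
        return payload
```

Since there is no predefined schema, any keys read from the parsed result should be treated as optional.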

To generate the precompiled protobuf interface, run `make fdset`.
You can then consume the messages from a given topic using [https://github.com/sevagh/pq](https://github.com/sevagh/pq):
```
$ FDSET_PATH=./fdset pq kafka aliecs.environment --brokers kafka-broker-hostname:9092 --msgtype events.Event
```

Adjust the topic name, fdset path, and broker endpoint as necessary, and append `--beginning` to consume past messages from the beginning of the topic.
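
When the same `pq` invocation is needed for several topics, it can be assembled programmatically. The following sketch uses only the arguments shown in the command above (`kafka`, `--brokers`, `--msgtype`, `--beginning`, and the `FDSET_PATH` variable); the helper names `pq_command` and `shell_line` are hypothetical.

```python
import shlex


def pq_command(topic, broker, fdset_path="./fdset", from_beginning=False):
    """Build the argv list and extra environment for the pq call shown above."""
    argv = ["pq", "kafka", topic, "--brokers", broker, "--msgtype", "events.Event"]
    if from_beginning:
        argv.append("--beginning")  # also consume past messages
    env = {"FDSET_PATH": fdset_path}
    return argv, env


def shell_line(argv, env):
    """Render argv and env as a single copy-pasteable shell command."""
    assignments = [f"{key}={shlex.quote(value)}" for key, value in env.items()]
    return " ".join(assignments + [shlex.join(argv)])
```

To actually run the command, pass the result to `subprocess.run(argv, env={**os.environ, **env})`, which requires `pq` to be installed and the fdset to have been generated.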


## Legacy events: Kafka plugin

The Kafka plugin in AliECS publishes update messages about new states of environments and lists of environments in the RUNNING state.
The messages are encoded with protobuf.

## Making sure that AliECS sends messages
### Making sure that AliECS sends messages

To enable the plugin, one should make sure that the following points are fulfilled.
* The Consul instance includes the coordinates of your Kafka broker and enables the plugin.
@@ -17,27 +68,27 @@ To enable the plugin, one should make sure that the following points are fullfil
* Plugin is enabled for the new environments. Make sure that there is a `true` value set in the consul instance at the path `o2/runtime/aliecs/vars/kafka_enabled`.
Alternatively, one can put `kafka_enabled : true` in the Advanced configuration panel in the AliECS GUI.

## Currently available topics
### Currently available topics

As of today, AliECS publishes on the following types of topics:

* `aliecs.env_state.<state>` where `state` can be `STANDBY`, `DEPLOYED`, `CONFIGURED`, `RUNNING`, `ERROR`, `UNKNOWN`. For each topic, AliECS publishes a `NewStateNotification` message when any environment reaches the corresponding state. The `UNKNOWN` state is usually published when an environment gets a `DESTROY` request, because the plugin cannot know what the state will be after the transition.
* `aliecs.env_leave_state.<state>` where `state` can be `STANDBY`, `DEPLOYED`, `CONFIGURED`, `RUNNING`, `ERROR`. For each topic, AliECS publishes a `NewStateNotification` message when any environment is about to leave the corresponding state.
* `aliecs.env_list.<state>` where `state` is only `RUNNING`. Each time there is an environment state change, AliECS publishes an `ActiveRunsList` message which contains a list of all environments which are currently in `RUNNING` state.
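
The three topic families accept different sets of states, which is easy to get wrong when building subscription lists by hand. The sketch below encodes only the combinations listed above; `legacy_topic` is a hypothetical helper and not part of the plugin.

```python
# States allowed for each legacy topic family, as listed above.
ALLOWED_STATES = {
    "env_state": ("STANDBY", "DEPLOYED", "CONFIGURED", "RUNNING", "ERROR", "UNKNOWN"),
    "env_leave_state": ("STANDBY", "DEPLOYED", "CONFIGURED", "RUNNING", "ERROR"),
    "env_list": ("RUNNING",),
}


def legacy_topic(kind: str, state: str) -> str:
    """Return the legacy topic name, rejecting combinations that are never published."""
    if kind not in ALLOWED_STATES:
        raise ValueError(f"unknown topic family: {kind!r}")
    if state not in ALLOWED_STATES[kind]:
        raise ValueError(f"{kind!r} has no topic for state {state!r}")
    return f"aliecs.{kind}.{state}"
```

For instance, `legacy_topic("env_leave_state", "UNKNOWN")` raises `ValueError`, because no environment is ever reported as leaving the `UNKNOWN` state.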

## Decoding the messages
### Decoding the messages

Messages are encoded with protobuf. Please use [this](../core/integration/kafka/protos/kafka.proto) proto file to generate code which deserializes the messages.

## Getting Start of Run and End of Run notifications
### Getting Start of Run and End of Run notifications

To get SOR and EOR notifications, please subscribe to the two corresponding topics:
* `aliecs.env_state.RUNNING` for Start of Run
* `aliecs.env_leave_state.RUNNING` for End of Run

Both will provide `NewStateNotification` messages encoded with protobuf. Please note that the EOR message will still contain the RUNNING state, because it is sent just before the transition starts.
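
Since both notifications carry the RUNNING state, a consumer has to tell them apart by the topic a message arrived on, not by the state in the message. A minimal sketch of that distinction follows, with `run_boundary` as a hypothetical helper name.

```python
SOR_TOPIC = "aliecs.env_state.RUNNING"
EOR_TOPIC = "aliecs.env_leave_state.RUNNING"


def run_boundary(topic: str):
    """Return "SOR" or "EOR" for the two run-boundary topics, None for any other topic."""
    return {SOR_TOPIC: "SOR", EOR_TOPIC: "EOR"}.get(topic)
```

A handler would call `run_boundary` with the topic name supplied by the Kafka client alongside each message and ignore messages for which it returns `None`.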

## Using Kafka debug tools
### Using Kafka debug tools

One can use some Kafka command line tools to verify that a given setup works correctly. One should make sure to have Kafka installed on the machine used to run the tools.
