Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added kafka splitting setup #117

Merged
merged 3 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mirrord-operator/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 1.9.2
version: 1.10.0

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
Expand Down
32 changes: 30 additions & 2 deletions mirrord-operator/templates/cluster-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ rules:
- get
- list
- watch
{{- if .Values.operator.sqsSplitting }}
# For patching target workloads to use different queue.
{{- if or .Values.operator.sqsSplitting .Values.operator.kafkaSplitting }}
# For patching target workloads to use different queue/topic.
- apiGroups:
- apps
resources:
Expand Down Expand Up @@ -111,6 +111,34 @@ rules:
verbs:
- update
{{- end }}
{{- if .Values.operator.kafkaSplitting }}
# Kafka splitting rules: rendered only when `operator.kafkaSplitting` is set.
# Ephemeral topic resources: read access plus create and delete.
- apiGroups:
- queues.mirrord.metalbear.co
resources:
- mirrordkafkaephemeraltopics
verbs:
- get
- list
- watch
- create
- delete
# Kafka client configs: read-only access.
- apiGroups:
- queues.mirrord.metalbear.co
resources:
- mirrordkafkaclientconfigs
verbs:
- get
- list
- watch
# Kafka topics consumers: read-only access.
- apiGroups:
- queues.mirrord.metalbear.co
resources:
- mirrordkafkatopicsconsumers
verbs:
- get
- list
- watch
{{- end }}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
Expand Down
273 changes: 273 additions & 0 deletions mirrord-operator/templates/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -438,3 +438,276 @@ spec:
subresources:
status: {}
{{ end }}
{{ if .Values.operator.kafkaSplitting}}
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: mirrordkafkaclientconfigs.queues.mirrord.metalbear.co
spec:
group: queues.mirrord.metalbear.co
names:
categories: []
kind: MirrordKafkaClientConfig
plural: mirrordkafkaclientconfigs
shortNames: []
singular: mirrordkafkaclientconfig
scope: Namespaced
versions:
- additionalPrinterColumns:
- description: Name of parent configuration.
jsonPath: .spec.parent
name: PARENT
type: string
name: v1alpha
schema:
openAPIV3Schema:
description: Auto-generated derived type for MirrordKafkaClientConfigSpec via `CustomResource`
properties:
spec:
description: Configuration to use when creating operator's Kafka client. Resources of this kind should live in the operator's namespace.
properties:
parent:
description: Name of parent resource to use as base when resolving final configuration.
nullable: true
type: string
properties:
description: |-
Properties to set.

When performing Kafka splitting, the operator will override `group.id` property.

The list of all available properties can be found [here](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
items:
description: Property to use when creating operator's Kafka client.
properties:
name:
description: Name of the property, e.g. `bootstrap.servers`.
type: string
value:
description: Value for the property, e.g. `kafka.default.svc.cluster.local:9092`. `null` clears the property from parent resource when resolving the final configuration.
nullable: true
type: string
required:
- name
type: object
type: array
required:
- properties
type: object
required:
- spec
title: MirrordKafkaClientConfig
type: object
served: true
storage: true
subresources: {}
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: mirrordkafkaephemeraltopics.queues.mirrord.metalbear.co
spec:
group: queues.mirrord.metalbear.co
names:
categories: []
kind: MirrordKafkaEphemeralTopic
plural: mirrordkafkaephemeraltopics
shortNames: []
singular: mirrordkafkaephemeraltopic
scope: Namespaced
versions:
- additionalPrinterColumns:
- description: Name of the topic.
jsonPath: .spec.name
name: NAME
type: string
- description: Name of MirrordKafkaClientConfig to use when creating Kafka client.
jsonPath: .spec.clientConfig
name: CLIENT-CONFIG
type: string
name: v1alpha
schema:
openAPIV3Schema:
description: Auto-generated derived type for MirrordKafkaEphemeralTopicSpec via `CustomResource`
properties:
spec:
description: |-
Ephemeral topic created in your Kafka cluster for the purpose of running a Kafka splitting session.

Resources of this kind should live in the operator's namespace. They will be used to clean up topics that are no longer used.
properties:
clientConfig:
description: Links to [`MirrordKafkaClientConfig`] resource living in the same namespace.
type: string
name:
description: Name of the topic.
type: string
required:
- clientConfig
- name
type: object
required:
- spec
title: MirrordKafkaEphemeralTopic
type: object
served: true
storage: true
subresources: {}
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: mirrordkafkatopicsconsumers.queues.mirrord.metalbear.co
spec:
group: queues.mirrord.metalbear.co
names:
categories: []
kind: MirrordKafkaTopicsConsumer
plural: mirrordkafkatopicsconsumers
shortNames: []
singular: mirrordkafkatopicsconsumer
scope: Namespaced
versions:
- additionalPrinterColumns:
- description: Name of the topic consumer workload.
jsonPath: .spec.consumerName
name: CONSUMER-NAME
type: string
- description: Kind of the topic consumer workload.
jsonPath: .spec.consumerKind
name: CONSUMER-KIND
type: string
- description: Api version of the topic consumer workload.
jsonPath: .spec.consumerApiVersion
name: CONSUMER-API-VERSION
type: string
- description: Timeout for consumer workload restart.
jsonPath: .spec.consumerRestartTimeout
name: CONSUMER-RESTART-TIMEOUT
type: string
name: v1alpha
schema:
openAPIV3Schema:
description: Auto-generated derived type for MirrordKafkaTopicsConsumerSpec via `CustomResource`
properties:
spec:
description: |-
Defines splittable Kafka topics consumed by some workload living in the same namespace.

# Concurrent splitting

Concurrent Kafka splitting sessions are allowed, as long as they use the same topic id or their topics' `nameSources` do not overlap.

# Example

```yaml
apiVersion: queues.mirrord.metalbear.co/v1alpha
kind: MirrordKafkaTopicsConsumer
metadata:
  name: example
  namespace: default
spec:
  consumerName: example-deployment
  consumerApiVersion: apps/v1
  consumerKind: Deployment
  topics:
    - id: example-topic
      nameSources:
        - directEnvVar:
            container: example-container
            variable: KAFKA_TOPIC_NAME
      groupIdSources:
        - directEnvVar:
            container: example-container
            variable: KAFKA_GROUP_ID
      clientConfig: example-config
```

1. Creating the resource above will enable Kafka splitting on a deployment `example-deployment` living in namespace `default`. Id `example-topic` can be then used in the mirrord config to split the topic for the duration of the mirrord session.

2. Topic name will be resolved based on `example-deployment`'s pod template by extracting value of variable `KAFKA_TOPIC_NAME` defined directly in `example-container`.

3. Consumer group id used by the mirrord operator will be resolved based on `example-deployment`'s pod template by extracting value of variable `KAFKA_GROUP_ID` defined directly in `example-container`.

4. For the duration of the session, `example-deployment` will be patched - the mirrord operator will substitute topic name in `KAFKA_TOPIC_NAME` variable with a name of an ephemeral Kafka topic.

5. Local application will see a different value of the `KAFKA_TOPIC_NAME` - it will be a name of another ephemeral Kafka topic.

6. `MirrordKafkaClientConfig` named `example-config` living in mirrord operator's namespace will be used to manage ephemeral Kafka topics and consume/produce messages.
properties:
consumerApiVersion:
description: Workload api version, for example `apps/v1`.
type: string
consumerKind:
description: Workload kind, for example `Deployment`.
type: string
consumerName:
description: Workload name, for example `my-deployment`.
type: string
consumerRestartTimeout:
description: |-
Timeout for waiting until workload patch takes effect, that is at least one pod reads from the ephemeral topic.

Specified in seconds. Defaults to 60s.
format: uint32
minimum: 0.0
nullable: true
type: integer
topics:
description: List of consumed splittable topics.
items:
description: Splittable Kafka topic consumed by some remote target.
properties:
clientConfig:
description: Links to [`MirrordKafkaClientConfig`] in the operator's namespace. This config will be used to manage ephemeral Kafka topics and consume/produce messages.
type: string
groupIdSources:
description: All occurrences of this topic's group id in the workload's pod template.
items:
description: Source of some topic property required for Kafka splitting.
oneOf:
- required:
- directEnvVar
properties:
directEnvVar:
description: Environment variable with value defined directly in the pod template.
properties:
container:
description: Name of the container.
type: string
variable:
description: Name of the variable.
type: string
required:
- container
- variable
type: object
type: object
type: array
id:
description: Id of this topic. Can be used in mirrord config to identify this topic.
type: string
nameSources:
description: All occurrences of this topic's name in the workload's pod template.
items:
description: Source of some topic property required for Kafka splitting.
oneOf:
- required:
- directEnvVar
properties:
directEnvVar:
description: Environment variable with value defined directly in the pod template.
properties:
container:
description: Name of the container.
type: string
variable:
description: Name of the variable.
type: string
required:
- container
- variable
type: object
type: object
type: array
required:
- clientConfig
- groupIdSources
- id
- nameSources
type: object
type: array
required:
- consumerApiVersion
- consumerKind
- consumerName
- topics
type: object
required:
- spec
title: MirrordKafkaTopicsConsumer
type: object
served: true
storage: true
subresources: {}
{{ end }}
2 changes: 2 additions & 0 deletions mirrord-operator/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ spec:
{{- end }}
- name: OPERATOR_SQS_SPLITTING
value: {{ .Values.operator.sqsSplitting | ternary "true" "false" | quote }}
- name: OPERATOR_KAFKA_SPLITTING
value: {{ .Values.operator.kafkaSplitting | ternary "true" "false" | quote }}
- name: OPERATOR_JSON_LOG
value: {{ .Values.operator.jsonLog | ternary "true" "false" | quote }}
- name: OPERATOR_AGENT_CONFIG
Expand Down
2 changes: 2 additions & 0 deletions mirrord-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ operator:
jsonLog: false
# Has to be set to `true` in order to use the SQS queue splitting feature.
sqsSplitting: false
# Has to be set to `true` in order to use the Kafka queue splitting feature.
kafkaSplitting: false
# imagePullSecrets:
# - name: value

Expand Down
8 changes: 8 additions & 0 deletions test_values/operator_kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Values override that turns on Kafka splitting for the operator chart.
operator:
kafkaSplitting: true

license:
file:
secret: mirrord-operator-license
data:
# Placeholder value, not a real license (the string itself says it may be invalid).
# NOTE(review): presumably this file is only used for chart rendering/lint tests - confirm.
license.pem: "DOESN'TNEEDTOBOOTSOITCANBEINVALID"
Loading