Skip to content

Commit a04af1c

Browse files
authored
chore: add configurable pagination key, update readme and recipe (#3)
1 parent 7f1b5e3 commit a04af1c

File tree

5 files changed

+33
-5
lines changed

5 files changed

+33
-5
lines changed

README.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# datahub-apicurio-kafka
22
ApiCurio Schema Registry implementation for the [DataHub Kafka ingestion source](https://datahubproject.io/docs/generated/ingestion/sources/kafka/). Only AVRO schemas are currently supported.
33

4+
## Sample Ingestion Recipe
5+
A sample recipe can be found [here](./kafka_src_recipe.yaml)
6+
47
## Installation
58
The package can be installed using `pip install apicurio-datahub-kafka`.

kafka_src_recipe.yaml

+2
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,7 @@ source:
66
connection:
77
bootstrap: "localhost:9093"
88
schema_registry_url: http://localhost:8085/apis/registry/v2
9+
schema_registry_config:
10+
pagination: 100
911
#sink:
1012
# type: "console"

setup.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
setup_output = setup(
44
name="apicurio_datahub_kafka",
5-
version="1.1.2",
5+
version="1.1.5",
66
description="DataHub ApiCurio Schema Registry for Kafka Source",
77
package_dir={"": "src"},
88
packages=find_packages("src"),

src/apicurio_kafka/apicurio_schema_registry.py

+24-4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import logging
44

55
import nest_asyncio
6+
from apicurioregistrysdk.client.search.artifacts.artifacts_request_builder import ArtifactsRequestBuilder
7+
68
nest_asyncio.apply()
79

810
from hashlib import md5
@@ -46,8 +48,26 @@ async def _async_init(
4648
self.registry_client = RegistryClient(request_adapter)
4749

4850
try:
49-
self.known_schema_registry_subjects: ArtifactSearchResults = await self.registry_client.search.artifacts.get()
51+
limit = self.source_config.connection.schema_registry_config.get("pagination")
52+
if limit == None:
53+
logger.warning(f"Unable to get pagination key in recipe. Setting maximum number of artifacts to be returned to default 20")
54+
limit = 20
55+
else:
56+
logger.info(f"Maximum number of artifacts to be returned: {limit}")
57+
58+
query_params = ArtifactsRequestBuilder.ArtifactsRequestBuilderPostQueryParameters(
59+
limit=limit
60+
)
61+
62+
request_configuration = (
63+
ArtifactsRequestBuilder.ArtifactsRequestBuilderPostRequestConfiguration(
64+
query_parameters=query_params
65+
)
66+
)
67+
self.known_schema_registry_subjects: ArtifactSearchResults = await self.registry_client.search.artifacts.get(request_configuration=request_configuration)
5068
logger.info(f"Known schema registry subjects: {self.known_schema_registry_subjects.count}")
69+
for artifact in self.known_schema_registry_subjects.artifacts:
70+
logger.info(f"known artifact id: {artifact.id}")
5171
except Exception as e:
5272
logger.warning(f"Failed to get artifacts from schema registry: {e}")
5373

@@ -109,15 +129,15 @@ async def _get_schema_and_fields(
109129
# Obtain the schema fields from schema for the topic.
110130
fields: List[SchemaField] = []
111131
if artifact is not None:
112-
logger.debug(
132+
logger.info(
113133
f"The {schema_type_str} schema subject:'{artifact.id}', grp ID:'{artifact.group_id}' is found for topic:'{topic}'."
114134
)
115135
fields = await self._get_schema_fields(
116136
topic=topic, artifact=artifact, is_key_schema=is_key_schema
117137
)
118138
else:
119-
logger.debug(
120-
f"For topic: {topic}, the schema registry subject for the schema is not found."
139+
logger.info(
140+
f"For topic: {topic}, the {schema_type_str} schema registry subject for the schema is not found."
121141
)
122142
return artifact, fields
123143

tests/apicurio/test_schema_registry.py

+3
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ async def test_apicurio_schema_registry():
9191

9292
kafka_source_config = KafkaSourceConfig()
9393
kafka_source_config.connection.schema_registry_url = f"http://{os.environ[REGISTRY_HOST]}:{os.environ[REGISTRY_PORT]}/apis/registry/v2"
94+
kafka_source_config.connection.schema_registry_config = {
95+
"pagination": 100
96+
}
9497

9598
kafka_source_report = KafkaSourceReport()
9699

0 commit comments

Comments
 (0)