Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NU-1823] Fix for schemaless topics in kafka source/sink #7232

Merged
merged 4 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
* [#7257](https://github.com/TouK/nussknacker/pull/7257) `components-api` module: Replaced wide dependency to `async-http-client-backend-future`
by the narrowest possible dependency to sttp's core
* [#7259](https://github.com/TouK/nussknacker/pull/7259) `flink-executor` and `lite-runtime` modules: Added compile-time
dependency to `http-utils` (which depends on `async-http-client-backend-future` and indirectly on Netty)
dependency to `http-utils` (which depends on `async-http-client-backend-future` and indirectly on Netty)
* [#7066](https://github.com/TouK/nussknacker/pull/7066) Kafka source and sink can now operate with schemaless topics. They accept any JSON; the data is not validated against a schema.

## 1.18

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid
)(implicit nodeId: NodeId): WithError[ParameterCreatorWithNoDependency with ParameterExtractor[String]] = {
if (schemaRegistryClient.isTopicWithSchema(
preparedTopic.prepared.topicName.toUnspecialized.name,
topicSelectionStrategy,
kafkaConfig
)) {
val versions = schemaRegistryClient.getAllVersions(preparedTopic.prepared.toUnspecialized, isKey = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package pl.touk.nussknacker.engine.schemedkafka
import cats.data.Validated
import org.apache.kafka.clients.admin.ListTopicsOptions
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.TimeoutException
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, KafkaUtils, UnspecializedTopicName}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaRegistryClient, SchemaRegistryError}

import java.util.concurrent.ExecutionException
import java.util.regex.Pattern
import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -36,11 +38,11 @@ class AllNonHiddenTopicsSelectionStrategy extends TopicSelectionStrategy {
kafkaConfig: KafkaConfig
): Validated[SchemaRegistryError, List[UnspecializedTopicName]] = {
val topicsFromSchemaRegistry = schemaRegistryClient.getAllTopics
val validatorConfig = kafkaConfig.topicsExistenceValidationConfig.validatorConfig

val schemaLessTopics: List[UnspecializedTopicName] = {
try {
KafkaUtils.usingAdminClient(kafkaConfig) {
_.listTopics(new ListTopicsOptions().timeoutMs(validatorConfig.adminClientTimeout.toMillis.toInt))
_.listTopics(new ListTopicsOptions().timeoutMs(5000))
.names()
.get()
.asScala
Expand All @@ -51,6 +53,11 @@ class AllNonHiddenTopicsSelectionStrategy extends TopicSelectionStrategy {
}
} catch {
// In some tests we pass a dummy Kafka address, so the attempt to list topics from Kafka fails
case err: ExecutionException =>
err.getCause match {
case _: TimeoutException => List.empty
case _ => throw err
}
case _: KafkaException =>
List.empty
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@ package pl.touk.nussknacker.engine.schemedkafka.schemaregistry
import cats.data.Validated
import io.confluent.kafka.schemaregistry.ParsedSchema
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, UnspecializedTopicName}
import pl.touk.nussknacker.engine.schemedkafka.{
TopicSelectionStrategy,
TopicsMatchingPatternWithExistingSubjectsSelectionStrategy,
TopicsWithExistingSubjectSelectionStrategy
}
import pl.touk.nussknacker.engine.schemedkafka.TopicsWithExistingSubjectSelectionStrategy

trait SchemaRegistryClient extends Serializable {

Expand Down Expand Up @@ -44,12 +40,13 @@ trait SchemaRegistryClient extends Serializable {

def getAllVersions(topic: UnspecializedTopicName, isKey: Boolean): Validated[SchemaRegistryError, List[Integer]]

def isTopicWithSchema(topic: String, strategy: TopicSelectionStrategy, kafkaConfig: KafkaConfig): Boolean = {
val topicsWithSchema = strategy match {
case strategy: TopicsMatchingPatternWithExistingSubjectsSelectionStrategy => strategy.getTopics(this, kafkaConfig)
case _ => new TopicsWithExistingSubjectSelectionStrategy().getTopics(this, kafkaConfig)
def isTopicWithSchema(topic: String, kafkaConfig: KafkaConfig): Boolean = {
if (!kafkaConfig.showTopicsWithoutSchema) {
true
} else {
val topicsWithSchema = new TopicsWithExistingSubjectSelectionStrategy().getTopics(this, kafkaConfig)
topicsWithSchema.exists(_.map(_.name).contains(topic))
}
topicsWithSchema.exists(_.map(_.name).contains(topic))
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import pl.touk.nussknacker.engine.api.process.TopicName
import pl.touk.nussknacker.engine.api.test.TestRecord
import pl.touk.nussknacker.engine.kafka.consumerrecord.SerializableConsumerRecord
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, RecordFormatter, serialization}
import pl.touk.nussknacker.engine.schemedkafka.TopicsWithExistingSubjectSelectionStrategy
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{
ContentTypes,
ContentTypesSchemas,
Expand Down Expand Up @@ -114,7 +113,6 @@ abstract class AbstractSchemaBasedRecordFormatter[K: ClassTag, V: ClassTag] exte

if (schemaRegistryClient.isTopicWithSchema(
topic.name,
new TopicsWithExistingSubjectSelectionStrategy,
kafkaConfig
)) {
val valueSchemaOpt = record.valueSchemaId.map(schemaRegistryClient.getSchemaById).map(_.schema)
Expand Down Expand Up @@ -144,7 +142,7 @@ abstract class AbstractSchemaBasedRecordFormatter[K: ClassTag, V: ClassTag] exte

}

record.consumerRecord.toKafkaConsumerRecord(topic, serializeKeyValue)
record.consumerRecord.copy(topic = Some(topic.name)).toKafkaConsumerRecord(topic, serializeKeyValue)
}

protected def readRecordKeyMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import org.apache.flink.formats.avro.typeutils.NkSerializableParsedSchema
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.serialization.Deserializer
import pl.touk.nussknacker.engine.kafka.KafkaConfig
import pl.touk.nussknacker.engine.schemedkafka.{RuntimeSchemaData, TopicsWithExistingSubjectSelectionStrategy}
import pl.touk.nussknacker.engine.schemedkafka.RuntimeSchemaData
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.serialization.SchemaRegistryBasedDeserializerFactory
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{
ChainedSchemaIdFromMessageExtractor,
Expand Down Expand Up @@ -42,7 +42,7 @@ class UniversalKafkaDeserializer[T](
.getOrElse(throw MessageWithoutSchemaIdException)

val schemaWithMetadata = {
if (schemaRegistryClient.isTopicWithSchema(topic, new TopicsWithExistingSubjectSelectionStrategy, kafkaConfig)) {
if (schemaRegistryClient.isTopicWithSchema(topic, kafkaConfig)) {
schemaRegistryClient.getSchemaById(writerSchemaId.value)
} else {
writerSchemaId.value match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.Optional
@Network
class AzureSchemaBasedSerdeProviderIntegrationTest extends AnyFunSuite with OptionValues with Matchers {

ignore("serialization round-trip") {
test("serialization round-trip") {
val eventHubsNamespace = Option(System.getenv("AZURE_EVENT_HUBS_NAMESPACE")).getOrElse("nu-cloud")
val config = Map(
"schema.registry.url" -> s"https://$eventHubsNamespace.servicebus.windows.net",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class AzureTestsFromFileIntegrationTest

private val kafkaConfig = KafkaConfig(Some(schemaRegistryConfigMap), None, showTopicsWithoutSchema = false)

ignore("test from file round-trip") {
test("test from file round-trip") {
val schemaRegistryClient = AzureSchemaRegistryClientFactory.create(kafkaConfig.schemaRegistryClientKafkaConfig)
val serdeProvider = UniversalSchemaBasedSerdeProvider.create(UniversalSchemaRegistryClientFactory)
val factory = serdeProvider.deserializationSchemaFactory.create[String, GenericRecord](kafkaConfig, None, None)
Expand Down
Loading