(Kafka Avro4s Schema Registry)
Scala client library that provide an Avro schema registry with Kafka persistency and Avro4s serializer support. It allows resolving AVRO schemas across multiple applications. It can be used instead of Confluent Schema Registry.
For serialization, Single object AVRO encoding is used, where only a schema id (hash) is persisted within the record. Thanks to Avro4s, all Scala case classes and primitive types can be serialized and deserialized.
kaa.schemaregistry.KaaSchemaRegistry
: a simple embeddable schema registry that read and write schemas to Kafkakaa.schemaregistry.avro.SingleObjectSerializer[T]
: an avro serializer/deserializer for case classes, based on Avro4s, that internally usesKaaSchemaRegistry
for schema resolutionkaa.schemaregistry.avro.GenericRecordSingleObjectSerializer
: an avro serializer/deserializer forGenericRecord
classes that internally usesKaaSchemaRegistry
for schema resolutionkaa.schemaregistry.kafka.KaaSerde[T]
: an implementation of Kafka'sSerde[T]
based onSingleObjectSerializer
, that can be used with Kafka Streamkaa.schemaregistry.kafka.KaaGenericRecordSerde[T]
: an implementation of Kafka'sSerde[T]
based onGenericRecordSingleObjectSerializer
, that can be used with Kafka Stream
During serialization, a schema hash is generated and stored inside Kafka with the schema (key=hash, value=schema).
When deserializing the schema is retrieved from Kafka and used for the deserialization.
KaaSchemaRegistry
internally runs a Kafka consumer to read all schemas that will be cached in memory.
You can use kaa.schemaregistry.KaaSchemaRegistryAdmin
to programmatically create Kafka's schema topic.
NOTE: if you want to create the topic manually, remember to put cleanup policy to compact
to maintain all the schemas.
The main advantage of Kaa is that it doesn't require an external services to retrieve schemas. This library automatically reads and writes to Kafka. This can simplify installation and configuration of client applications and it is especially useful for applications that already interact with Kafka.
Confluent Schema Registry on the other end requires you to install a dedicated service.
Compiled with:
- Scala 2.12, 2.13
- Kafka 2.4
- Avro4s 4.0
Official releases (published in Maven Central):
libraryDependencies += "com.davideicardi" %% "kaa" % "<version>"
Packages are also available in Sonatype, also with snapshots versions:
externalResolvers += Resolver.sonatypeRepo("snapshots")
// externalResolvers += Resolver.sonatypeRepo("public") // for official releases
You can use KaaSchemaRegistryAdmin
to create the Kafka topic to store schema the first time.
// create the topic
val admin = new KaaSchemaRegistryAdmin(brokers)
try {
if (!admin.topicExists()) admin.createTopic()
} finally {
admin.close()
}
Just create an instance of KaaSchemaRegistry
and call the start
function.
Remember to close it calling close
at the end.
// create the schema registry
val schemaRegistry = new KaaSchemaRegistry(brokers, e => println(e))
try {
schemaRegistry.start()
// put your code here
} finally {
schemaRegistry.close()
}
case class SuperheroV1(name: String)
// given a schema registry
val schemaRegistry: KaaSchemaRegistry = ???
// create the serializer
val serializerV1 = new SingleObjectSerializer[SuperheroV1](schemaRegistry)
// serialize
val bytesV1 = serializerV1.serialize(SuperheroV1("Spiderman"))
// deserialize
val result = serializerV1.deserialize(bytesV1)
All you have to do is to import KaaSerde
and add an implicit SchemaRegistry
.
// Import the serializer
import kaa.schemaregistry.kafka.KaaSerde._
// Import scala implicit conversions
import org.apache.kafka.streams.scala.ImplicitConversions._
// Define an implicit schema registry
implicit val schemaRegistry: SchemaRegistry = ???
After that you can directly define your case classes and use Kafka Streams functions, like:
streamsBuilder.stream[TKey, TValue]("topicName")
For a complete example see es4kafka.
For a simple server implementation see ./kaa-registry-server
.
Use GET /schemas/ids/{schemaId}
method to retrieve schemas.
- Avro: https://avro.apache.org/
- Single object encoding: https://avro.apache.org/docs/current/spec.html#single_object_encoding
- Avro4s: https://github.com/sksamuel/avro4s
- Kafka: https://kafka.apache.org/
- Avro formats: https://gist.github.com/davideicardi/e8c5a69b98e2a0f18867b637069d03a9
- Agile Lab's Darwin Schema Registry: https://github.com/agile-lab-dev/darwin
- Confluent's Schema Registry: https://github.com/confluentinc/schema-registry
Run unit tests:
sbt test
Run integration tests:
docker-compose up -d
sbt it:test
docker-compose down
Run example application:
docker-compose up -d
sbt sample/run
docker-compose down
Run http server:
sbt kaaRegistryServer/run