Skip to content

Commit

Permalink
See #469. Refactored: extracted the duplicated Avro serializer configuration (subject, schema, schema-registry URL) into shared local variables in AvroDataGeneratorApp, and renamed ConfluentClientConfigurationLookup to ConfluentClientConfigurationMapFunction.
Browse files Browse the repository at this point in the history
  • Loading branch information
j3-signalroom committed Nov 18, 2024
1 parent 71f3aa1 commit 00bc345
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 10 deletions.
15 changes: 7 additions & 8 deletions java/app/src/main/java/kickstarter/AvroDataGeneratorApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,21 @@ public static void main(String[] args) throws Exception {
Types.GENERIC(GenericRecord.class)
);

// --- Sets up a Flink POJO source to consume data.
// --- Sets up a Flink Avro Generic Record source to consume data.
DataStream<GenericRecord> skyOneStream =
env.fromSource(skyOneSource, WatermarkStrategy.noWatermarks(), "skyone_source");

/*
* Sets up a Flink Kafka sink to produce data to the Kafka topic `airline.skyone_avro` with the
* specified serializer.
*/
final String subject = AirlineData.NAMESPACE_THEJ3_MODEL + "." + AirlineData.SUBJECT_AIRLINE_DATA;
final org.apache.avro.Schema schema = AirlineData.buildSchema().rawSchema();
final String schemaRegistryUrl = producerProperties.getProperty("schema.registry.url");
KafkaRecordSerializationSchema<GenericRecord> skyOneSerializer =
KafkaRecordSerializationSchema.<GenericRecord>builder()
.setTopic("airline.skyone_avro")
.setValueSerializationSchema(ConfluentRegistryAvroSerializationSchema.forGeneric(AirlineData.SUBJECT_AIRLINE_DATA,
AirlineData.buildSchema().rawSchema(),
producerProperties.getProperty("schema.registry.url")))
.setValueSerializationSchema(ConfluentRegistryAvroSerializationSchema.forGeneric(subject, schema, schemaRegistryUrl))
.build();

/*
Expand All @@ -161,7 +162,7 @@ public static void main(String[] args) throws Exception {
*/
skyOneStream.sinkTo(skyOneSink).name("skyone_sink");

// --- Sets up a Flink POJO source to consume data.
// --- Sets up a Flink Avro Generic Record source to consume data.
DataGeneratorSource<GenericRecord> sunsetSource =
new DataGeneratorSource<>(
index -> DataGenerator.generateAirlineFlightData("SUN").toGenericRecord(),
Expand All @@ -180,9 +181,7 @@ public static void main(String[] args) throws Exception {
KafkaRecordSerializationSchema<GenericRecord> sunsetSerializer =
KafkaRecordSerializationSchema.<GenericRecord>builder()
.setTopic("airline.sunset_avro")
.setValueSerializationSchema(ConfluentRegistryAvroSerializationSchema.forGeneric(AirlineData.SUBJECT_AIRLINE_DATA,
AirlineData.buildSchema().rawSchema(),
producerProperties.getProperty("schema.registry.url")))
.setValueSerializationSchema(ConfluentRegistryAvroSerializationSchema.forGeneric(subject, schema, schemaRegistryUrl))
.build();

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* Flink App, then caches the properties for use by any subsequent events that need these
* properties.
*/
public class ConfluentClientConfigurationLookup extends RichMapFunction<Properties, Properties>{
public class ConfluentClientConfigurationMapFunction extends RichMapFunction<Properties, Properties>{
private transient AtomicReference<Properties> _properties;
private volatile boolean _consumerKafkaClient;
private volatile String _serviceAccountUser;
Expand All @@ -43,7 +43,7 @@ public class ConfluentClientConfigurationLookup extends RichMapFunction<Properti
* @param serviceAccountUser
* @throws Exception - Exception occurs when the service account user is empty.
*/
public ConfluentClientConfigurationLookup(boolean consumerKafkaClient, String serviceAccountUser) throws Exception {
public ConfluentClientConfigurationMapFunction(boolean consumerKafkaClient, String serviceAccountUser) throws Exception {
// --- Check if the service account user is empty
if(serviceAccountUser.isEmpty()) {
throw new Exception("The service account user must be provided.");
Expand Down

0 comments on commit 00bc345

Please sign in to comment.