Skip to content

Commit

Permalink
See #469. Refactored and Recommited.
Browse files Browse the repository at this point in the history
  • Loading branch information
j3-signalroom committed Nov 19, 2024
1 parent e89ca00 commit f5108d8
Show file tree
Hide file tree
Showing 6 changed files with 617 additions and 86 deletions.
42 changes: 21 additions & 21 deletions java/app/src/main/java/kickstarter/AvroDataGeneratorApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,34 +125,34 @@ public static void main(String[] args) throws Exception {
registryConfigs.put("schema.registry.basic.auth.user.info", producerProperties.getProperty("basic.auth.user.info"));

// --- Create a data generator source.
DataGeneratorSource<AirlineData> skyOneSource =
DataGeneratorSource<AirlineAvroData> skyOneSource =
new DataGeneratorSource<>(
index -> DataGenerator.generateAirlineFlightData("SKY1"),
index -> DataGenerator.generateAirlineAvroData("SKY1"),
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(1),
Types.POJO(AirlineData.class)
Types.POJO(AirlineAvroData.class)
);

// --- Sets up a Flink Avro Generic Record source to consume data.
DataStream<AirlineData> skyOneStream =
DataStream<AirlineAvroData> skyOneStream =
env.fromSource(skyOneSource, WatermarkStrategy.noWatermarks(), "skyone_source");

/*
* Sets up a Flink Kafka sink to produce data to the Kafka topic `airline.skyone_avro` with the
* specified serializer.
*/
KafkaRecordSerializationSchema<AirlineData> skyOneSerializer =
KafkaRecordSerializationSchema.<AirlineData>builder()
KafkaRecordSerializationSchema<AirlineAvroData> skyOneSerializer =
KafkaRecordSerializationSchema.<AirlineAvroData>builder()
.setTopic("airline.skyone_avro")
.setValueSerializationSchema(ConfluentRegistryAvroSerializationSchema.forSpecific(AirlineData.class, AirlineData.SUBJECT, schemaRegistryUrl, registryConfigs))
.setValueSerializationSchema(ConfluentRegistryAvroSerializationSchema.forSpecific(AirlineAvroData.class, AirlineAvroData.SUBJECT, schemaRegistryUrl, registryConfigs))
.build();

/*
* Takes the results of the Kafka sink and attaches the unbounded data stream to the Flink
* environment (a.k.a. the Flink job graph -- the DAG).
*/
KafkaSink<AirlineData> skyOneSink =
KafkaSink.<AirlineData>builder()
KafkaSink<AirlineAvroData> skyOneSink =
KafkaSink.<AirlineAvroData>builder()
.setKafkaProducerConfig(producerProperties)
.setRecordSerializer(skyOneSerializer)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
Expand All @@ -165,33 +165,33 @@ public static void main(String[] args) throws Exception {
skyOneStream.sinkTo(skyOneSink).name("skyone_sink");

// --- Sets up a Flink Avro Generic Record source to consume data.
DataGeneratorSource<AirlineData> sunsetSource =
DataGeneratorSource<AirlineAvroData> sunsetSource =
new DataGeneratorSource<>(
index -> DataGenerator.generateAirlineFlightData("SUN"),
index -> DataGenerator.generateAirlineAvroData("SUN"),
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(1),
Types.POJO(AirlineData.class)
Types.POJO(AirlineAvroData.class)
);

DataStream<AirlineData> sunsetStream =
DataStream<AirlineAvroData> sunsetStream =
env.fromSource(sunsetSource, WatermarkStrategy.noWatermarks(), "sunset_source");

/*
* Sets up a Flink Kafka sink to produce data to the Kafka topic `airline.sunset_avro` with the
* specified serializer.
*/
KafkaRecordSerializationSchema<AirlineData> sunsetSerializer =
KafkaRecordSerializationSchema.<AirlineData>builder()
KafkaRecordSerializationSchema<AirlineAvroData> sunsetSerializer =
KafkaRecordSerializationSchema.<AirlineAvroData>builder()
.setTopic("airline.sunset_avro")
.setValueSerializationSchema(ConfluentRegistryAvroSerializationSchema.forSpecific(AirlineData.class, AirlineData.SUBJECT, schemaRegistryUrl, registryConfigs))
.setValueSerializationSchema(ConfluentRegistryAvroSerializationSchema.forSpecific(AirlineAvroData.class, AirlineAvroData.SUBJECT, schemaRegistryUrl, registryConfigs))
.build();

/*
* Takes the results of the Kafka sink and attaches the unbounded data stream to the Flink
* environment (a.k.a. the Flink job graph -- the DAG).
*/
KafkaSink<AirlineData> sunsetSink =
KafkaSink.<AirlineData>builder()
KafkaSink<AirlineAvroData> sunsetSink =
KafkaSink.<AirlineAvroData>builder()
.setKafkaProducerConfig(producerProperties)
.setRecordSerializer(sunsetSerializer)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
Expand Down Expand Up @@ -296,11 +296,11 @@ public static void main(String[] args) throws Exception {
* @param tableName The name of the table.
* @param airlineDataStream The input data stream.
*/
private static void SinkToIcebergTable(final StreamTableEnvironment tblEnv, final org.apache.flink.table.catalog.Catalog catalog, final CatalogLoader catalogLoader, final String databaseName, final int fieldCount, final String tableName, DataStream<AirlineData> airlineDataStream) {
private static void SinkToIcebergTable(final StreamTableEnvironment tblEnv, final org.apache.flink.table.catalog.Catalog catalog, final CatalogLoader catalogLoader, final String databaseName, final int fieldCount, final String tableName, DataStream<AirlineAvroData> airlineDataStream) {
// --- Convert DataStream<AirlineData> to DataStream<RowData>
DataStream<RowData> skyOneRowData = airlineDataStream.map(new MapFunction<AirlineData, RowData>() {
DataStream<RowData> skyOneRowData = airlineDataStream.map(new MapFunction<AirlineAvroData, RowData>() {
@Override
public RowData map(AirlineData airlineData) throws Exception {
public RowData map(AirlineAvroData airlineData) throws Exception {
GenericRowData rowData = new GenericRowData(RowKind.INSERT, fieldCount);
rowData.setField(0, StringData.fromString(airlineData.getEmailAddress()));
rowData.setField(1, StringData.fromString(airlineData.getDepartureTime()));
Expand Down
26 changes: 26 additions & 0 deletions java/app/src/main/java/kickstarter/DataGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,30 @@ public static AirlineData generateAirlineFlightData(final String airlinePrefix)

return airlineData;
}

/**
* Generate an AirlineFlightData object.
*
* @param airlinePrefix The prefix for the airline.
* @return An AirlineFlightData object.
*/
public static AirlineAvroData generateAirlineAvroData(final String airlinePrefix) {
AirlineAvroData airlineData = new AirlineAvroData();
final LocalDateTime localDepartureTime = generateDepartureTime();
final LocalDateTime localArrivalTime = generateArrivalTime(localDepartureTime);

airlineData.setEmailAddress(generateEmail());
airlineData.setDepartureTime(localDepartureTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
airlineData.setDepartureAirportCode(generateAirportCode());
airlineData.setArrivalTime(localArrivalTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
airlineData.setArrivalAirportCode(generateAirportCode());
airlineData.setFlightDuration(Duration.between(localDepartureTime, localArrivalTime).toMinutes());
airlineData.setFlightNumber(airlinePrefix + random.nextInt(1000));
airlineData.setConfirmationCode(airlinePrefix + generateString(6));
airlineData.setTicketPrice(BigDecimal.valueOf(500L + (long)random.nextInt(1000)));
airlineData.setAircraft("Aircraft" + generateString(3));
airlineData.setBookingAgencyEmail(generateEmail());

return airlineData;
}
}
Loading

0 comments on commit f5108d8

Please sign in to comment.