From f5108d81d5074f017411ec0a7454b1aebaef85c0 Mon Sep 17 00:00:00 2001 From: "Jeffrey Jonathan Jennings (J3)" Date: Mon, 18 Nov 2024 23:00:14 -0500 Subject: [PATCH] See #469. Refactored and Recommited. --- .../kickstarter/AvroDataGeneratorApp.java | 42 +-- .../main/java/kickstarter/DataGenerator.java | 26 ++ .../kickstarter/model/AirlineAvroData.java | 294 ++++++++++++++++++ .../java/kickstarter/model/AirlineData.java | 150 +++++---- .../kickstarter/FlightImporterAppTest.java | 84 +++++ .../model/AirlineFlightDataTest.java | 107 +++++++ 6 files changed, 617 insertions(+), 86 deletions(-) create mode 100644 java/app/src/main/java/kickstarter/model/AirlineAvroData.java create mode 100644 java/app/src/test/java/kickstarter/FlightImporterAppTest.java create mode 100644 java/app/src/test/java/kickstarter/model/AirlineFlightDataTest.java diff --git a/java/app/src/main/java/kickstarter/AvroDataGeneratorApp.java b/java/app/src/main/java/kickstarter/AvroDataGeneratorApp.java index 21a3fff..d59ed3d 100644 --- a/java/app/src/main/java/kickstarter/AvroDataGeneratorApp.java +++ b/java/app/src/main/java/kickstarter/AvroDataGeneratorApp.java @@ -125,34 +125,34 @@ public static void main(String[] args) throws Exception { registryConfigs.put("schema.registry.basic.auth.user.info", producerProperties.getProperty("basic.auth.user.info")); // --- Create a data generator source. - DataGeneratorSource skyOneSource = + DataGeneratorSource skyOneSource = new DataGeneratorSource<>( - index -> DataGenerator.generateAirlineFlightData("SKY1"), + index -> DataGenerator.generateAirlineAvroData("SKY1"), Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), - Types.POJO(AirlineData.class) + Types.POJO(AirlineAvroData.class) ); // --- Sets up a Flink Avro Generic Record source to consume data. - DataStream skyOneStream = + DataStream skyOneStream = env.fromSource(skyOneSource, WatermarkStrategy.noWatermarks(), "skyone_source"); /* * Sets up a Flink Kafka sink to produce data to the Kafka topic `airline.skyone_avro` with the * specified serializer. */ - KafkaRecordSerializationSchema skyOneSerializer = - KafkaRecordSerializationSchema.builder() + KafkaRecordSerializationSchema skyOneSerializer = + KafkaRecordSerializationSchema.builder() .setTopic("airline.skyone_avro") - .setValueSerializationSchema(ConfluentRegistryAvroSerializationSchema.forSpecific(AirlineData.class, AirlineData.SUBJECT, schemaRegistryUrl, registryConfigs)) + .setValueSerializationSchema(ConfluentRegistryAvroSerializationSchema.forSpecific(AirlineAvroData.class, AirlineAvroData.SUBJECT, schemaRegistryUrl, registryConfigs)) .build(); /* * Takes the results of the Kafka sink and attaches the unbounded data stream to the Flink * environment (a.k.a. the Flink job graph -- the DAG). */ - KafkaSink skyOneSink = - KafkaSink.builder() + KafkaSink skyOneSink = + KafkaSink.builder() .setKafkaProducerConfig(producerProperties) .setRecordSerializer(skyOneSerializer) .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) @@ -165,33 +165,33 @@ public static void main(String[] args) throws Exception { skyOneStream.sinkTo(skyOneSink).name("skyone_sink"); // --- Sets up a Flink Avro Generic Record source to consume data. - DataGeneratorSource sunsetSource = + DataGeneratorSource sunsetSource = new DataGeneratorSource<>( - index -> DataGenerator.generateAirlineFlightData("SUN"), + index -> DataGenerator.generateAirlineAvroData("SUN"), Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), - Types.POJO(AirlineData.class) + Types.POJO(AirlineAvroData.class) ); - DataStream sunsetStream = + DataStream sunsetStream = env.fromSource(sunsetSource, WatermarkStrategy.noWatermarks(), "sunset_source"); /* * Sets up a Flink Kafka sink to produce data to the Kafka topic `airline.sunset_avro` with the * specified serializer. */ - KafkaRecordSerializationSchema sunsetSerializer = - KafkaRecordSerializationSchema.builder() + KafkaRecordSerializationSchema sunsetSerializer = + KafkaRecordSerializationSchema.builder() .setTopic("airline.sunset_avro") - .setValueSerializationSchema(ConfluentRegistryAvroSerializationSchema.forSpecific(AirlineData.class, AirlineData.SUBJECT, schemaRegistryUrl, registryConfigs)) + .setValueSerializationSchema(ConfluentRegistryAvroSerializationSchema.forSpecific(AirlineAvroData.class, AirlineAvroData.SUBJECT, schemaRegistryUrl, registryConfigs)) .build(); /* * Takes the results of the Kafka sink and attaches the unbounded data stream to the Flink * environment (a.k.a. the Flink job graph -- the DAG). */ - KafkaSink sunsetSink = - KafkaSink.builder() + KafkaSink sunsetSink = + KafkaSink.builder() .setKafkaProducerConfig(producerProperties) .setRecordSerializer(sunsetSerializer) .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) @@ -296,11 +296,11 @@ public static void main(String[] args) throws Exception { * @param tableName The name of the table. * @param airlineDataStream The input data stream. */ - private static void SinkToIcebergTable(final StreamTableEnvironment tblEnv, final org.apache.flink.table.catalog.Catalog catalog, final CatalogLoader catalogLoader, final String databaseName, final int fieldCount, final String tableName, DataStream airlineDataStream) { + private static void SinkToIcebergTable(final StreamTableEnvironment tblEnv, final org.apache.flink.table.catalog.Catalog catalog, final CatalogLoader catalogLoader, final String databaseName, final int fieldCount, final String tableName, DataStream airlineDataStream) { // --- Convert DataStream to DataStream - DataStream skyOneRowData = airlineDataStream.map(new MapFunction() { + DataStream skyOneRowData = airlineDataStream.map(new MapFunction() { @Override - public RowData map(AirlineData airlineData) throws Exception { + public RowData map(AirlineAvroData airlineData) throws Exception { GenericRowData rowData = new GenericRowData(RowKind.INSERT, fieldCount); rowData.setField(0, StringData.fromString(airlineData.getEmailAddress())); rowData.setField(1, StringData.fromString(airlineData.getDepartureTime())); diff --git a/java/app/src/main/java/kickstarter/DataGenerator.java b/java/app/src/main/java/kickstarter/DataGenerator.java index 0703312..d7858bf 100644 --- a/java/app/src/main/java/kickstarter/DataGenerator.java +++ b/java/app/src/main/java/kickstarter/DataGenerator.java @@ -90,4 +90,30 @@ public static AirlineData generateAirlineFlightData(final String airlinePrefix) return airlineData; } + + /** + * Generate an AirlineFlightData object. + * + * @param airlinePrefix The prefix for the airline. + * @return An AirlineFlightData object. + */ + public static AirlineAvroData generateAirlineAvroData(final String airlinePrefix) { + AirlineAvroData airlineData = new AirlineAvroData(); + final LocalDateTime localDepartureTime = generateDepartureTime(); + final LocalDateTime localArrivalTime = generateArrivalTime(localDepartureTime); + + airlineData.setEmailAddress(generateEmail()); + airlineData.setDepartureTime(localDepartureTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); + airlineData.setDepartureAirportCode(generateAirportCode()); + airlineData.setArrivalTime(localArrivalTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); + airlineData.setArrivalAirportCode(generateAirportCode()); + airlineData.setFlightDuration(Duration.between(localDepartureTime, localArrivalTime).toMinutes()); + airlineData.setFlightNumber(airlinePrefix + random.nextInt(1000)); + airlineData.setConfirmationCode(airlinePrefix + generateString(6)); + airlineData.setTicketPrice(BigDecimal.valueOf(500L + (long)random.nextInt(1000))); + airlineData.setAircraft("Aircraft" + generateString(3)); + airlineData.setBookingAgencyEmail(generateEmail()); + + return airlineData; + } } diff --git a/java/app/src/main/java/kickstarter/model/AirlineAvroData.java b/java/app/src/main/java/kickstarter/model/AirlineAvroData.java new file mode 100644 index 0000000..4346e34 --- /dev/null +++ b/java/app/src/main/java/kickstarter/model/AirlineAvroData.java @@ -0,0 +1,294 @@ +/** + * Copyright (c) 2024 Jeffrey Jonathan Jennings + * + * @author Jeffrey Jonathan Jennings (J3) + * + * + */ +package kickstarter.model; + +import com.fasterxml.jackson.annotation.*; +import org.apache.avro.specific.*; +import org.apache.avro.io.*; +import org.apache.avro.*; +import org.apache.avro.generic.*; +import io.confluent.kafka.schemaregistry.avro.*; +import java.math.*; +import java.util.*; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +import org.apache.avro.generic.GenericFixed; + +import kickstarter.helper.*; + + +@SuppressWarnings("all") +public class AirlineAvroData extends SpecificRecordBase implements SpecificRecord { + public static final Schema SCHEMA$ = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"AirlineData\",\"namespace\":\"com.thej3.apache_flink_kickstater.model\",\"fields\":[{\"name\":\"email_address\",\"type\":\"string\"},{\"name\":\"departure_time\",\"type\":\"string\"},{\"name\":\"departure_airport_code\",\"type\":\"string\"},{\"name\":\"arrival_time\",\"type\":\"int\"},{\"name\":\"arrival_airport_code\",\"type\":\"string\"},{\"name\":\"flight_duration\",\"type\":\"long\"},{\"name\":\"flight_number\",\"type\":\"string\"},{\"name\":\"confirmation_code\",\"type\":\"string\"},{\"name\":\"ticket_price\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":10,\"scale\":2}},{\"name\":\"aircraft\",\"type\":\"string\"},{\"name\":\"booking_agency_email\",\"type\":\"string\"}]}"); + + // --- The field names of the class object + public static final String FIELD_EMAIL_ADDRESS = "email_address"; + public static final String FIELD_DEPARTURE_TIME = "departure_time"; + public static final String FIELD_DEPARTURE_AIRPORT_CODE = "departure_airport_code"; + public static final String FIELD_ARRIVAL_TIME = "arrival_time"; + public static final String FIELD_ARRIVAL_AIRPORT_CODE = "arrival_airport_code"; + public static final String FIELD_FLIGHT_DURATION = "flight_duration"; + public static final String FIELD_FLIGHT_NUMBER = "flight_number"; + public static final String FIELD_CONFIRMATION_CODE = "confirmation_code"; + public static final String FIELD_TICKET_PRICE = "ticket_price"; + public static final String FIELD_AIRCRAFT = "aircraft"; + public static final String FIELD_BOOKING_AGENCY_EMAIL = "booking_agency_email"; + + // --- The subject name of the class object + public static final String NAMESPACE = "com.thej3.apache_flink_kickstater.model"; + public static final String SUBJECT = NAMESPACE + ".AirlineData"; + + + @JsonProperty(FIELD_EMAIL_ADDRESS) + private String email_address; + + @JsonProperty(FIELD_DEPARTURE_TIME) + private String departure_time; + + @JsonProperty(FIELD_DEPARTURE_AIRPORT_CODE) + private String departure_airport_code; + + @JsonProperty(FIELD_ARRIVAL_TIME) + private String arrival_time; + + @JsonProperty(FIELD_ARRIVAL_AIRPORT_CODE) + private String arrival_airport_code; + + @JsonProperty(FIELD_FLIGHT_DURATION) + private long flight_duration; + + @JsonProperty(FIELD_FLIGHT_NUMBER) + private String flight_number; + + @JsonProperty(FIELD_CONFIRMATION_CODE) + private String confirmation_code; + + @JsonProperty(FIELD_TICKET_PRICE) + private BigDecimal ticket_price; + + @JsonProperty(FIELD_AIRCRAFT) + private String aircraft; + + @JsonProperty(FIELD_BOOKING_AGENCY_EMAIL) + private String booking_agency_email; + + + /** + * Default constructor. + */ + public AirlineAvroData() {} + + + public String getEmailAddress() { + return this.email_address; + } + + public void setEmailAddress(String emailAddress) { + this.email_address = emailAddress; + } + + public String getDepartureTime() { + return this.departure_time; + } + + public void setDepartureTime(String departureTime) { + this.departure_time = departureTime; + } + + public String getDepartureAirportCode() { + return this.departure_airport_code; + } + + public void setDepartureAirportCode(String departureAirport) { + this.departure_airport_code = departureAirport; + } + + public String getArrivalTime() { + return this.arrival_time; + } + + public void setArrivalTime(String arrivalTime) { + this.arrival_time = arrivalTime; + } + + public String getArrivalAirportCode() { + return this.arrival_airport_code; + } + + public void setArrivalAirportCode(String arrivalAirport) { + this.arrival_airport_code = arrivalAirport; + } + + public long getFlightDuration() { + return this.flight_duration; + } + + public void setFlightDuration(long flightDuration) { + this.flight_duration = flightDuration; + } + + public String getFlightNumber() { + return this.flight_number; + } + + public void setFlightNumber(String flightNumber) { + this.flight_number = flightNumber; + } + + public String getConfirmationCode() { + return this.confirmation_code; + } + + public void setConfirmationCode(String confirmationCode) { + this.confirmation_code = confirmationCode; + } + + public BigDecimal getTicketPrice() { + return this.ticket_price; + } + + public void setTicketPrice(BigDecimal totalPrice) { + this.ticket_price = totalPrice; + } + + public String getAircraft() { + return this.aircraft; + } + + public void setAircraft(String aircraft) { + this.aircraft = aircraft; + } + + public String getBookingAgencyEmail() { + return this.booking_agency_email; + } + + public void setBookingAgencyEmail(String bookingAgencyEmail) { + this.booking_agency_email = bookingAgencyEmail; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AirlineAvroData that = (AirlineAvroData) o; + return Objects.equals(getEmailAddress(), that.email_address) && + Objects.equals(getDepartureTime(), that.departure_time) && + Objects.equals(getDepartureAirportCode(), that.departure_airport_code) && + Objects.equals(getArrivalTime(), that.arrival_time) && + Objects.equals(getArrivalAirportCode(), that.arrival_airport_code) && + Objects.equals(getFlightDuration(), that.flight_duration) && + Objects.equals(getFlightNumber(), that.flight_number) && + Objects.equals(getConfirmationCode(), that.confirmation_code) && + Objects.equals(getTicketPrice(), that.ticket_price) && + Objects.equals(getAircraft(), that.aircraft) && + Objects.equals(getBookingAgencyEmail(), that.booking_agency_email); + } + + @Override + public int hashCode() { + return Objects.hash(getEmailAddress(), + getDepartureTime(), + getDepartureAirportCode(), + getArrivalTime(), + getArrivalAirportCode(), + getFlightDuration(), + getFlightNumber(), + getConfirmationCode(), + getTicketPrice(), + getAircraft(), + getBookingAgencyEmail()); + } + + @Override + public String toString() { + return "AirlineData{" + + "email_address='" + getEmailAddress() + '\'' + + ", departure_time=" + getDepartureTime() + + ", departure_airport_code='" + getDepartureAirportCode() + '\'' + + ", arrival_time=" + getArrivalTime() + + ", arrival_airport_code='" + getArrivalAirportCode() + '\'' + + ", flight_duration=" + getFlightDuration() + + ", flight_number='" + getFlightNumber() + '\'' + + ", confirmation_code='" + getConfirmationCode() + '\'' + + ", ticket_price=" + getTicketPrice() + + ", aircraft='" + getAircraft() + '\'' + + ", booking_agency_email='" + getBookingAgencyEmail() + '\'' + + '}'; + } + public FlightData toFlightData(final String airline) { + FlightData flightData = new FlightData(); + + flightData.setEmailAddress(getEmailAddress()); + flightData.setDepartureTime(getDepartureTime()); + flightData.setDepartureAirportCode(getDepartureAirportCode()); + flightData.setArrivalTime(getArrivalTime()); + flightData.setArrivalAirportCode(getArrivalAirportCode()); + flightData.setFlightNumber(getFlightNumber()); + flightData.setConfirmationCode(getConfirmationCode()); + flightData.setAirline(airline); + + return flightData; + } + + @Override + public Schema getSchema() { + return SCHEMA$; + } + + @Override + public Object get(int field$) { + switch (field$) { + case 0: return email_address; + case 1: return departure_time; + case 2: return departure_airport_code; + case 3: return arrival_time; + case 4: return arrival_airport_code; + case 5: return flight_duration; + case 6: return flight_number; + case 7: return confirmation_code; + case 8: + return ByteBuffer.wrap(getTicketPrice().toString().getBytes(StandardCharsets.UTF_8)); + case 9: + return getAircraft(); + case 10: + return getBookingAgencyEmail(); + default: + throw new IndexOutOfBoundsException("Invalid field index: " + field$); + } + } + + @Override + @SuppressWarnings("unchecked") + public void put(int field$, Object value$) { + switch (field$) { + case 0: email_address = (String) value$; break; + case 1: departure_time = value$.toString(); break; + case 2: departure_airport_code = (String) value$; break; + case 3: arrival_time = (String) value$; break; + case 4: arrival_airport_code = value$.toString(); break; + case 5: flight_duration = (long) value$; break; + case 6: flight_number = (String) value$; break; + case 7: confirmation_code = value$.toString(); break; + case 8: + setTicketPrice(new BigDecimal(new String(ByteBuffer.wrap(value$.toString().getBytes(StandardCharsets.UTF_8)).array(), StandardCharsets.UTF_8))); + break; + case 9: + setAircraft(value$.toString()); + break; + case 10: + setBookingAgencyEmail((String) value$); + break; + default: + throw new IndexOutOfBoundsException("Invalid field index: " + field$); + } + } +} \ No newline at end of file diff --git a/java/app/src/main/java/kickstarter/model/AirlineData.java b/java/app/src/main/java/kickstarter/model/AirlineData.java index e18541e..dd4b418 100644 --- a/java/app/src/main/java/kickstarter/model/AirlineData.java +++ b/java/app/src/main/java/kickstarter/model/AirlineData.java @@ -8,27 +8,17 @@ package kickstarter.model; import com.fasterxml.jackson.annotation.*; -import org.apache.avro.specific.*; -import org.apache.avro.io.*; import org.apache.avro.*; import org.apache.avro.generic.*; import io.confluent.kafka.schemaregistry.avro.*; +import java.io.Serializable; import java.math.*; import java.util.*; -import org.apache.avro.LogicalTypes; -import org.apache.avro.Schema; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; - -import org.apache.avro.generic.GenericFixed; import kickstarter.helper.*; -@SuppressWarnings("all") -public class AirlineData extends SpecificRecordBase implements SpecificRecord { - public static final Schema SCHEMA$ = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"AirlineData\",\"namespace\":\"com.thej3.apache_flink_kickstater.model\",\"fields\":[{\"name\":\"email_address\",\"type\":\"string\"},{\"name\":\"departure_time\",\"type\":\"string\"},{\"name\":\"departure_airport_code\",\"type\":\"string\"},{\"name\":\"arrival_time\",\"type\":\"int\"},{\"name\":\"arrival_airport_code\",\"type\":\"string\"},{\"name\":\"flight_duration\",\"type\":\"long\"},{\"name\":\"flight_number\",\"type\":\"string\"},{\"name\":\"confirmation_code\",\"type\":\"string\"},{\"name\":\"ticket_price\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":10,\"scale\":2}},{\"name\":\"aircraft\",\"type\":\"string\"},{\"name\":\"booking_agency_email\",\"type\":\"string\"}]}"); - +public class AirlineData implements Serializable { // --- The field names of the class object public static final String FIELD_EMAIL_ADDRESS = "email_address"; public static final String FIELD_DEPARTURE_TIME = "departure_time"; @@ -43,11 +33,10 @@ public class AirlineData extends SpecificRecordBase implements SpecificRecord { public static final String FIELD_BOOKING_AGENCY_EMAIL = "booking_agency_email"; // --- The subject name of the class object - public static final String NAMESPACE = "com.thej3.apache_flink_kickstater.model"; - public static final String SUBJECT = NAMESPACE + ".AirlineData"; - + public static final String SUBJECT_AIRLINE_DATA = "AirlineData"; + public static final String NAMESPACE_THEJ3_MODEL = "com.thej3.apache_flink_kickstater.model"; - @JsonProperty(FIELD_EMAIL_ADDRESS) + @JsonProperty(FIELD_EMAIL_ADDRESS) private String email_address; @JsonProperty(FIELD_DEPARTURE_TIME) @@ -239,56 +228,87 @@ public FlightData toFlightData(final String airline) { return flightData; } - @Override - public Schema getSchema() { - return SCHEMA$; - } - - @Override - public Object get(int field$) { - switch (field$) { - case 0: return email_address; - case 1: return departure_time; - case 2: return departure_airport_code; - case 3: return arrival_time; - case 4: return arrival_airport_code; - case 5: return flight_duration; - case 6: return flight_number; - case 7: return confirmation_code; - case 8: - return ByteBuffer.wrap(getTicketPrice().toString().getBytes(StandardCharsets.UTF_8)); - case 9: - return getAircraft(); - case 10: - return getBookingAgencyEmail(); - default: - throw new IndexOutOfBoundsException("Invalid field index: " + field$); - } + /** + * The method converts the instantiated class object into a generic record. + * + * @return the generic record of the instantiated class object. + * @throws AvroSchemaFieldNotExistException - The exception is thrown when the + * schema field does not exist. + */ + public GenericRecord toGenericRecord() throws AvroSchemaFieldNotExistException { + Schema schema = buildSchema().rawSchema(); + GenericRecord genericRecord = new GenericData.Record(schema); + AvroHelper.setRecordField(genericRecord, schema, FIELD_EMAIL_ADDRESS, getEmailAddress()); + AvroHelper.setRecordField(genericRecord, schema, FIELD_DEPARTURE_TIME, getDepartureTime()); + AvroHelper.setRecordField(genericRecord, schema, FIELD_DEPARTURE_AIRPORT_CODE, getDepartureAirportCode()); + AvroHelper.setRecordField(genericRecord, schema, FIELD_ARRIVAL_TIME, getArrivalTime()); + AvroHelper.setRecordField(genericRecord, schema, FIELD_ARRIVAL_AIRPORT_CODE, getArrivalAirportCode()); + AvroHelper.setRecordField(genericRecord, schema, FIELD_FLIGHT_DURATION, getFlightDuration()); + AvroHelper.setRecordField(genericRecord, schema, FIELD_FLIGHT_NUMBER, getFlightNumber()); + AvroHelper.setRecordField(genericRecord, schema, FIELD_CONFIRMATION_CODE, getConfirmationCode()); + AvroHelper.setRecordField(genericRecord, schema, FIELD_TICKET_PRICE, getTicketPrice()); + AvroHelper.setRecordField(genericRecord, schema, FIELD_AIRCRAFT, getAircraft()); + AvroHelper.setRecordField(genericRecord, schema, FIELD_BOOKING_AGENCY_EMAIL, getBookingAgencyEmail()); + return genericRecord; } - @Override - @SuppressWarnings("unchecked") - public void put(int field$, Object value$) { - switch (field$) { - case 0: email_address = (String) value$; break; - case 1: departure_time = value$.toString(); break; - case 2: departure_airport_code = (String) value$; break; - case 3: arrival_time = (String) value$; break; - case 4: arrival_airport_code = value$.toString(); break; - case 5: flight_duration = (long) value$; break; - case 6: flight_number = (String) value$; break; - case 7: confirmation_code = value$.toString(); break; - case 8: - setTicketPrice(new BigDecimal(new String(ByteBuffer.wrap(value$.toString().getBytes(StandardCharsets.UTF_8)).array(), StandardCharsets.UTF_8))); - break; - case 9: - setAircraft(value$.toString()); - break; - case 10: - setBookingAgencyEmail((String) value$); - break; - default: - throw new IndexOutOfBoundsException("Invalid field index: " + field$); - } + /** + * The method creates the class object record schema. + * + * @return the class objects converts the underlying org.apache.avro.Schema + * to an io.confluent.kafka.schemaregistry.avro.AvroSchema because it is tailored + * to the needs of schema management within the ecosystem of Kafka and the + * Confluent Schema Registry. Moreover, the capabilities of + * io.confluent.kafka.schemaregistry.avro.AvroSchema extends the ability to support + * the Confluent Schema Registry for registering, retrieving, and managing schemas. + * These include compatibility checks, schema versioning, and other registry-specific + * operations. + */ + public static AvroSchema buildSchema() { + // --- Returns the defined schema + return + new AvroSchema(SchemaBuilder + .record(SUBJECT_AIRLINE_DATA).namespace(NAMESPACE_THEJ3_MODEL) + .doc("") + .fields() + .name(FIELD_EMAIL_ADDRESS) + .doc("") + .type().stringType().noDefault() + .name(FIELD_DEPARTURE_TIME) + .doc("") + .type().stringType().noDefault() + .name(FIELD_DEPARTURE_AIRPORT_CODE) + .doc("") + .type().stringType().noDefault() + .name(FIELD_ARRIVAL_TIME) + .doc("") + .type().intType().noDefault() + .name(FIELD_ARRIVAL_AIRPORT_CODE) + .doc("") + .type().stringType().noDefault() + .name(FIELD_FLIGHT_DURATION) + .doc("") + .type().longType().noDefault() + .name(FIELD_FLIGHT_NUMBER) + .doc("") + .type().stringType().noDefault() + .name(FIELD_CONFIRMATION_CODE) + .doc("") + .type().stringType().noDefault() + .name(FIELD_TICKET_PRICE) + .doc("") + /* + * Since Avro does not have a built-in decimal type, the logical type of decimal + * is used to represent the decimal type. The logical type of decimal is represented + * by bytes. + */ + .type(LogicalTypes.decimal(10, 2).addToSchema(Schema.create(Schema.Type.BYTES))).noDefault() + .name(FIELD_AIRCRAFT) + .doc("") + .type().stringType().noDefault() + .name(FIELD_BOOKING_AGENCY_EMAIL) + .doc("") + .type().stringType().noDefault() + .endRecord()); } } \ No newline at end of file diff --git a/java/app/src/test/java/kickstarter/FlightImporterAppTest.java b/java/app/src/test/java/kickstarter/FlightImporterAppTest.java new file mode 100644 index 0000000..e422ceb --- /dev/null +++ b/java/app/src/test/java/kickstarter/FlightImporterAppTest.java @@ -0,0 +1,84 @@ +/** + * Copyright (c) 2024 Jeffrey Jonathan Jennings + * + * @author Jeffrey Jonathan Jennings (J3) + * + * + */ +package kickstarter; + +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.*; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.RegisterExtension; +import java.time.*; +import java.time.format.*; +import java.util.*; +import static org.junit.jupiter.api.Assertions.*; + +import kickstarter.model.*; + + +class FlightImporterAppTest { + + StreamExecutionEnvironment env; + DataStream.Collector collector; + + static final MiniClusterResourceConfiguration miniClusterConfig = new MiniClusterResourceConfiguration.Builder() + .setNumberSlotsPerTaskManager(2) + .setNumberTaskManagers(1) + .build(); + + @RegisterExtension + static final MiniClusterExtension FLINK = new MiniClusterExtension(miniClusterConfig); + + private void assertContains(DataStream.Collector collector, List expected) { + List actual = new ArrayList<>(); + collector.getOutput().forEachRemaining(actual::add); + + assertEquals(expected.size(), actual.size()); + + assertTrue(actual.containsAll(expected)); + } + + @BeforeEach + void setup() { + env = StreamExecutionEnvironment.getExecutionEnvironment(); + collector = new DataStream.Collector<>(); + } + + @Test + void defineWorkflow_shouldConvertDataFromTwoStreams() throws Exception { + AirlineData skyOneFlight = new TestHelpers.AirlineFlightDataBuilder().build(); + AirlineData sunsetFlight = new TestHelpers.AirlineFlightDataBuilder().build(); + + DataStreamSource skyOneStream = env.fromData(skyOneFlight); + DataStreamSource sunsetStream = env.fromData(sunsetFlight); + + FlightImporterApp.defineWorkflow(skyOneStream, sunsetStream).collectAsync(collector); + env.executeAsync(); + assertContains(collector, Arrays.asList(skyOneFlight.toFlightData("SkyOne"), sunsetFlight.toFlightData("Sunset"))); + } + + @Test + void defineWorkflow_shouldFilterOutFlightsInThePast() throws Exception { + final String addMinuteToNow = LocalDateTime.now().plusMinutes(1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + final String subtractSecondToNow = LocalDateTime.now().minusSeconds(1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + + AirlineData newSkyOneFlight = new TestHelpers.AirlineFlightDataBuilder().setArrivalTime(addMinuteToNow).build(); + AirlineData oldSkyOneFlight = new TestHelpers.AirlineFlightDataBuilder().setArrivalTime(subtractSecondToNow).build(); + AirlineData newSunsetFlight = new TestHelpers.AirlineFlightDataBuilder().setArrivalTime(addMinuteToNow).build(); + AirlineData oldSunsetFlight = new TestHelpers.AirlineFlightDataBuilder().setArrivalTime(subtractSecondToNow).build(); + + DataStreamSource skyOneStream = env.fromData(newSkyOneFlight, oldSkyOneFlight); + DataStreamSource sunsetStream = env.fromData(newSunsetFlight, oldSunsetFlight); + + FlightImporterApp.defineWorkflow(skyOneStream, sunsetStream).collectAsync(collector); + + env.executeAsync(); + + assertContains(collector, Arrays.asList(newSkyOneFlight.toFlightData("SkyOne"), newSunsetFlight.toFlightData("Sunset"))); + } +} \ No newline at end of file diff --git a/java/app/src/test/java/kickstarter/model/AirlineFlightDataTest.java b/java/app/src/test/java/kickstarter/model/AirlineFlightDataTest.java new file mode 100644 index 0000000..6a690ab --- /dev/null +++ b/java/app/src/test/java/kickstarter/model/AirlineFlightDataTest.java @@ -0,0 +1,107 @@ +/** + * Copyright (c) 2024 Jeffrey Jonathan Jennings + * + * @author Jeffrey Jonathan Jennings (J3) + * + * + */ +package kickstarter.model; + +import org.junit.jupiter.api.Test; +import static org.apache.flink.types.PojoTestUtils.assertSerializedAsPojo; +import static org.junit.jupiter.api.Assertions.*; + +import kickstarter.*; + + + +class AirlineFlightDataTest { + + @Test + void theClass_shouldBeSerializableAsAPOJO() { + assertSerializedAsPojo(AirlineData.class); + } + + @Test + void setters_shouldPopulateExpectedFields() { + AirlineData expected = new TestHelpers.AirlineFlightDataBuilder().build(); + AirlineData actual = new AirlineData(); + actual.setEmailAddress(expected.getEmailAddress()); + actual.setDepartureTime(expected.getDepartureTime()); + actual.setDepartureAirportCode(expected.getDepartureAirportCode()); + actual.setArrivalTime(expected.getArrivalTime()); + actual.setArrivalAirportCode(expected.getArrivalAirportCode()); + actual.setFlightNumber(expected.getFlightNumber()); + actual.setConfirmationCode(expected.getConfirmationCode()); + + assertEquals(expected.getEmailAddress(), actual.getEmailAddress()); + assertEquals(expected.getDepartureTime(), actual.getDepartureTime()); + assertEquals(expected.getDepartureAirportCode(), actual.getDepartureAirportCode()); + assertEquals(expected.getArrivalTime(), actual.getArrivalTime()); + assertEquals(expected.getArrivalAirportCode(), actual.getArrivalAirportCode()); + assertEquals(expected.getFlightNumber(), actual.getFlightNumber()); + assertEquals(expected.getConfirmationCode(), actual.getConfirmationCode()); + } + + @Test + void equals_shouldReturnTrue_forTwoEquivalentFlights() { + AirlineData flight1 = new TestHelpers.AirlineFlightDataBuilder().build(); + AirlineData flight2 = new AirlineData(); + flight2.setEmailAddress(flight1.getEmailAddress()); + flight2.setDepartureTime(flight1.getDepartureTime()); + flight2.setDepartureAirportCode(flight1.getDepartureAirportCode()); + flight2.setArrivalTime(flight1.getArrivalTime()); + flight2.setArrivalAirportCode(flight1.getArrivalAirportCode()); + flight2.setFlightNumber(flight1.getFlightNumber()); + flight2.setConfirmationCode(flight1.getConfirmationCode()); + + assertNotSame(flight1, flight2); + assertEquals(flight1, flight2); + assertEquals(flight1.hashCode(), flight2.hashCode()); + } + + @Test + void equals_shouldReturnFalse_forTwoDifferentFlights() { + AirlineData flight1 = new TestHelpers.AirlineFlightDataBuilder().build(); + AirlineData flight2 = new TestHelpers.AirlineFlightDataBuilder().build(); + + assertNotSame(flight1, flight2); + assertNotEquals(flight1, flight2); + assertNotEquals(flight1.hashCode(), flight2.hashCode()); + } + + @Test + void toString_shouldReturnTheExpectedResults() { + AirlineData flightData = new TestHelpers.AirlineFlightDataBuilder().build(); + + String expected = "AirlineData{" + + "email_address='" + flightData.getEmailAddress() + '\'' + + ", departure_time=" + flightData.getDepartureTime() + + ", departure_airport_code='" + flightData.getDepartureAirportCode() + '\'' + + ", arrival_time=" + flightData.getArrivalTime() + + ", arrival_airport_code='" + flightData.getArrivalAirportCode() + '\'' + + ", flight_number='" + flightData.getFlightNumber() + '\'' + + ", confirmation_code='" + flightData.getConfirmationCode() + '\'' + + '}'; + assertNotEquals(expected, flightData.toString()); + } + + @Test + void toFlightData_shouldConvertToAFlightDataObject() { + AirlineData skyOne = new TestHelpers.AirlineFlightDataBuilder().build(); + FlightData expected = new FlightData(); + expected.setEmailAddress(skyOne.getEmailAddress()); + expected.setDepartureTime(skyOne.getDepartureTime()); + expected.setDepartureAirportCode(skyOne.getDepartureAirportCode()); + expected.setArrivalTime(skyOne.getArrivalTime()); + expected.setArrivalAirportCode(skyOne.getArrivalAirportCode()); + expected.setFlightNumber(skyOne.getFlightNumber()); + expected.setConfirmationCode(skyOne.getConfirmationCode()); + expected.setAirline("SkyOne"); + + + FlightData actual = skyOne.toFlightData("SkyOne"); + + assertEquals(expected, actual); + } +} \ No newline at end of file