Commit

Resolved #435.

j3-signalroom committed Nov 7, 2024
1 parent 388ca95 commit a2221ef
Showing 3 changed files with 317 additions and 2 deletions.
313 changes: 313 additions & 0 deletions .blog/apache-iceberg-in-action-with-apache-flink-using-java.md
@@ -240,3 +240,316 @@ try {
- **Execute and Collect:** Adds the newly created Kafka producer client properties (`producerProperties`) to the DAG so they are available to set up the Kafka sinks.
- **Error Handling:** If any exception occurs during this process, the application prints the error, logs it, and exits with a non-zero status.

### Step 5 of 11. Create fictional airline Data Sources and add them to the DAG

**Sky One Source:**

```java
DataGeneratorSource<AirlineData> skyOneSource =
new DataGeneratorSource<>(
index -> DataGenerator.generateAirlineFlightData("SKY1"),
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(1),
Types.POJO(AirlineData.class)
);
DataStream<AirlineData> skyOneStream =
env.fromSource(skyOneSource, WatermarkStrategy.noWatermarks(), "skyone_source");
```

- **Data Generator for Sky One:** Generates synthetic flight data (`AirlineData`) for the fictional `SkyOne` airline using `DataGeneratorSource`. The generator runs indefinitely (`Long.MAX_VALUE`) and emits one record per second (`RateLimiterStrategy.perSecond(1)`).
- **Create Data Stream:** Converts the data source into a `DataStream<AirlineData>` named `skyOneStream`.

**Sunset Source:**

```java
DataGeneratorSource<AirlineData> sunsetSource =
new DataGeneratorSource<>(
index -> DataGenerator.generateAirlineFlightData("SUN"),
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(1),
Types.POJO(AirlineData.class)
);
DataStream<AirlineData> sunsetStream =
env.fromSource(sunsetSource, WatermarkStrategy.noWatermarks(), "sunset_source");
```

- **Data Generator for Sunset Air:** Similarly, creates a data generator for the `Sunset Air` airline.
- **Create Data Stream:** Converts the data source into a `DataStream<AirlineData>` named `sunsetStream` (a sketch of the `AirlineData` shape follows below).
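
Both sources emit `AirlineData` records. The real class lives in the kickstarter repository; the sketch below is a hypothetical reconstruction of its shape, inferred from the getters used in Step 10 and the `RowType` defined in Step 9. Flink's `Types.POJO` needs a public class with a public no-arg constructor and public getters/setters, which the sketch illustrates:

```java
import java.math.BigDecimal;

// Hypothetical sketch of the AirlineData POJO (the real class is in the
// kickstarter repo). Flink can treat it as a POJO because it is public,
// has a public no-arg constructor, and exposes public getters/setters.
public class AirlineData {
    private String emailAddress;
    private String departureTime;
    private String departureAirportCode;
    private String arrivalTime;
    private String arrivalAirportCode;
    private long flightDuration;
    private String flightNumber;
    private String confirmationCode;
    private BigDecimal ticketPrice;
    private String aircraft;
    private String bookingAgencyEmail;

    public AirlineData() {}  // no-arg constructor required for POJO serialization

    public String getEmailAddress() { return emailAddress; }
    public void setEmailAddress(String v) { this.emailAddress = v; }
    public long getFlightDuration() { return flightDuration; }
    public void setFlightDuration(long v) { this.flightDuration = v; }
    public BigDecimal getTicketPrice() { return ticketPrice; }
    public void setTicketPrice(BigDecimal v) { this.ticketPrice = v; }
    // ... getters/setters for the remaining fields follow the same pattern.
}
```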

### Step 6 of 11. Create Kafka Sinks and add them to the DAG
> _This article focuses on Apache Iceberg with AWS Glue Data Catalog and Apache Flink, so I did not cover Confluent Cloud, AWS Secrets, or AWS Systems Manager Parameter Store. For related code, see [`confluent-resources.tf`](https://github.com/j3-signalroom/apache_flink-kickstarter/blob/main/confluent-resources.tf)._

**Sky One Sink:**
```java
KafkaRecordSerializationSchema<AirlineData> skyOneSerializer =
KafkaRecordSerializationSchema.<AirlineData>builder()
.setTopic("airline.skyone")
.setValueSerializationSchema(new JsonSerializationSchema<>(Common::getMapper))
.build();

KafkaSink<AirlineData> skyOneSink =
KafkaSink.<AirlineData>builder()
.setKafkaProducerConfig(producerProperties)
.setRecordSerializer(skyOneSerializer)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
skyOneStream.sinkTo(skyOneSink).name("skyone_sink");
```

- **Serialization Schema:** Creates a serialization schema for `SkyOne` using `JsonSerializationSchema`, which converts `AirlineData` objects to JSON.
- **Kafka Sink:** Configures a Kafka sink (`KafkaSink<AirlineData>`) with the producer properties retrieved earlier.
- **Delivery Guarantee:** Uses `AT_LEAST_ONCE` to ensure that messages are not lost, although duplicates are possible.
- **Attach Sink:** Adds the `KafkaSink` to the DAG.

**Sunset Sink:**
```java
KafkaRecordSerializationSchema<AirlineData> sunsetSerializer =
KafkaRecordSerializationSchema.<AirlineData>builder()
.setTopic("airline.sunset")
.setValueSerializationSchema(new JsonSerializationSchema<>(Common::getMapper))
.build();
KafkaSink<AirlineData> sunsetSink =
KafkaSink.<AirlineData>builder()
.setKafkaProducerConfig(producerProperties)
.setRecordSerializer(sunsetSerializer)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
sunsetStream.sinkTo(sunsetSink).name("sunset_sink");
```

- **Serialization Schema:** Similarly, creates a serializer for the `Sunset Air` data to be published to the `airline.sunset` topic.
- **Kafka Sink:** Sets up the Kafka sink for `Sunset Air` with the same configuration as `SkyOne`.
- **Delivery Guarantee:** Uses `AT_LEAST_ONCE` to ensure that messages are not lost, although duplicates are possible.
- **Attach Sink:** Adds the `KafkaSink` to the DAG (a minimal producer configuration sketch follows this list).
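
The `producerProperties` passed to both sinks come from an earlier step (the app pulls them from AWS Secrets Manager and Parameter Store, which this article does not cover). For local experimentation, a minimal stand-in might look like the sketch below; the broker address is an assumption, not the article's configuration:

```java
import java.util.Properties;

// Hypothetical minimal producer configuration for a local Kafka broker;
// the article's app retrieves the real values from AWS Secrets Manager.
Properties producerProperties = new Properties();
producerProperties.put("bootstrap.servers", "localhost:9092");  // assumed local broker
producerProperties.put("acks", "all");  // wait for all in-sync replicas to acknowledge
```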

> _Only streams with sinks attached will be executed when the `StreamExecutionEnvironment.execute()` method is called._
### Step 7 of 11. Configure, Register, and Set the Apache Iceberg Catalog
```java
String catalogName = "apache_kickstarter";
String bucketName = serviceAccountUser.replace("_", "-"); // --- To follow the S3 bucket naming convention, replace any underscores with hyphens.
String catalogImpl = "org.apache.iceberg.aws.glue.GlueCatalog";
String databaseName = "airlines";
Map<String, String> catalogProperties = new HashMap<>();
catalogProperties.put("type", "iceberg");
catalogProperties.put("warehouse", "s3://" + bucketName + "/warehouse");
catalogProperties.put("catalog-impl", catalogImpl);
catalogProperties.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
catalogProperties.put("glue.skip-archive", "true");
catalogProperties.put("glue.region", awsRegion);
CatalogDescriptor catalogDescriptor = CatalogDescriptor.of(catalogName, org.apache.flink.configuration.Configuration.fromMap(catalogProperties));
tblEnv.createCatalog(catalogName, catalogDescriptor);
tblEnv.useCatalog(catalogName);
org.apache.flink.table.catalog.Catalog catalog = tblEnv.getCatalog(catalogName).orElseThrow(() -> new RuntimeException("Catalog not found"));

// --- Print the current catalog name.
System.out.println("Current catalog: " + tblEnv.getCurrentCatalog());
```

**Configure the Apache Iceberg Catalog:**
- **catalogName:** The name of the Iceberg catalog (`apache_kickstarter`), which will reference this catalog in the Flink environment.
- **bucketName:** The S3 bucket where the data will be stored. The code ensures the bucket name follows S3 naming conventions by replacing underscores (`_`) with hyphens (`-`).
- **catalogImpl:** The implementation class for the Iceberg catalog (`org.apache.iceberg.aws.glue.GlueCatalog`). This means that AWS Glue will be used for metadata management.
- **databaseName:** The database within the catalog (`airlines`), which will store related tables.
- **catalogProperties `type`:** Defines the catalog type as `iceberg`.
- **catalogProperties `warehouse`:** Specifies the warehouse location in S3 (`s3://<bucketName>/warehouse`). This is where the Apache Iceberg Tables' data will be stored.
- **catalogProperties `catalog-impl`:** Specifies the implementation (`GlueCatalog`) to use for managing metadata.
- **catalogProperties `io-impl`:** Specifies the I/O implementation (`S3FileIO`) to read from and write to Amazon S3.
- **catalogProperties `glue.skip-archive`:** Setting this to `"true"` tells Glue to skip archiving old table metadata, which speeds up operations.
- **catalogProperties `glue.region`:** Sets the AWS region for AWS Glue.

**Register and Set the Apache Iceberg Catalog:**
- **CatalogDescriptor:** Wraps the catalog properties above so the catalog can be registered with Flink.
- **tblEnv.createCatalog(catalogName, catalogDescriptor):** Registers the catalog under the specified name (`catalogName`) in the `StreamTableEnvironment` (`tblEnv`), making it available within the Flink environment.
- **tblEnv.useCatalog(catalogName):** Sets the newly created catalog as the current one, so subsequent table-related commands reference this catalog (an equivalent SQL DDL sketch follows this list).
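
For comparison, the same registration can be expressed as SQL DDL through the Table API. This is a sketch rather than the article's code; the property keys mirror the `catalogProperties` map above:

```java
// Sketch: register the same Iceberg/Glue catalog via SQL DDL instead of a
// CatalogDescriptor. The property keys mirror the catalogProperties map.
tblEnv.executeSql(
    "CREATE CATALOG apache_kickstarter WITH ("
    + " 'type' = 'iceberg',"
    + " 'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',"
    + " 'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',"
    + " 'warehouse' = 's3://" + bucketName + "/warehouse',"
    + " 'glue.skip-archive' = 'true',"
    + " 'glue.region' = '" + awsRegion + "'"
    + ")"
);
tblEnv.executeSql("USE CATALOG apache_kickstarter");
```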

### Step 8 of 11. Check if the Apache Flink Catalog Database Exists and Create It if it Does Not
> _What is an Apache Flink Catalog Database?_
>
> _The Apache Flink catalog database is a logical namespace that stores metadata about data sources, including databases, tables, and views. The catalog provides a unified API for managing metadata accessible from the Table API and SQL queries._
```java
try {
if (!catalog.databaseExists(databaseName)) {
catalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap<>(), "The Airlines flight data database."), false);
}
tblEnv.useDatabase(databaseName);
} catch (Exception e) {
System.out.println("A critical error occurred during the processing of the database because " + e.getMessage());
e.printStackTrace();
System.exit(1);
}

// --- Print the current database name.
System.out.println("Current database: " + tblEnv.getCurrentDatabase());
```
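
The check-and-create logic above can also be collapsed into a single SQL statement; a sketch:

```java
// Sketch: equivalent database bootstrap via SQL DDL.
tblEnv.executeSql("CREATE DATABASE IF NOT EXISTS airlines "
                  + "COMMENT 'The Airlines flight data database.'");
tblEnv.executeSql("USE airlines");
```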

### Step 9 of 11. Define the `RowType` for the `RowData`
```java
RowType rowType = RowType.of(
new LogicalType[] {
DataTypes.STRING().getLogicalType(),
DataTypes.STRING().getLogicalType(),
DataTypes.STRING().getLogicalType(),
DataTypes.STRING().getLogicalType(),
DataTypes.STRING().getLogicalType(),
DataTypes.BIGINT().getLogicalType(),
DataTypes.STRING().getLogicalType(),
DataTypes.STRING().getLogicalType(),
DataTypes.DECIMAL(10, 2).getLogicalType(),
DataTypes.STRING().getLogicalType(),
DataTypes.STRING().getLogicalType()
},
new String[] {
"email_address",
"departure_time",
"departure_airport_code",
"arrival_time",
"arrival_airport_code",
"flight_duration",
"flight_number",
"confirmation_code",
"ticket_price",
"aircraft",
"booking_agency_email"
}
);
```

- **`RowType`:** Defines the schema for the rows of data that will be used in the Flink data stream and written to Iceberg tables.
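
The field order here must line up with both the `CREATE TABLE` DDL and the positional `setField(...)` calls in Step 10. A quick sanity-check sketch:

```java
// Sketch: the RowType drives the positional writes in Step 10, so verify
// the count and order once instead of debugging a misaligned column later.
System.out.println("Field count: " + rowType.getFieldCount());  // 11
System.out.println("Field names: " + rowType.getFieldNames());  // [email_address, departure_time, ...]
```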

### Step 10 of 11. Call the `SinkToIcebergTable()` helper method for both DataStreams
```java
// --- Use the CatalogLoader since AWS Glue Catalog is used as the external metastore.
CatalogLoader catalogLoader = CatalogLoader.custom(catalogName, catalogProperties, new Configuration(false), catalogImpl);

// --- Sink the datastreams to their respective Apache Iceberg tables.
SinkToIcebergTable(tblEnv, catalog, catalogLoader, databaseName, rowType.getFieldCount(), "skyone_airline", skyOneStream);
SinkToIcebergTable(tblEnv, catalog, catalogLoader, databaseName, rowType.getFieldCount(), "sunset_airline", sunsetStream);
```

- **Catalog Loader:** Because AWS Glue serves as the external metastore, a `CatalogLoader` is created so that Flink (the compute engine) can load the Iceberg catalog on the cluster and manage the metadata and data/delete files of the Apache Iceberg Tables.

```java
/**
* This method is used to sink the data from the input data stream into the iceberg table.
*
* @param tblEnv The StreamTableEnvironment.
* @param catalog The Catalog.
* @param catalogLoader The CatalogLoader.
* @param databaseName The name of the database.
* @param fieldCount The number of fields in the input data stream.
* @param tableName The name of the table.
* @param airlineDataStream The input data stream.
*/
private static void SinkToIcebergTable(final StreamTableEnvironment tblEnv, final org.apache.flink.table.catalog.Catalog catalog, final CatalogLoader catalogLoader, final String databaseName, final int fieldCount, final String tableName, DataStream<AirlineData> airlineDataStream) {
// --- Convert DataStream<AirlineData> to DataStream<RowData>
DataStream<RowData> airlineRowData = airlineDataStream.map(new MapFunction<AirlineData, RowData>() {
@Override
public RowData map(AirlineData airlineData) throws Exception {
GenericRowData rowData = new GenericRowData(RowKind.INSERT, fieldCount);
rowData.setField(0, StringData.fromString(airlineData.getEmailAddress()));
rowData.setField(1, StringData.fromString(airlineData.getDepartureTime()));
rowData.setField(2, StringData.fromString(airlineData.getDepartureAirportCode()));
rowData.setField(3, StringData.fromString(airlineData.getArrivalTime()));
rowData.setField(4, StringData.fromString(airlineData.getArrivalAirportCode()));
rowData.setField(5, airlineData.getFlightDuration());
rowData.setField(6, StringData.fromString(airlineData.getFlightNumber()));
rowData.setField(7, StringData.fromString(airlineData.getConfirmationCode()));
rowData.setField(8, DecimalData.fromBigDecimal(airlineData.getTicketPrice(), 10, 2));
rowData.setField(9, StringData.fromString(airlineData.getAircraft()));
rowData.setField(10, StringData.fromString(airlineData.getBookingAgencyEmail()));
return rowData;
}
});

TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName);

// Create the table if it does not exist
if (!catalog.tableExists(ObjectPath.fromString(databaseName + "." + tableName))) {
tblEnv.executeSql(
"CREATE TABLE " + databaseName + "." + tableName + " ("
+ "email_address STRING, "
+ "departure_time STRING, "
+ "departure_airport_code STRING, "
+ "arrival_time STRING, "
+ "arrival_airport_code STRING, "
+ "flight_duration BIGINT,"
+ "flight_number STRING, "
+ "confirmation_code STRING, "
+ "ticket_price DECIMAL(10,2), "
+ "aircraft STRING, "
+ "booking_agency_email STRING) "
+ "WITH ("
+ "'write.format.default' = 'parquet',"
+ "'write.target-file-size-bytes' = '134217728',"
+ "'partitioning' = 'arrival_airport_code',"
+ "'format-version' = '2');"
);
}

/*
* Serializable loader to load an Apache Iceberg Table. Apache Flink needs to get Table objects in the cluster,
* not just on the client side. So we need an Iceberg table loader to get the Table object.
*/
TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, tableIdentifier);

/*
* Writes data from the Apache Flink datastream to an Apache Iceberg table using upsert logic, where updates or insertions
* are decided based on the specified equality fields (i.e., "email_address", "departure_airport_code", "arrival_airport_code").
*/
FlinkSink
.forRowData(airlineRowData)
.tableLoader(tableLoader)
.upsert(true)
.equalityFieldColumns(Arrays.asList("email_address", "departure_airport_code", "arrival_airport_code"))
.append();
}
```

- **Map to `RowData`:** Transforms each `AirlineData` object in the input data stream into a `GenericRowData` (a `RowData` implementation). `RowData` is the internal row format used by Flink's table APIs and the Iceberg sink, so the structured Java objects must be converted before they can be written.
- **TableIdentifier:** Represents a unique identifier for the Iceberg table, composed of the `databaseName` and `tableName`. This pinpoints the specific Apache Iceberg Table the data will be written to.
- **Create Table:** Creates the Apache Iceberg Table if it does not already exist.
- **Sink:** Writes the stream to the Apache Iceberg Table with upsert semantics (see the query sketch below for inspecting the results).
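
Once the job has committed a few snapshots, the Iceberg tables can be inspected from the same Table API session. A hedged sketch, assuming the catalog and database set earlier are still current:

```java
// Sketch: inspect what has landed in the Iceberg table. Depending on the
// execution mode and scan options, this runs as a bounded scan or a
// continuous query whose changelog is printed to stdout.
tblEnv.executeSql(
    "SELECT arrival_airport_code, COUNT(*) AS flights "
    + "FROM airlines.skyone_airline "
    + "GROUP BY arrival_airport_code"
).print();
```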

> _Only streams with sinks attached will be executed when the `StreamExecutionEnvironment.execute()` method is called._
### Step 11 of 11. Execute the DAG
```java
// --- Execute the Flink job graph (DAG)
try {
env.execute("DataGeneratorApp");
} catch (Exception e) {
logger.error("The App stopped early due to the following: {}", e.getMessage());
}
```

- Triggers execution of the Flink application.

## Give it a spin!
First, build the Java application, and then execute it on your Flink cluster. Run the following command from the terminal, as shown in the example below:

```bash
flink run --class kickstarter.DataGeneratorApp apache_flink-kickstarter-x.xx.xx.xxx.jar -service-account-user <SERVICE_ACCOUNT_USER> -aws-region <AWS_REGION_NAME>
```

If you don’t have your own Flink cluster environment, you can run one in Docker. I created a setup specific to this project [here](https://github.com/j3-signalroom/apache_flink-kickstarter/blob/main/README.md) that you can use.

To check the status of the running app, visit the Apache Flink Dashboard as shown below:
![screenshot-datageneratorapp-running-in-flink](images/screenshot-datageneratorapp-running-in-flink.png)

## Summary
The [`DataGeneratorApp`](https://github.com/j3-signalroom/apache_flink-kickstarter/blob/main/java/app/src/main/java/kickstarter/DataGeneratorApp.java) class is a well-rounded Flink application that demonstrates the following:

- **Data Stream Generation:** Using the [`DataGenerator`](https://github.com/j3-signalroom/apache_flink-kickstarter/blob/main/java/app/src/main/java/kickstarter/DataGenerator.java) class to create realistic flight data.
- **Integration with Kafka and Iceberg:** Publishing data to Kafka for real-time analytics and writing it to Iceberg for historical analysis.
- **AWS Glue for Metadata Management:** Integrating AWS Glue with Iceberg to manage metadata in a centralized, consistent manner.
- **Resiliency and Fault Tolerance:** Implementing checkpointing and delivery guarantees to ensure the stability and reliability of the data pipeline.

This code example embodies the principles of modern data architectures, such as data lakehouses, by seamlessly integrating the strengths of data lakes and data warehouses. It empowers real-time data processing, efficient storage, and in-depth historical analysis — all while offering unmatched flexibility, scalability, and cost-efficiency.

## Resources
Jeffrey Jonathan Jennings. [Apache Flink + Apache Iceberg + AWS Glue: Get Your JAR Versions Right!](). Medium, 2024.
Tomer Shiran, Jason Hughes & Alex Merced. [Apache Iceberg — The Definitive Guide](https://www.dremio.com/wp-content/uploads/2023/02/apache-iceberg-TDG_ER1.pdf). O’Reilly, 2024.
Jeffrey Jonathan Jennings. [Apache Flink Kickstarter](https://github.com/j3-signalroom/apache_flink-kickstarter/tree/main). GitHub, 2024.
Apache Iceberg Community. [Apache Iceberg v1.6.1 Documentation](https://iceberg.apache.org/docs/1.6.1/). The Apache Software Foundation, 2024.
2 changes: 1 addition & 1 deletion java/README.md
@@ -67,4 +67,4 @@ Flink App|Flink Run Command
## 3.0 Resources
[Apache Flink + Apache Iceberg + AWS Glue: Get Your JAR Versions Right!](../.blog/get-your-jar-versions-right.md)
[Apache Iceberg in Action with Apache Flink using Java](https://thej3.com/apache-iceberg-in-action-with-apache-flink-using-java-158500688ead)
[Apache Iceberg in Action with Apache Flink using Java](../.blog/apache-iceberg-in-action-with-apache-flink-using-java.md)
4 changes: 3 additions & 1 deletion python/README.md
@@ -121,4 +121,6 @@ When running Streamlit in this example with Apache Flink, Streamlit runs the Pyt
[Apache Flink® Table API on Confluent Cloud - Examples](https://github.com/confluentinc/flink-table-api-python-examples)
[How to create a User-Defined Table Function (UDTF) in PyFlink to fetch data from an external source for your Flink App?](../.blog/how-create-a-pyflink-udtf.md)
[Apache Iceberg in Action with Apache Flink using Python](../.blog/apache-iceberg-in-action-with-apache-flink-using-python.md)
