Merge pull request #380 from j3-signalroom/github_issue-378
GitHub issue 378
j3-signalroom authored Oct 19, 2024
2 parents f44edca + e6326b5 commit 9cc0d65
Showing 5 changed files with 70 additions and 28 deletions.
Binary file modified .blog/images/streamlit-screenshot.png
2 changes: 1 addition & 1 deletion linux-Dockerfile
@@ -112,7 +112,7 @@ RUN pip install pyflink
RUN pip install py4j==0.10.9.7
RUN pip install google-api-python-client
RUN pip install "pyiceberg[s3fs,glue]"
-RUN pip install streamlit
+RUN pip install streamlit pandas matplotlib plotly
RUN pip install streamlit-aggrid

# Install AWS CLI to interact with AWS services for debugging services
2 changes: 1 addition & 1 deletion mac-Dockerfile
@@ -112,7 +112,7 @@ RUN pip install pyflink
RUN pip install py4j==0.10.9.7
RUN pip install google-api-python-client
RUN pip install "pyiceberg[s3fs,glue]"
-RUN pip install streamlit
+RUN pip install streamlit pandas matplotlib plotly
RUN pip install streamlit-aggrid

# Install AWS CLI to interact with AWS services for debugging services
4 changes: 2 additions & 2 deletions python/README.md
@@ -9,7 +9,7 @@ Curious about the differences between the DataStream API and Table API? Click [h
+ [1.0 Important Note(s)](#10-important-notes)
+ [2.0 Power up the Apache Flink Docker containers](#20-power-up-the-apache-flink-docker-containers)
+ [3.0 Discover What You Can Do with These Flink Apps](#30-discover-what-you-can-do-with-these-flink-apps)
-+ [4.0 Unleash Flink for Data Visualization](#40-unleash-flink-for-data-visualization)
++ [4.0 Unleash the Full Power of Flink to Bring Your Data Visualizations to Life!](#40-unleash-the-full-power-of-flink-to-bring-your-data-visualizations-to-life)
+ [5.0 Resources](#50-resources)
<!-- tocstop -->

@@ -68,7 +68,7 @@ Flink App|Flink Run Command
> `<AWS_S3_BUCKET>`|specify the name of the AWS S3 bucket you chose during the Terraform creation or created yourself separately. The AWS S3 bucket is used to store the Apache Iceberg files (i.e., data files, manifest files, manifest list file, and metadata files).
> `<AWS_REGION_NAME>`|specify the AWS Region your AWS Glue infrastructure resides in.
-## 4.0 Unleash Flink for Data Visualization
+## 4.0 Unleash the Full Power of Flink to Bring Your Data Visualizations to Life!
The exciting part is that after running all your Flink applications, the data now flows seamlessly into your Kafka Topics and Apache Iceberg Tables. But data alone doesn’t tell the story—it’s time to share those insights with the world! One fantastic way to do that is with Streamlit, which allows you to easily create interactive visualizations. Streamlit is intuitive, powerful, and designed with Python developers in mind, making it a breeze to turn raw data into captivating dashboards. 😉
![iceberg-flink-streamlit-drawing](../.blog/images/iceberg-flink-streamlit-drawing.png)
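As a minimal sketch of that Iceberg-to-dashboard flow (toy data invented here to stand in for the `flight` table, and matplotlib's Agg backend so it runs headless), the monthly-departures chart the app draws could be built like this:

```python
import pandas as pd
import matplotlib
matplotlib.use("Agg")  # headless backend; no display needed
import matplotlib.pyplot as plt

# Toy departure timestamps standing in for rows read from the Iceberg table.
df = pd.DataFrame({"departure_time": pd.to_datetime(
    ["2025-01-05", "2025-01-20", "2025-02-11", "2025-03-02", "2025-03-15"])})

# Count flights per calendar month, mirroring the GROUP BY in the app's SQL.
monthly = (df["departure_time"].dt.month
             .value_counts()
             .sort_index()
             .rename("flight_count"))

fig, ax = plt.subplots()
ax.bar(monthly.index, monthly.values)
ax.set_xlabel("departure_month")
ax.set_ylabel("flight_count")
ax.set_title("Monthly flights (toy data)")
```

In the Streamlit app, the resulting `fig` would then be handed to `st.pyplot(fig)` to render it in the dashboard.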
90 changes: 66 additions & 24 deletions python/kickstarter/flink_kickstarter_visualization.py
@@ -5,6 +5,7 @@
import pandas as pd
import streamlit as st
from st_aggrid import AgGrid, GridOptionsBuilder
+import matplotlib.pyplot as plt

from helper.utilities import catalog_exist

@@ -41,7 +42,7 @@ def main(args):
env.get_checkpoint_config().set_max_concurrent_checkpoints(1)

# Create a Table Environment
-tbl_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().build())
+tbl_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=EnvironmentSettings.new_instance().in_batch_mode().build())

# Create the Apache Iceberg catalog integrated with AWS Glue and backed by AWS S3
catalog_name = "apache_kickstarter"
@@ -92,8 +93,6 @@ def main(args):
flight_table = tbl_env.sql_query(f"SELECT * FROM {database_name}.flight")
df_flight_table = flight_table.to_pandas()



# Create grid options with only specific columns
gb = GridOptionsBuilder.from_dataframe(df_flight_table)
gb.configure_columns(["email_address", "departure_time", "departure_airport_code", "arrival_time", "arrival_airport_code", "flight_number", "confirmation_code", "airline"])
@@ -105,27 +104,70 @@ def main(args):
width='100%'
)

# # Read `flight` data from the Iceberg table and aggregate the data
# flight_aggregration_table = tbl_env.sql_query(f"""
# SELECT
# CONCAT(departure_airport_code, '-', arrival_airport_code),
# COUNT(*) AS total
# FROM {database_name}.flight
# WHERE departure_airport_code = 'ATL'
# GROUP BY CONCAT(departure_airport_code, '-', arrival_airport_code)
# """)
# df_flight_aggregration_table = flight_aggregration_table.to_pandas()
# df_flight_aggregration_table = df_flight_aggregration_table.sort_values(by=["total"], ascending=False)

# # Create grid options with only specific columns
# gb = GridOptionsBuilder.from_dataframe(df_flight_aggregration_table)
# gridOptions = gb.build()
# AgGrid(
# df_flight_aggregration_table,
# gridOptions=gridOptions,
# height=300,
# width='100%'
# )
#
ranked_airports_table = tbl_env.sql_query(f"""
with cte_ranked as (
select
airline,
departure_airport_code,
flight_count,
ROW_NUMBER() OVER (PARTITION BY airline ORDER BY flight_count DESC) AS row_num
from (
select
airline,
departure_airport_code,
count(*) as flight_count
from
airlines.flight
group by
airline,
departure_airport_code
) tbl
)
select
departure_airport_code,
flight_count
from
cte_ranked
where
airline = 'SkyOne' and
row_num <= 5;
""")
df_ranked_airports_table = ranked_airports_table.to_pandas()

fig, ax = plt.subplots()
ax.set_title('Top 5 Airports with the Most Departures for SkyOne')
ax.pie(df_ranked_airports_table['flight_count'], labels=df_ranked_airports_table['departure_airport_code'], autopct='%1.1f%%', startangle=90)
ax.axis('equal') # Equal aspect ratio ensures that the pie is drawn as a circle.

# Display the pie chart in Streamlit
st.pyplot(fig)

# Read `flight` data from the Iceberg table and count SkyOne flights per month in 2025
airline_monthly_flights_table = tbl_env.sql_query(f"""
select
extract(month from to_timestamp(departure_time)) as departure_month,
count(*) as flight_count
from
airlines.flight
where
airline = 'SkyOne' and
extract(year from to_timestamp(departure_time)) = 2025
group by
extract(month from to_timestamp(departure_time))
order by
departure_month asc;
""")
df_airline_monthly_flights_table = airline_monthly_flights_table.to_pandas()

fig, ax = plt.subplots()
ax.bar(df_airline_monthly_flights_table['departure_month'], df_airline_monthly_flights_table['flight_count'])
ax.set_xlabel('departure_month')
ax.set_ylabel('flight_count')
ax.set_title('SkyOne Monthly Flights in 2025')

# Display the bar chart in Streamlit
st.pyplot(fig)


if __name__ == "__main__":
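The `ROW_NUMBER() OVER (PARTITION BY ...)` ranking that the updated script runs in Flink SQL can be reproduced in plain pandas; a minimal sketch on invented sample data (in the app, the rows come from the Iceberg `airlines.flight` table):

```python
import pandas as pd

# Invented sample flights; airline names are illustrative only.
flights = pd.DataFrame({
    "airline": ["SkyOne"] * 6 + ["Sunset"] * 2,
    "departure_airport_code": ["ATL", "ATL", "ATL", "ORD", "ORD", "DFW",
                               "LAX", "LAX"],
})

# Count flights per (airline, airport) pair, like the inner GROUP BY.
counts = (flights
          .groupby(["airline", "departure_airport_code"])
          .size()
          .reset_index(name="flight_count"))

# Keep the five busiest departure airports per airline: the pandas
# analogue of ROW_NUMBER() OVER (PARTITION BY airline ORDER BY
# flight_count DESC) filtered to row_num <= 5.
top5 = (counts
        .sort_values("flight_count", ascending=False)
        .groupby("airline")
        .head(5))

sky_one = top5[top5["airline"] == "SkyOne"]
print(sky_one[["departure_airport_code", "flight_count"]].to_string(index=False))
```

The resulting `sky_one` frame is exactly what the script feeds to the matplotlib pie chart via `ax.pie(...)`.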
