Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pull request #415: Resolved #414.

Merged
Merged 1 commit on Oct 28, 2024.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion linux-Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ WORKDIR /opt/flink/python_apps
RUN uv sync --frozen

# Zip Python files
RUN zip -r python_files.zip /opt/flink/python_apps/src/kickstarter/* -x '__pycache__/*' -x 'helper/__pycache__/*' -x 'model/__pycache__/*'
WORKDIR /opt/flink/python_apps/src/kickstarter
RUN zip -r python_files.zip * -x '__pycache__/*' -x 'helper/__pycache__/*' -x 'model/__pycache__/*'

# Set the entrypoint to Flink's entrypoint script
CMD ["./bin/start-cluster.sh"]
3 changes: 2 additions & 1 deletion mac-Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ WORKDIR /opt/flink/python_apps
RUN uv sync --frozen

# Zip Python files
RUN zip -r python_files.zip /opt/flink/python_apps/src/kickstarter/* -x '__pycache__/*' -x 'helper/__pycache__/*' -x 'model/__pycache__/*'
WORKDIR /opt/flink/python_apps/src/kickstarter
RUN zip -r python_files.zip * -x '__pycache__/*' -x 'helper/__pycache__/*' -x 'model/__pycache__/*'

# Set the entrypoint to Flink's entrypoint script
ENTRYPOINT ["/docker-entrypoint.sh"]
2 changes: 1 addition & 1 deletion python/src/kickstarter/flight_importer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaSink, KafkaRecordSerializationSchema, KafkaOffsetsInitializer, DeliveryGuarantee
from pyflink.datastream.formats.json import JsonRowDeserializationSchema, JsonRowSerializationSchema
from pyflink.table import StreamTableEnvironment
from pyflink.table.catalog import ObjectPath, HiveCatalog, Catalog
from pyflink.table.catalog import ObjectPath
from datetime import datetime, timezone
import argparse

Expand Down
17 changes: 11 additions & 6 deletions python/src/kickstarter/flink_kickstarter_visualization.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import TableEnvironment, EnvironmentSettings, StreamTableEnvironment
from pyflink.table.catalog import ObjectPath
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
import argparse
from typing import Tuple
import pandas as pd
Expand Down Expand Up @@ -33,7 +32,7 @@ def load_data(_tbl_env: StreamTableEnvironment, database_name: str) -> Tuple[pd.
Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: is a tuple of Pandas DataFrames.
"""
# Get the number of flights per month by airline, year, and month
airline_monthly_flights_table = _tbl_env.sql_query(f"""
airline_monthly_flights_table = _tbl_env.sql_query("""
select
airline,
extract(year from to_timestamp(departure_time)) as departure_year,
Expand All @@ -54,7 +53,7 @@ def load_data(_tbl_env: StreamTableEnvironment, database_name: str) -> Tuple[pd.
df_airline_monthly_flights_table = airline_monthly_flights_table.to_pandas()

# Get the top airports with the most departures by airport, airline, year, and rank
ranked_airports_table = _tbl_env.sql_query(f"""
ranked_airports_table = _tbl_env.sql_query("""
with cte_ranked as (
select
airline,
Expand Down Expand Up @@ -88,7 +87,13 @@ def load_data(_tbl_env: StreamTableEnvironment, database_name: str) -> Tuple[pd.
df_ranked_airports_table = ranked_airports_table.to_pandas()

# Get the flight data by airline and year
flight_table = _tbl_env.sql_query(f"SELECT *, extract(year from to_timestamp(departure_time)) as departure_year FROM {database_name}.flight")
flight_table = _tbl_env.sql_query(f"""
select
*,
extract(year from to_timestamp(departure_time)) as departure_year
from
{database_name}.flight
""")
df_flight_table = flight_table.to_pandas()

return df_airline_monthly_flights_table, df_ranked_airports_table, df_flight_table
Expand Down Expand Up @@ -122,7 +127,7 @@ def main(args):
env.get_checkpoint_config().set_max_concurrent_checkpoints(1)

# --- Add the Python dependency script files to the environment
env.add_python_archive("/opt/flink/python_apps/kickstarter/python_files.zip")
env.add_python_archive("/opt/flink/python_apps/src/kickstarter/python_files.zip")

# --- Create a Table Environment
tbl_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=EnvironmentSettings.new_instance().in_batch_mode().build())
Expand Down