A Prefect pipeline that ingests data from two CSV sources, persists the raw data to a database, and then transforms the data into a normalized form.
This pipeline is built with the following tools:
- Prefect for workflow management
- Pandas for data manipulation
- SQLAlchemy for database interaction
- Pydantic for data validation
- PostgreSQL as the database
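
As a rough illustration of the architecture (not the actual code in this repository: the task names, file paths, and the `Record` model are assumptions, and Prefect 2.x / Pydantic v2 are assumed), the flow could be structured like this:

```python
# Illustrative sketch only: names, paths, and the Record model are assumptions.
# Assumes Prefect 2.x, Pydantic v2, and a reachable Postgres instance.
import pandas as pd
from prefect import flow, task
from pydantic import BaseModel
from sqlalchemy import create_engine


class Record(BaseModel):
    # Hypothetical validation model for a single normalized row.
    id: int
    value: float


@task
def ingest_csv(path: str) -> pd.DataFrame:
    # Read one raw CSV source into a DataFrame.
    return pd.read_csv(path)


@task
def persist(df: pd.DataFrame, table: str, dsn: str) -> None:
    # Write a DataFrame to the database as-is.
    engine = create_engine(dsn)
    df.to_sql(table, engine, if_exists="replace", index=False)


@task
def transform(dfs: list[pd.DataFrame]) -> pd.DataFrame:
    # Combine the sources, validate rows with Pydantic, and return normalized data.
    combined = pd.concat(dfs, ignore_index=True)
    validated = [Record(**row).model_dump() for row in combined.to_dict("records")]
    return pd.DataFrame(validated)


@flow
def pipeline(dsn: str) -> None:
    sources = {"source_a": "data/a.csv", "source_b": "data/b.csv"}  # hypothetical paths
    raw = {name: ingest_csv(path) for name, path in sources.items()}
    for name, df in raw.items():
        persist(df, table=f"raw_{name}", dsn=dsn)
    persist(transform(list(raw.values())), table="normalized", dsn=dsn)
```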
To initialize the project, run `make init`. To run the pipeline, run `make local_run`. This will start the database in Docker and run the Prefect flow in the local environment.

Access the Prefect UI at http://localhost:4200/runs
Copy `.env.example` to `.env` and fill in the required environment variables. Update `DATABASE_DSN` in the `.env` file with your database connection string.
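
With Postgres, the connection string is typically a SQLAlchemy-style URL such as `postgresql+psycopg2://user:password@localhost:5432/dbname` (the exact driver prefix depends on the project's setup). As a hedged sketch of how such a value can be read from `.env`, assuming Pydantic v2 with `pydantic-settings` (the repository's actual settings code may differ):

```python
# Hedged sketch: read DATABASE_DSN from .env with pydantic-settings (Pydantic v2).
# The actual settings module in this repository may look different.
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env")

    # e.g. DATABASE_DSN=postgresql+psycopg2://user:password@localhost:5432/pipeline
    database_dsn: str


settings = Settings()  # raises a validation error if DATABASE_DSN is missing
```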
Example queries can be found in the `analytics.sql` file.
- Run flows in a Docker container
- Add unit tests for the pipeline tasks
- Add integration tests for the pipeline
- Optimize the pipeline for large datasets (the current implementation works for relatively small datasets; raw data should be processed in chunks, as shown in the chunked-ingestion sketch after this list)
- Depending on the data source, consider using an incremental load strategy
- If needed, historical data can be stored and calculated in the `staging` schema
- To efficiently handle large datasets, consider using columnar storage such as Parquet or Redshift
- Add an `analytics` schema to create customized views and tables for analytics purposes:
  - Create a materialized view for the most frequently used queries
- Assuming the pipeline will run on a regular basis, add a `cron` schedule to the flow and download only data for the last period from the source (see the scheduling sketch after this list)
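
A minimal sketch of the chunked ingestion mentioned above, using the pandas `chunksize` option (the path, table name, and DSN are illustrative):

```python
# Minimal chunked-ingestion sketch; the path, table name, and DSN are illustrative.
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://user:password@localhost:5432/pipeline")

# Read the raw CSV in fixed-size chunks instead of loading it all into memory.
for chunk in pd.read_csv("data/source_a.csv", chunksize=50_000):
    chunk.to_sql("raw_source_a", engine, if_exists="append", index=False)
```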
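
And a sketch of the scheduling idea, assuming a recent Prefect 2.x release where `Flow.serve` accepts a cron schedule (the flow body, parameter, and cron expression are assumptions, not the repo's actual setup):

```python
# Illustrative scheduling sketch; assumes Prefect 2.x with Flow.serve available.
# The flow body, parameter, and cron expression are assumptions.
from datetime import date, timedelta

from prefect import flow


@flow
def pipeline(days_back: int = 1) -> None:
    # Compute the load window at run time and download only that period from the source.
    end = date.today()
    start = end - timedelta(days=days_back)
    ...  # ingest, persist, and transform data for [start, end)


if __name__ == "__main__":
    # Serve the flow locally with a daily cron schedule (06:00 every day).
    pipeline.serve(
        name="daily-pipeline",
        cron="0 6 * * *",
        parameters={"days_back": 1},
    )
```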