Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🔖(prefect) add static and session indicators #368

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/prefect.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ jobs:
run: |
pg_restore -s -F c -d "${QUALICHARGE_DATABASE_URL_NO_DRIVER}" ../../data/qualicharge-api-schema.sql
pg_restore -a -F c -d "${QUALICHARGE_DATABASE_URL_NO_DRIVER}" ../../data/qualicharge-api-data.sql
psql "postgresql://qualicharge:pass@localhost:5432/qualicharge-api" -c "REFRESH MATERIALIZED VIEW Statique;"
- name: Install pipenv
run: pipx install pipenv
- name: Set up Python 3.12
Expand Down
15 changes: 15 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
}
]
}
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ bootstrap: \
seed-dashboard \
jupytext--to-ipynb \
run \
seed-api
seed-api \
refresh-api-static
.PHONY: bootstrap

bootstrap-dashboard: ## bootstrap the dashboard project for development
Expand Down
17 changes: 15 additions & 2 deletions src/prefect/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,20 @@ and this project adheres to

### Added

- Implement t1 workflow
- Implement i1 workflow
- Implement i4 workflow
- Implement i7 workflow
- Implement c1 workflow
- Implement c2 workflow
- Implement u5 workflow
- Implement u6 workflow
- Implement u9 workflow
- Implement u10 workflow
- Implement u11 workflow
- Implement u12 workflow
- Implement u13 workflow

### Updated

- use Statique table instead of PointDeCharge table

[unreleased]: https://github.com/MTES-MCT/qualicharge/
64 changes: 29 additions & 35 deletions src/prefect/indicators/infrastructure/i1.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,37 @@
I1: the number of publicly open points of charge.
"""

from datetime import datetime
from string import Template
from typing import List
from uuid import UUID

import numpy as np
import pandas as pd # type: ignore
from prefect import flow, runtime, task
from prefect.artifacts import create_markdown_artifact
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner
from sqlalchemy import text
from sqlalchemy.engine import Connection

from ..conf import settings
from ..models import Indicator, IndicatorPeriod, Level
from ..models import Indicator, IndicatorTimeSpan, Level
from ..utils import (
export_indic,
get_database_engine,
get_num_for_level_query_params,
get_targets_for_level,
)

NUM_POCS_FOR_LEVEL_QUERY_TEMPLATE = """
SELECT
COUNT(DISTINCT PointDeCharge.id_pdc_itinerance) AS value,
COUNT(DISTINCT id_pdc_itinerance) AS value,
$level_id AS level_id
FROM
PointDeCharge
INNER JOIN Station ON PointDeCharge.station_id = Station.id
INNER JOIN Localisation ON Station.localisation_id = Localisation.id
INNER JOIN City ON Localisation.code_insee_commune = City.code
Statique
--PointDeCharge
--INNER JOIN Station ON PointDeCharge.station_id = Station.id
--INNER JOIN Localisation ON Station.localisation_id = Localisation.id
INNER JOIN City ON code_insee_commune = City.code
$join_extras
WHERE $level_id IN ($indexes)
GROUP BY $level_id
Expand All @@ -46,22 +46,23 @@ def get_values_for_targets(
) -> pd.DataFrame:
"""Fetch points of charge given input level and target index."""
query_template = Template(NUM_POCS_FOR_LEVEL_QUERY_TEMPLATE)
query_params: dict = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))}
query_params = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))}
query_params |= get_num_for_level_query_params(level)
return pd.read_sql_query(query_template.substitute(query_params), con=connection)


@flow(
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
flow_run_name="i1-{period.value}-{level:02d}-{at:%y-%m-%d}",
flow_run_name="i1-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}",
)
def i1_for_level(
level: Level,
period: IndicatorPeriod,
at: datetime,
timespan: IndicatorTimeSpan,
chunk_size=settings.DEFAULT_CHUNK_SIZE,
) -> pd.DataFrame:
"""Calculate i1 for a level."""
if level == Level.NATIONAL:
return i1_national(timespan)
engine = get_database_engine()
with engine.connect() as connection:
targets = get_targets_for_level(connection, level)
Expand All @@ -87,8 +88,8 @@ def i1_for_level(
"value": merged["value"].fillna(0),
"code": "i1",
"level": level,
"period": period,
"timestamp": at,
"period": timespan.period,
"timestamp": timespan.start.isoformat(),
"category": None,
"extras": None,
}
Expand All @@ -97,9 +98,9 @@ def i1_for_level(

@flow(
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
flow_run_name="i1-{period.value}-00-{at:%y-%m-%d}",
flow_run_name="i1-{timespan.period.value}-00-{timespan.start:%y-%m-%d}",
)
def i1_national(period: IndicatorPeriod, at: datetime) -> pd.DataFrame:
def i1_national(timespan: IndicatorTimeSpan) -> pd.DataFrame:
"""Calculate i1 at the national level."""
engine = get_database_engine()
with engine.connect() as connection:
Expand All @@ -111,37 +112,30 @@ def i1_national(period: IndicatorPeriod, at: datetime) -> pd.DataFrame:
Indicator(
code="i1",
level=Level.NATIONAL,
period=period,
period=timespan.period,
value=count,
timestamp=at,
timestamp=timespan.start.isoformat(),
).model_dump(),
]
)


@flow(
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
flow_run_name="meta-i1-{period.value}",
flow_run_name="meta-i1-{timespan.period.value}",
)
def calculate(
period: IndicatorPeriod, create_artifact: bool = False, chunk_size: int = 1000
timespan: IndicatorTimeSpan,
levels: List[Level],
create_artifact: bool = False,
chunk_size: int = 1000,
format_pd: bool = False,
) -> List[Indicator]:
"""Run all i1 subflows."""
now = pd.Timestamp.now()
subflows_results = [
i1_national(period, now),
i1_for_level(Level.REGION, period, now, chunk_size=chunk_size),
i1_for_level(Level.DEPARTMENT, period, now, chunk_size=chunk_size),
i1_for_level(Level.EPCI, period, now, chunk_size=chunk_size),
i1_for_level(Level.CITY, period, now, chunk_size=chunk_size),
i1_for_level(level, timespan, chunk_size=chunk_size) for level in levels
]
indicators = pd.concat(subflows_results, ignore_index=True)

if create_artifact:
create_markdown_artifact(
key=runtime.flow_run.name,
markdown=indicators.to_markdown(),
description=f"i1 report at {now} (period: {period.value})",
)

return [Indicator(**record) for record in indicators.to_dict(orient="records")] # type: ignore[misc]
description = f"i1 report at {timespan.start} (period: {timespan.period.value})"
flow_name = runtime.flow_run.name
return export_indic(indicators, create_artifact, flow_name, description, format_pd)
139 changes: 139 additions & 0 deletions src/prefect/indicators/infrastructure/i4.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
"""QualiCharge prefect indicators: infrastructure.

I4: the number of publicly open stations.
"""

from string import Template
from typing import List
from uuid import UUID

import numpy as np
import pandas as pd # type: ignore
from prefect import flow, runtime, task
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner
from sqlalchemy import text
from sqlalchemy.engine import Connection

from ..conf import settings
from ..models import Indicator, IndicatorTimeSpan, Level
from ..utils import (
export_indic,
get_database_engine,
get_num_for_level_query_params,
get_targets_for_level,
)

NUM_STATIONS_FOR_LEVEL_QUERY_TEMPLATE = """
SELECT
COUNT(DISTINCT id_station_itinerance) AS value,
$level_id AS level_id
FROM
Station
INNER JOIN Localisation ON Station.localisation_id = Localisation.id
INNER JOIN City ON Localisation.code_insee_commune = City.code
$join_extras
WHERE $level_id IN ($indexes)
GROUP BY $level_id
"""


@task(task_run_name="values-for-target-{level:02d}")
def get_values_for_targets(
connection: Connection, level: Level, indexes: List[UUID]
) -> pd.DataFrame:
"""Fetch station given input level and target index."""
query_template = Template(NUM_STATIONS_FOR_LEVEL_QUERY_TEMPLATE)
query_params: dict = {"indexes": ",".join(f"'{i}'" for i in map(str, indexes))}
query_params |= get_num_for_level_query_params(level)
return pd.read_sql_query(query_template.substitute(query_params), con=connection)


@flow(
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
flow_run_name="i4-{timespan.period.value}-{level:02d}-{timespan.start:%y-%m-%d}",
)
def i4_for_level(
level: Level,
timespan: IndicatorTimeSpan,
chunk_size=settings.DEFAULT_CHUNK_SIZE,
) -> pd.DataFrame:
"""Calculate i4 for a level."""
if level == Level.NATIONAL:
return i4_national(timespan)
engine = get_database_engine()
with engine.connect() as connection:
targets = get_targets_for_level(connection, level)
ids = targets["id"]
chunks = (
np.array_split(ids, int(len(ids) / chunk_size))
if len(ids) > chunk_size
else [ids.to_numpy()]
)
futures = [
get_values_for_targets.submit(connection, level, chunk) # type: ignore[call-overload]
for chunk in chunks
]
wait(futures)

# Concatenate results and serialize indicators
results = pd.concat([future.result() for future in futures], ignore_index=True)
merged = targets.merge(results, how="left", left_on="id", right_on="level_id")

# Build result DataFrame
indicators = {
"target": merged["code"],
"value": merged["value"].fillna(0),
"code": "i4",
"level": level,
"period": timespan.period,
"timestamp": timespan.start.isoformat(),
"category": None,
"extras": None,
}
return pd.DataFrame(indicators)


@flow(
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
flow_run_name="i4-{timespan.period.value}-00-{timespan.start:%y-%m-%d}",
)
def i4_national(timespan: IndicatorTimeSpan) -> pd.DataFrame:
"""Calculate i4 at the national level."""
engine = get_database_engine()
with engine.connect() as connection:
result = connection.execute(text("SELECT COUNT(*) FROM Station"))
count = result.one()[0]

return pd.DataFrame.from_records(
[
Indicator(
code="i4",
level=Level.NATIONAL,
period=timespan.period,
value=count,
timestamp=timespan.start.isoformat(),
).model_dump(),
]
)


@flow(
task_runner=ThreadPoolTaskRunner(max_workers=settings.THREAD_POOL_MAX_WORKERS),
flow_run_name="meta-i4-{timespan.period.value}",
)
def calculate(
timespan: IndicatorTimeSpan,
levels: List[Level],
create_artifact: bool = False,
chunk_size: int = 1000,
format_pd: bool = False,
) -> List[Indicator]:
"""Run all i4 subflows."""
subflows_results = [
i4_for_level(level, timespan, chunk_size=chunk_size) for level in levels
]
indicators = pd.concat(subflows_results, ignore_index=True)
description = f"i4 report at {timespan.start} (period: {timespan.period.value})"
flow_name = runtime.flow_run.name
return export_indic(indicators, create_artifact, flow_name, description, format_pd)
Loading
Loading