-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
philippe thomy
committed
Feb 6, 2025
1 parent
0e98820
commit b4f7e0c
Showing
12 changed files
with
1,172 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
"""QualiCharge prefect indicators tests: infrastructure. | ||
I4: the number of publicly open points of charge. | ||
""" | ||
|
||
from datetime import datetime | ||
|
||
import pytest # type: ignore | ||
from sqlalchemy import text | ||
|
||
from indicators.infrastructure import i4 # type: ignore | ||
from indicators.models import IndicatorPeriod, IndicatorTimeSpan, Level # type: ignore | ||
|
||
from ..param_tests import ( | ||
PARAM_FLOW, | ||
PARAM_VALUE, | ||
PARAMETERS_CHUNK, | ||
) | ||
|
||
# expected result | ||
N_LEVEL = [65, 1068, 419, 3786] | ||
N_DPTS = 109 | ||
N_NAT_REG_DPT_EPCI_CITY = 36465 | ||
|
||
TIMESPAN = IndicatorTimeSpan(start=datetime.now(), period=IndicatorPeriod.DAY) | ||
PARAMETERS_FLOW = [prm + (lvl,) for prm, lvl in zip(PARAM_FLOW, N_LEVEL, strict=True)] | ||
PARAMETERS_VALUE = [prm + (lvl,) for prm, lvl in zip(PARAM_VALUE, N_LEVEL, strict=True)] | ||
|
||
|
||
@pytest.mark.parametrize("level,query,expected", PARAMETERS_VALUE) | ||
def test_task_get_values_for_target(db_connection, level, query, expected): | ||
"""Test the `get_values_for_target` task.""" | ||
result = db_connection.execute(text(query)) | ||
indexes = list(result.scalars().all()) | ||
values = i4.get_values_for_targets.fn(db_connection, level, indexes) | ||
assert len(values) == len(indexes) | ||
assert values["value"].sum() == expected | ||
|
||
|
||
def test_task_get_values_for_target_unexpected_level(db_connection): | ||
"""Test the `get_values_for_target` task (unknown level).""" | ||
with pytest.raises(NotImplementedError, match="Unsupported level"): | ||
i4.get_values_for_targets.fn(db_connection, Level.NATIONAL, []) | ||
|
||
|
||
@pytest.mark.parametrize("level,query,targets,expected", PARAMETERS_FLOW) | ||
def test_flow_i4_for_level(db_connection, level, query, targets, expected): | ||
"""Test the `i4_for_level` flow.""" | ||
indicators = i4.i4_for_level(level, TIMESPAN, chunk_size=1000) | ||
assert len(indicators) == db_connection.execute(text(query)).scalars().one() | ||
assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected | ||
|
||
|
||
@pytest.mark.parametrize("chunk_size", PARAMETERS_CHUNK) | ||
def test_flow_i4_for_level_with_various_chunk_sizes(chunk_size): | ||
"""Test the `i4_for_level` flow with various chunk sizes.""" | ||
level, query, targets, expected = PARAMETERS_FLOW[2] | ||
indicators = i4.i4_for_level(level, TIMESPAN, chunk_size=chunk_size) | ||
assert len(indicators) == N_DPTS | ||
assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected | ||
|
||
|
||
def test_flow_i4_national(db_connection): | ||
"""Test the `i4_national` flow.""" | ||
result = db_connection.execute(text("SELECT COUNT(id) FROM Station")) | ||
expected = result.scalars().one() | ||
indicators = i4.i4_national(TIMESPAN) | ||
assert indicators.at[0, "value"] == expected | ||
|
||
|
||
def test_flow_i4_calculate(db_connection): | ||
"""Test the `calculate` flow.""" | ||
expected = N_NAT_REG_DPT_EPCI_CITY | ||
all_levels = [ | ||
Level.NATIONAL, | ||
Level.REGION, | ||
Level.DEPARTMENT, | ||
Level.CITY, | ||
Level.EPCI, | ||
] | ||
indicators = i4.calculate(TIMESPAN, all_levels, create_artifact=True) | ||
assert len(indicators) == expected |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
"""QualiCharge prefect indicators tests: infrastructure. | ||
I7: installed power. | ||
""" | ||
|
||
from datetime import datetime | ||
|
||
import pytest # type: ignore | ||
from sqlalchemy import text | ||
|
||
from indicators.infrastructure import i7 # type: ignore | ||
from indicators.models import IndicatorPeriod, IndicatorTimeSpan, Level # type: ignore | ||
|
||
from ..param_tests import ( | ||
PARAM_FLOW, | ||
PARAM_VALUE, | ||
PARAMETERS_CHUNK, | ||
) | ||
|
||
# expected result | ||
N_LEVEL = [18998, 137622, 132546, 664670] | ||
N_DPTS = 109 | ||
N_NAT_REG_DPT_EPCI_CITY = 36465 | ||
|
||
TIMESPAN = IndicatorTimeSpan(start=datetime.now(), period=IndicatorPeriod.DAY) | ||
PARAMETERS_FLOW = [prm + (lvl,) for prm, lvl in zip(PARAM_FLOW, N_LEVEL, strict=True)] | ||
PARAMETERS_VALUE = [prm + (lvl,) for prm, lvl in zip(PARAM_VALUE, N_LEVEL, strict=True)] | ||
|
||
|
||
@pytest.mark.parametrize("level,query,expected", PARAMETERS_VALUE) | ||
def test_task_get_values_for_target(db_connection, level, query, expected): | ||
"""Test the `get_values_for_target` task.""" | ||
result = db_connection.execute(text(query)) | ||
indexes = list(result.scalars().all()) | ||
values = i7.get_values_for_targets.fn(db_connection, level, indexes) | ||
assert len(values) == len(indexes) | ||
assert int(values["value"].sum()) == expected | ||
|
||
|
||
def test_task_get_values_for_target_unexpected_level(db_connection): | ||
"""Test the `get_values_for_target` task (unknown level).""" | ||
with pytest.raises(NotImplementedError, match="Unsupported level"): | ||
i7.get_values_for_targets.fn(db_connection, Level.NATIONAL, []) | ||
|
||
|
||
@pytest.mark.parametrize("level,query,targets,expected", PARAMETERS_FLOW) | ||
def test_flow_i7_for_level(db_connection, level, query, targets, expected): | ||
"""Test the `i7_for_level` flow.""" | ||
indicators = i7.i7_for_level(level, TIMESPAN, chunk_size=1000) | ||
assert len(indicators) == db_connection.execute(text(query)).scalars().one() | ||
assert ( | ||
int(indicators.loc[indicators["target"].isin(targets), "value"].sum()) | ||
== expected | ||
) | ||
|
||
|
||
@pytest.mark.parametrize("chunk_size", PARAMETERS_CHUNK) | ||
def test_flow_i7_for_level_with_various_chunk_sizes(chunk_size): | ||
"""Test the `i7_for_level` flow with various chunk sizes.""" | ||
level, query, targets, expected = PARAMETERS_FLOW[2] | ||
indicators = i7.i7_for_level(level, TIMESPAN, chunk_size=chunk_size) | ||
assert len(indicators) == N_DPTS | ||
assert ( | ||
int(indicators.loc[indicators["target"].isin(targets), "value"].sum()) | ||
== expected | ||
) | ||
|
||
|
||
def test_flow_i7_national(db_connection): | ||
"""Test the `i7_national` flow.""" | ||
result = db_connection.execute( | ||
text("SELECT sum(puissance_nominale) FROM pointdecharge") | ||
) | ||
expected = int(result.scalars().one()) | ||
indicators = i7.i7_national(TIMESPAN) | ||
assert int(indicators.at[0, "value"]) == expected | ||
|
||
|
||
def test_flow_i7_calculate(db_connection): | ||
"""Test the `calculate` flow.""" | ||
expected = N_NAT_REG_DPT_EPCI_CITY | ||
all_levels = [ | ||
Level.NATIONAL, | ||
Level.REGION, | ||
Level.DEPARTMENT, | ||
Level.CITY, | ||
Level.EPCI, | ||
] | ||
indicators = i7.calculate(TIMESPAN, all_levels, create_artifact=True) | ||
assert len(indicators) == expected |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
"""QualiCharge prefect indicators tests: infrastructure. | ||
T1: the number of publicly open points of charge by power level. | ||
""" | ||
|
||
from datetime import datetime | ||
|
||
import pytest # type: ignore | ||
from sqlalchemy import text | ||
|
||
from indicators.infrastructure import t1 # type: ignore | ||
from indicators.models import IndicatorPeriod, IndicatorTimeSpan, Level # type: ignore | ||
|
||
from ..param_tests import ( | ||
PARAM_FLOW, | ||
PARAM_VALUE, | ||
PARAMETERS_CHUNK, | ||
) | ||
|
||
# expected result | ||
N_LEVEL = [212, 2250, 1489, 8724] | ||
N_DPTS = 109 | ||
N_NAT_REG_DPT_EPCI_CITY = 36465 | ||
|
||
TIMESPAN = IndicatorTimeSpan(start=datetime.now(), period=IndicatorPeriod.DAY) | ||
PARAMETERS_FLOW = [prm + (lvl,) for prm, lvl in zip(PARAM_FLOW, N_LEVEL, strict=True)] | ||
PARAMETERS_VALUE = [prm + (lvl,) for prm, lvl in zip(PARAM_VALUE, N_LEVEL, strict=True)] | ||
|
||
|
||
@pytest.mark.parametrize("level,query,expected", PARAMETERS_VALUE) | ||
def test_task_get_values_for_target(db_connection, level, query, expected): | ||
"""Test the `get_values_for_target` task.""" | ||
result = db_connection.execute(text(query)) | ||
indexes = list(result.scalars().all()) | ||
poc_by_power = t1.get_values_for_targets.fn(db_connection, level, indexes) | ||
assert len(set(poc_by_power["level_id"])) == len(indexes) | ||
assert poc_by_power["value"].sum() == expected | ||
|
||
|
||
def test_task_get_values_for_target_unexpected_level(db_connection): | ||
"""Test the `get_values_for_target` task (unknown level).""" | ||
with pytest.raises(NotImplementedError, match="Unsupported level"): | ||
t1.get_values_for_targets.fn(db_connection, Level.NATIONAL, []) | ||
|
||
|
||
@pytest.mark.parametrize("level,query,targets,expected", PARAMETERS_FLOW) | ||
def test_flow_t1_for_level(db_connection, level, query, targets, expected): | ||
"""Test the `t1_for_level` flow.""" | ||
indicators = t1.t1_for_level(level, TIMESPAN, chunk_size=1000) | ||
# assert len(indicators) == db_connection.execute(text(query)).scalars().one() | ||
assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected | ||
|
||
|
||
@pytest.mark.parametrize("chunk_size", PARAMETERS_CHUNK) | ||
def test_flow_t1_for_level_with_various_chunk_sizes(chunk_size): | ||
"""Test the `t1_for_level` flow with various chunk sizes.""" | ||
level, query, targets, expected = PARAMETERS_FLOW[2] | ||
indicators = t1.t1_for_level(level, TIMESPAN, chunk_size=chunk_size) | ||
assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected | ||
|
||
|
||
def test_flow_t1_national(db_connection): | ||
"""Test the `t1_national` flow.""" | ||
query = "SELECT COUNT(*) FROM PointDeCharge WHERE puissance_nominale::numeric >= 0" | ||
expected = db_connection.execute(text(query)).scalars().one() | ||
indicators = t1.t1_national(TIMESPAN) | ||
assert indicators["value"].sum() == expected | ||
|
||
|
||
def test_flow_t1_calculate(db_connection): | ||
"""Test the `calculate` flow.""" | ||
"""expected = sum( | ||
[ | ||
t1.t1_for_level(Level.CITY, TIMESPAN, chunk_size=1000)["value"].sum(), | ||
t1.t1_for_level(Level.EPCI, TIMESPAN, chunk_size=1000)["value"].sum(), | ||
t1.t1_for_level(Level.DEPARTMENT, TIMESPAN, chunk_size=1000)["value"].sum(), | ||
t1.t1_for_level(Level.REGION, TIMESPAN, chunk_size=1000)["value"].sum(), | ||
t1.t1_national(TIMESPAN)["value"].sum(), | ||
] | ||
)""" | ||
all_levels = [ | ||
Level.NATIONAL, | ||
Level.REGION, | ||
Level.DEPARTMENT, | ||
Level.CITY, | ||
Level.EPCI, | ||
] | ||
indicators = t1.calculate( | ||
TIMESPAN, all_levels, create_artifact=True, format_pd=True | ||
) | ||
# assert indicators["value"].sum() == expected | ||
assert list(indicators["level"].unique()) == all_levels |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
"""QualiCharge prefect indicators tests: usage. | ||
C1: Number of sessions by operator. | ||
""" | ||
|
||
from datetime import datetime | ||
|
||
import pytest # type: ignore | ||
from sqlalchemy import text | ||
|
||
from indicators.models import IndicatorPeriod, IndicatorTimeSpan, Level # type: ignore | ||
from indicators.usage import c1 # type: ignore | ||
|
||
from ..param_tests import ( | ||
PARAM_FLOW, | ||
PARAM_VALUE, | ||
PARAMETERS_CHUNK, | ||
) | ||
|
||
# expected result for level [city, epci, dpt, reg] | ||
N_LEVEL = [32, 307, 172, 1055] | ||
N_LEVEL_NATIONAL = 2718 | ||
|
||
TIMESPAN = IndicatorTimeSpan(start=datetime(2024, 12, 24), period=IndicatorPeriod.DAY) | ||
PARAMETERS_FLOW = [prm + (lvl,) for prm, lvl in zip(PARAM_FLOW, N_LEVEL, strict=True)] | ||
PARAMETERS_VALUE = [prm + (lvl,) for prm, lvl in zip(PARAM_VALUE, N_LEVEL, strict=True)] | ||
|
||
|
||
@pytest.mark.parametrize("level,query,expected", PARAMETERS_VALUE) | ||
def test_task_get_values_for_target(db_connection, level, query, expected): | ||
"""Test the `get_values_for_target` task.""" | ||
result = db_connection.execute(text(query)) | ||
indexes = list(result.scalars().all()) | ||
values = c1.get_values_for_targets.fn(db_connection, level, TIMESPAN, indexes) | ||
assert len(set(values["level_id"])) == len(indexes) | ||
assert values["value"].sum() == expected | ||
|
||
|
||
def test_task_get_values_for_target_unexpected_level(db_connection): | ||
"""Test the `get_values_for_target` task (unknown level).""" | ||
with pytest.raises(NotImplementedError, match="Unsupported level"): | ||
c1.get_values_for_targets.fn(db_connection, Level.NATIONAL, TIMESPAN, []) | ||
|
||
|
||
@pytest.mark.parametrize("level,query,targets,expected", PARAMETERS_FLOW) | ||
def test_flow_c1_for_level(db_connection, level, query, targets, expected): | ||
"""Test the `c1_for_level` flow.""" | ||
indicators = c1.c1_for_level(level, TIMESPAN, chunk_size=1000) | ||
# assert len(indicators) == db_connection.execute(text(query)).scalars().one() | ||
assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected | ||
|
||
|
||
@pytest.mark.parametrize("chunk_size", PARAMETERS_CHUNK) | ||
def test_flow_c1_for_level_with_various_chunk_sizes(chunk_size): | ||
"""Test the `c1_for_level` flow with various chunk sizes.""" | ||
level, query, targets, expected = PARAMETERS_FLOW[2] | ||
indicators = c1.c1_for_level(level, TIMESPAN, chunk_size=chunk_size) | ||
assert indicators.loc[indicators["target"].isin(targets), "value"].sum() == expected | ||
|
||
|
||
def test_flow_c1_national(db_connection): | ||
"""Test the `c1_national` flow.""" | ||
indicators = c1.c1_national(TIMESPAN) | ||
assert indicators["value"].sum() == N_LEVEL_NATIONAL | ||
|
||
|
||
def test_flow_c1_calculate(db_connection): | ||
"""Test the `calculate` flow.""" | ||
all_levels = [ | ||
Level.NATIONAL, | ||
Level.REGION, | ||
Level.DEPARTMENT, | ||
Level.CITY, | ||
Level.EPCI, | ||
] | ||
indicators = c1.calculate( | ||
TIMESPAN, all_levels, create_artifact=True, format_pd=True | ||
) | ||
assert list(indicators["level"].unique()) == all_levels | ||
|
||
|
||
# query used to get N_LEVEL | ||
N_LEVEL_NAT = """ | ||
SELECT | ||
count(*) AS value | ||
FROM | ||
SESSION | ||
INNER JOIN statique ON point_de_charge_id = pdc_id | ||
WHERE | ||
START >= timestamp '2024-12-24' | ||
AND START < timestamp '2024-12-25' | ||
""" | ||
N_LEVEL_3 = """ | ||
SELECT | ||
count(*) AS value | ||
FROM | ||
Session | ||
INNER JOIN statique ON point_de_charge_id = pdc_id | ||
LEFT JOIN City ON City.code = code_insee_commune | ||
INNER JOIN Department ON City.department_id = Department.id | ||
INNER JOIN Region ON Department.region_id = Region.id | ||
WHERE | ||
START >= timestamp '2024-12-24' | ||
AND START < timestamp '2024-12-25' | ||
AND region.code IN ('11', '84', '75') | ||
""" |
Oops, something went wrong.