feat: update date generator to support backfills (#51)
ceholden authored Jan 30, 2025
1 parent f02c52e commit 129b902
Showing 3 changed files with 75 additions and 40 deletions.
6 changes: 4 additions & 2 deletions integration_tests/test_link_fetching.py
@@ -33,7 +33,8 @@ def test_that_link_fetching_invocation_executes_correctly(
queue_url: str,
):
execution_arn = step_function_client.start_execution(
stateMachineArn=step_function_arn, input=json.dumps({})
stateMachineArn=step_function_arn,
input=json.dumps({"platforms": ["S2A", "S2B"]}),
)["executionArn"]

polling2.poll(
@@ -88,7 +89,8 @@ def test_that_link_fetching_invocation_executes_correctly_when_a_duplicate_granu
db_session.commit()

execution_arn = step_function_client.start_execution(
stateMachineArn=step_function_arn, input=json.dumps({})
stateMachineArn=step_function_arn,
input=json.dumps({"platforms": ["S2A", "S2B"]}),
)["executionArn"]

polling2.poll(
38 changes: 34 additions & 4 deletions lambdas/date_generator/README.md
@@ -4,7 +4,37 @@

![Date Generator in S2 Downloader diagram](../../images/hls-s2-downloader-date-generator.png)

The Date Generators purpose is to generate a list of strings in the form `YYYY-MM-DD` for 21 days from `today - 1` into the past. It is invoked within the `Link Fetching` Step Function.
The Date Generator's purpose is to generate a list of date strings in the form `YYYY-MM-DD` for 5 days from `today - 1` into the past, each paired with a satellite platform you want to download (S2A, S2B, S2C, etc.).
This component is the "brains of the operation" and instructs other components about what data should be downloaded.
It is invoked within the `Link Fetching` Step Function on a daily schedule for standard forward processing operations.
The output of this function looks like:

```json
{
  "query_dates_platforms": [
    ["2025-01-28", "S2B"],
    ["2025-01-28", "S2C"],
    ["2025-01-27", "S2B"],
    ["2025-01-27", "S2C"],
    ["2025-01-26", "S2B"],
    ["2025-01-26", "S2C"],
    ["2025-01-25", "S2B"],
    ["2025-01-25", "S2C"],
    ["2025-01-24", "S2B"],
    ["2025-01-24", "S2C"]
  ]
}
```

It is also possible to invoke this function with specific parameters for backfilling missing data. For example, you can invoke it for a specific time period and set of Sentinel-2 platforms by passing a payload into the Step Function invocation:

```json
{
  "now": "2025-01-22",
  "lookback_days": 2,
  "platforms": ["S2B", "S2C"]
}
```
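
A backfill with such a payload could be started from Python roughly as follows. This is a minimal sketch and not part of the commit: `start_backfill` is a hypothetical helper, and `sfn_client` is assumed to be a boto3 Step Functions client (the integration tests in this commit call `start_execution` with the same `stateMachineArn` and `input` arguments).

```python
import json


def start_backfill(sfn_client, state_machine_arn, now, lookback_days, platforms):
    """Start one Link Fetching execution for a backfill window (hypothetical helper)."""
    payload = {
        # "YYYY-MM-DD"; dates are generated for the days *before* this one
        "now": now,
        "lookback_days": lookback_days,
        "platforms": list(platforms),
    }
    # Assumption: sfn_client behaves like boto3.client("stepfunctions")
    response = sfn_client.start_execution(
        stateMachineArn=state_machine_arn,
        input=json.dumps(payload),
    )
    return response["executionArn"]
```

With `now="2025-01-22"` and `lookback_days=2`, the Date Generator would produce pairs for 2025-01-21 and 2025-01-20 only.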

---

@@ -14,14 +44,14 @@ Provided below is some pseudo-code to explain the process happening each time th

```python
yesterdays_date = get_yesterdays_date()
return generate_list_of_21_dates_into_the_past_from(yesterdays_date)
return generate_list_of_5_dates_into_the_past_from(yesterdays_date)
```

---

## Development

This Lambda makes use of `pipenv` for managing depedencies and for building the function when deploying it.
This Lambda makes use of `pipenv` for managing dependencies and for building the function when deploying it.

To get set up for developing this project, run:

@@ -47,7 +77,7 @@ A `Makefile` is provided to abstract commonly used commands away:
**`make format`**

> This will peform a run of `isort` and `black`, this **will** modify files if issues were found
> This will perform a run of `isort` and `black`, this **will** modify files if issues were found
**`make test`**

71 changes: 37 additions & 34 deletions lambdas/date_generator/handler.py
@@ -1,38 +1,47 @@
import os
from collections.abc import Callable, Sequence
from collections.abc import Sequence
from datetime import datetime, timedelta
from itertools import product
from typing import TypedDict, Unpack
from typing import TypedDict


class HandlerKwargs(TypedDict, total=False):
class Event(TypedDict, total=False):
"""Input event payload
These inputs are not required but can be provided to override
behavior for unit testing or backfills. Defaults are set inside
of the handler function.
"""

platforms: Sequence[str]
now: Callable[[], datetime]
now: str
lookback_days: int


DATE_FORMAT_YMD = "%Y-%m-%d"
DEFAULT_LOOKBACK_DAYS = 5


def handler(
_event,
event: Event,
_context,
**kwargs: Unpack[HandlerKwargs],
):
"""
Return a `dict` with the single key `'query_dates_platforms'` mapped to a list of
2-tuples of the form `(date, platform)` produced from the cross-product of dates
given by `get_dates` (for `kwargs['lookback_days']` number of days _prior_ to the
date given by `kwargs['now']()`) and `kwargs['platforms']`.
given by `get_dates` (for `event['lookback_days']` number of days _prior_ to the
date given by `event['now']`) and `event['platforms']`.
NOTE: AWS will never pass kwargs. They are for unit testing purposes only, but
since they are all optional, AWS can safely call this function with none of them.
NOTE: Our StepFunction will never pass these keys in the payload by default.
They are for backfill and unit testing purposes only, but since they are all
optional, our daily scheduled StepFunction can safely call this function with none
of them.
Raises
------
KeyError: if the `platforms` kwarg is not specified and the environment variable
`PLATFORMS` is not defined
ValueError: if the `platforms` kwarg is an empty sequence, or it is not specified
KeyError: if `platforms` is not specified in the handler payload and the environment
variable `PLATFORMS` is not defined
ValueError: if the `platforms` input is an empty sequence, or it is not specified
and the `PLATFORMS` environment variable is set to a value that is either
empty, only whitespace, or a combination of commas and whitespace
@@ -42,36 +51,30 @@ def handler(
number of platforms, regardless of the current date:
>>> platforms = ("S2A", "S2B", "S2C")
>>> combos = handler(None, None, platforms=platforms)["query_dates_platforms"]
>>> combos = handler({"platforms": platforms}, None)["query_dates_platforms"]
>>> len(combos) == DEFAULT_LOOKBACK_DAYS * len(platforms)
True
For a known date and number of lookback days, we can enumerate the exact combos:
>>> handler( # doctest: +NORMALIZE_WHITESPACE
... {"platforms": platforms, "now": "2024-03-02", "lookback_days": 3},
... None,
... None,
... platforms=platforms,
... now=lambda: datetime(2024, 3, 2),
... lookback_days=3
... )
{'query_dates_platforms':
[('2024-03-01', 'S2A'), ('2024-03-01', 'S2B'), ('2024-03-01', 'S2C'),
('2024-02-29', 'S2A'), ('2024-02-29', 'S2B'), ('2024-02-29', 'S2C'),
('2024-02-28', 'S2A'), ('2024-02-28', 'S2B'), ('2024-02-28', 'S2C')]}
"""

default_kwargs: HandlerKwargs = {
"now": datetime.now,
"lookback_days": DEFAULT_LOOKBACK_DAYS,
}
kwargs = default_kwargs | kwargs

# We want to fail if neither platforms is supplied as a kwarg (during testing) nor
# PLATFORMS is defined as a (non-empty) environment variable.
platforms = kwargs.get("platforms") or parse_platforms(os.environ["PLATFORMS"])
now = kwargs["now"]
lookback_days = kwargs["lookback_days"]
platforms = event.get("platforms") or parse_platforms(os.environ["PLATFORMS"])
# By default "now" should be today to support cron usage, but allow overrides
# for backfill jobs
now = datetime.strptime(
event.get("now", datetime.now().strftime(DATE_FORMAT_YMD)), DATE_FORMAT_YMD
)
lookback_days = event.get("lookback_days", DEFAULT_LOOKBACK_DAYS)

return {
"query_dates_platforms": list(product(get_dates(now, lookback_days), platforms))
@@ -127,25 +130,25 @@ def parse_platforms(platforms: str) -> Sequence[str]:
return result


def get_dates(now: Callable[[], datetime], lookback_days: int) -> Sequence[str]:
def get_dates(now: datetime, lookback_days: int) -> Sequence[str]:
"""
Return one date string per day for `lookback_days` number of days, in reverse
chronological order, starting from the day before `now()` and formatted as
chronological order, starting from the day before `now` and formatted as
`%Y-%m-%d`.
Examples
--------
>>> len(get_dates(datetime.now, 10)) == 10
>>> len(get_dates(datetime.now(), 10)) == 10
True
>>> get_dates(lambda: datetime(2025, 1, 3), 3)
>>> get_dates(datetime(2025, 1, 3), 3)
['2025-01-02', '2025-01-01', '2024-12-31']
:returns: string dates (`%Y-%m-%d`) looking back the number of days given by
`lookback_days` in reverse chronological order starting from the day before
`now`
"""
yesterdays_date = now().date() - timedelta(days=1)
yesterdays_date = now.date() - timedelta(days=1)
return [
(yesterdays_date - timedelta(days=day)).strftime("%Y-%m-%d")
(yesterdays_date - timedelta(days=day)).strftime(DATE_FORMAT_YMD)
for day in range(lookback_days)
]
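
To make the effect of the new `Event` payload concrete, the sketch below restates the logic of the diff in one self-contained function and runs it on the README's backfill payload. It is a simplified illustration, not the committed handler: `resolve_query_pairs` and its `default_platforms` argument are hypothetical names, with `default_platforms` standing in for the parsed `PLATFORMS` environment variable.

```python
from datetime import datetime, timedelta
from itertools import product

DATE_FORMAT_YMD = "%Y-%m-%d"
DEFAULT_LOOKBACK_DAYS = 5


def resolve_query_pairs(event, default_platforms):
    """Hypothetical stand-in for the handler: apply defaults, then pair dates with platforms."""
    platforms = event.get("platforms") or default_platforms
    # "now" defaults to today; a backfill passes an explicit "YYYY-MM-DD" string
    now = datetime.strptime(
        event.get("now", datetime.now().strftime(DATE_FORMAT_YMD)), DATE_FORMAT_YMD
    )
    lookback_days = event.get("lookback_days", DEFAULT_LOOKBACK_DAYS)
    # Dates run backwards starting from the day before "now"
    yesterday = now.date() - timedelta(days=1)
    dates = [
        (yesterday - timedelta(days=day)).strftime(DATE_FORMAT_YMD)
        for day in range(lookback_days)
    ]
    return list(product(dates, platforms))


backfill = {"now": "2025-01-22", "lookback_days": 2, "platforms": ["S2B", "S2C"]}
pairs = resolve_query_pairs(backfill, default_platforms=["S2A", "S2B"])
# pairs == [("2025-01-21", "S2B"), ("2025-01-21", "S2C"),
#           ("2025-01-20", "S2B"), ("2025-01-20", "S2C")]
```

In this sketch, as in the diff, a `now` value that does not match `%Y-%m-%d` makes `datetime.strptime` raise `ValueError`, so a backfill payload with a malformed date fails immediately rather than querying the wrong days.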
