From 0dba719989b622c1201955c055e48cf401be3907 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 21 Feb 2025 13:49:51 -0800 Subject: [PATCH 1/2] Add ExtractWindowingInfo transform. --- sdks/python/apache_beam/yaml/yaml_mapping.py | 93 +++++++++++++++++++ .../apache_beam/yaml/yaml_mapping_test.py | 64 +++++++++++++ 2 files changed, 157 insertions(+) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 54a585611dca..7a1bc2e71c69 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -25,6 +25,7 @@ from collections.abc import Mapping from typing import Any +from typing import NamedTuple from typing import Optional from typing import TypeVar from typing import Union @@ -41,6 +42,8 @@ from apache_beam.typehints.schemas import schema_from_element_type from apache_beam.typehints.schemas import typing_from_runner_api from apache_beam.utils import python_callable +from apache_beam.utils import windowed_value +from apache_beam.utils.timestamp import Timestamp from apache_beam.yaml import json_utils from apache_beam.yaml import options from apache_beam.yaml import yaml_provider @@ -842,6 +845,95 @@ def _AssignTimestamps( ).with_input_types(T).with_output_types(T) +class PaneInfoTuple(NamedTuple): + is_first: bool + is_last: bool + timing: str + index: int + nonspeculative_index: int + + @classmethod + def from_pane_info(cls, pane_info): + return cls( + pane_info.is_first, + pane_info.is_last, + windowed_value.PaneInfoTiming.to_string(pane_info.timing), + pane_info.index, + pane_info.nonspeculative_index) + + +_WINDOWING_INFO_TYPES = { + 'timestamp': Timestamp, + 'window_start': Optional[Timestamp], + 'window_end': Timestamp, + 'window_string': str, + 'window_type': str, + 'window_object': Any, + 'pane_info': PaneInfoTuple, +} +_WINDOWING_INFO_EXTRACTORS = { + 'timestamp': lambda locals: locals['timestamp'], + 'window_start': lambda locals: 
 getattr(locals['window'], 'start', None), + 'window_end': lambda locals: locals['window'].end, + 'window_string': lambda locals: str(locals['window']), + 'window_type': lambda locals: type(locals['window']).__name__, + 'window_object': lambda locals: locals['window'], + 'pane_info': lambda locals: PaneInfoTuple.from_pane_info( + locals['pane_info']), +} +assert set(_WINDOWING_INFO_TYPES.keys()) == set( + _WINDOWING_INFO_EXTRACTORS.keys()) + + +@beam.ptransform.ptransform_fn +def _ExtractWindowingInfo(pcoll, fields: Optional[Mapping[str, str]] = None): + """ + Extracts the implicit windowing information from an element and makes it + explicit as field(s) in the element itself. + + The following windowing parameter values are supported: + + * `timestamp`: The event timestamp of the current element. + * `window_start`: The start of the window iff it is an interval window. + * `window_end`: The (exclusive) end of the window. + * `window_string`: The string representation of the window. + * `window_type`: The type of the window as a string. + * `window_object`: The actual window object itself, + as a Java or Python object. + * `pane_info`: A schema'd representation of the current pane info, including + its index, whether it was the last firing, etc. + + Args: + fields: A mapping of new field names to various windowing parameters, + as documente above. If omitted, defaults to + `[timestamp, window_start, window_end]`. 
+ """ + existing_fields = named_fields_from_element_type(pcoll.element_type) + new_fields = [] + for field, value in fields.items(): + if value not in _WINDOWING_INFO_TYPES: + raise ValueError( + f'{value} is not a valid windowing parameter; ' + f'must be one of {list(_WINDOWING_INFO_TYPES.keys())}') + elif field in dict(existing_fields): + raise ValueError(f'Input schema already has a field named {field}.') + else: + new_fields.append((field, _WINDOWING_INFO_TYPES[value])) + + def augment_row( + row, + timestamp=beam.DoFn.TimestampParam, + window=beam.DoFn.WindowParam, + pane_info=beam.DoFn.PaneInfoParam): + as_dict = row._asdict() + for field, value in fields.items(): + as_dict[field] = _WINDOWING_INFO_EXTRACTORS[value](locals()) + return beam.Row(**as_dict) + + return pcoll | beam.Map(augment_row).with_output_types( + row_type.RowTypeConstraint.from_fields(existing_fields + new_fields)) + + def create_mapping_providers(): # These are MetaInlineProviders because their expansion is in terms of other # YamlTransforms, but in a way that needs to be deferred until the input @@ -852,6 +944,7 @@ def create_mapping_providers(): 'AssignTimestamps-javascript': _AssignTimestamps, 'AssignTimestamps-generic': _AssignTimestamps, 'Explode': _Explode, + 'ExtractWindowingInfo': _ExtractWindowingInfo, 'Filter-python': _PyJsFilter, 'Filter-javascript': _PyJsFilter, 'Filter-generic': _PyJsFilter, diff --git a/sdks/python/apache_beam/yaml/yaml_mapping_test.py b/sdks/python/apache_beam/yaml/yaml_mapping_test.py index 2c5feec18278..bb0b1d43e1d7 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping_test.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping_test.py @@ -16,6 +16,7 @@ # import logging +import typing import unittest import numpy as np @@ -25,6 +26,8 @@ from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to from apache_beam.typehints import schemas +from apache_beam.utils.timestamp import Timestamp +from apache_beam.yaml import yaml_mapping 
from apache_beam.yaml.yaml_transform import YamlTransform DATA = [ @@ -457,6 +460,67 @@ def test_append_type_inference(self): (('label', str), ('conductor', np.int64), ('rank', np.int64), ('new_label', str))) + def test_extract_windowing_info(self): + T = typing.TypeVar('T') + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + elements = ( + p + | beam.Create( + [beam.Row(value=1), beam.Row(value=2), beam.Row(value=11)]) + | beam.Map( + lambda x: beam.transforms.window.TimestampedValue( + x, timestamp=x.value)).with_input_types(T).with_output_types( + T) + | beam.WindowInto(beam.transforms.window.FixedWindows(10))) + result = elements | YamlTransform( + ''' + type: ExtractWindowingInfo + config: + fields: + timestamp: timestamp + window_start: window_start + window_end: window_end + window_string: window_string + window_type: window_type + window_object: window_object + pane_info_field: pane_info + ''') + assert_that( + result, + equal_to([ + beam.Row( + value=1, + timestamp=Timestamp(1), + window_start=Timestamp(0), + window_end=Timestamp(10), + window_string='[0.0, 10.0)', + window_type='IntervalWindow', + window_object=beam.transforms.window.IntervalWindow(0, 10), + pane_info_field=yaml_mapping.PaneInfoTuple( + True, True, 'UNKNOWN', 0, 0)), + beam.Row( + value=2, + timestamp=Timestamp(2), + window_start=Timestamp(0), + window_end=Timestamp(10), + window_string='[0.0, 10.0)', + window_type='IntervalWindow', + window_object=beam.transforms.window.IntervalWindow(0, 10), + pane_info_field=yaml_mapping.PaneInfoTuple( + True, True, 'UNKNOWN', 0, 0)), + beam.Row( + value=11, + timestamp=Timestamp(11), + window_start=Timestamp(10), + window_end=Timestamp(20), + window_string='[10.0, 20.0)', + window_type='IntervalWindow', + window_object=beam.transforms.window.IntervalWindow(10, 20), + pane_info_field=yaml_mapping.PaneInfoTuple( + True, True, 'UNKNOWN', 0, 0)), + ])) + if __name__ == '__main__': 
 logging.getLogger().setLevel(logging.INFO) From 8534f2570c34795aafcaa4b04fcad861795a0a54 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 21 Feb 2025 13:56:35 -0800 Subject: [PATCH 2/2] Add default and iterable syntactic sugar. --- sdks/python/apache_beam/yaml/yaml_mapping.py | 12 ++++++++ .../apache_beam/yaml/yaml_mapping_test.py | 30 +++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 7a1bc2e71c69..f59a27034b34 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -903,11 +903,23 @@ def _ExtractWindowingInfo(pcoll, fields: Optional[Mapping[str, str]] = None): * `pane_info`: A schema'd representation of the current pane info, including its index, whether it was the last firing, etc. + As a convenience, a list rather than a mapping of fields may be provided, + in which case the fields will be named according to the requested values. + Args: fields: A mapping of new field names to various windowing parameters, as documente above. If omitted, defaults to `[timestamp, window_start, window_end]`. 
""" + if fields is None: + fields = ['timestamp', 'window_start', 'window_end'] + if not isinstance(fields, Mapping): + if isinstance(fields, Iterable) and not isinstance(fields, str): + fields = {fld: fld for fld in fields} + else: + raise TypeError( + 'Fields must be a mapping or iterable of strings, got {fields}') + existing_fields = named_fields_from_element_type(pcoll.element_type) new_fields = [] for field, value in fields.items(): diff --git a/sdks/python/apache_beam/yaml/yaml_mapping_test.py b/sdks/python/apache_beam/yaml/yaml_mapping_test.py index bb0b1d43e1d7..1054f73dc130 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping_test.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping_test.py @@ -521,6 +521,36 @@ def test_extract_windowing_info(self): True, True, 'UNKNOWN', 0, 0)), ])) + def test_extract_windowing_info_iterable(self): + T = typing.TypeVar('T') + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + elements = ( + p + | beam.Create( + [beam.Row(value=1), beam.Row(value=2), beam.Row(value=11)]) + | beam.Map( + lambda x: beam.transforms.window.TimestampedValue( + x, timestamp=x.value)).with_input_types(T).with_output_types( + T)) + result = elements | YamlTransform( + ''' + type: ExtractWindowingInfo + config: + fields: [timestamp, window_type] + ''') + assert_that( + result, + equal_to([ + beam.Row( + value=1, timestamp=Timestamp(1), window_type='GlobalWindow'), + beam.Row( + value=2, timestamp=Timestamp(2), window_type='GlobalWindow'), + beam.Row( + value=11, timestamp=Timestamp(11), + window_type='GlobalWindow'), + ])) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO)