From 0dba719989b622c1201955c055e48cf401be3907 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 21 Feb 2025 13:49:51 -0800 Subject: [PATCH 1/5] Add ExtractWindowingInfo transform. --- sdks/python/apache_beam/yaml/yaml_mapping.py | 93 +++++++++++++++++++ .../apache_beam/yaml/yaml_mapping_test.py | 64 +++++++++++++ 2 files changed, 157 insertions(+) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 54a585611dca..7a1bc2e71c69 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -25,6 +25,7 @@ from collections.abc import Mapping from typing import Any from typing import Optional +from typing import NamedTuple from typing import TypeVar from typing import Union @@ -41,6 +42,8 @@ from apache_beam.typehints.schemas import schema_from_element_type from apache_beam.typehints.schemas import typing_from_runner_api from apache_beam.utils import python_callable +from apache_beam.utils import windowed_value +from apache_beam.utils.timestamp import Timestamp from apache_beam.yaml import json_utils from apache_beam.yaml import options from apache_beam.yaml import yaml_provider @@ -842,6 +845,95 @@ def _AssignTimestamps( ).with_input_types(T).with_output_types(T) +class PaneInfoTuple(NamedTuple): + is_first: bool + is_last: bool + timing: str + index: int + nonspeculative_index: int + + @classmethod + def from_pane_info(cls, pane_info): + return cls( + pane_info.is_first, + pane_info.is_last, + windowed_value.PaneInfoTiming.to_string(pane_info.timing), + pane_info.index, + pane_info.nonspeculative_index) + + +_WINDOWING_INFO_TYPES = { + 'timestamp': Timestamp, + 'window_start': Optional[Timestamp], + 'window_end': Timestamp, + 'window_string': str, + 'window_type': str, + 'window_object': Any, + 'pane_info': PaneInfoTuple, +} +_WINDOWING_INFO_EXTRACTORS = { + 'timestamp': lambda locals: locals['timestamp'], + 'window_start': lambda locals: getattr(locals['window'], 'start', None), + 'window_end': lambda locals: locals['window'].end, + 'window_string': lambda locals: str(locals['window']), + 'window_type': lambda locals: type(locals['window']).__name__, + 'window_object': lambda locals: locals['window'], + 'pane_info': lambda locals: PaneInfoTuple.from_pane_info( + locals['pane_info']), +} +assert set(_WINDOWING_INFO_TYPES.keys()) == set( + _WINDOWING_INFO_EXTRACTORS.keys()) + + +@beam.ptransform.ptransform_fn +def _ExtractWindowingInfo(pcoll, fields: Optional[Mapping[str, str]] = None): + """ + Extracts the implicit windowing information from an element and makes it + explicit as field(s) in the element itself. + + The following windowing parameter values are supported: + + * `timestamp`: The event timestamp of the current element. + * `window_start`: The start of the window iff it is an interval window. + * `window_end`: The (exclusive) end of the window. + * `window_string`: The string representation of the window. + * `window_type`: The type of the window as a string. + * `winodw_object`: The actual window object itself, + as a Java or Python object. + * `pane_info`: A schema'd representation of the current pane info, including + its index, whether it was the last firing, etc. + + Args: + fields: A mapping of new field names to various windowing parameters, + as documente above. If omitted, defaults to + `[timestamp, window_start, window_end]`. + """ + existing_fields = named_fields_from_element_type(pcoll.element_type) + new_fields = [] + for field, value in fields.items(): + if value not in _WINDOWING_INFO_TYPES: + raise ValueError( + f'{value} is not a valid windowing parameter; ' + f'must be one of {list(_WINDOWING_INFO_TYPES.keys())}') + elif field in existing_fields: + raise ValueError(f'Input schema already has a field named {field}.') + else: + new_fields.append((field, _WINDOWING_INFO_TYPES[value])) + + def augment_row( + row, + timestamp=beam.DoFn.TimestampParam, + window=beam.DoFn.WindowParam, + pane_info=beam.DoFn.PaneInfoParam): + as_dict = row._asdict() + for field, value in fields.items(): + as_dict[field] = _WINDOWING_INFO_EXTRACTORS[value](locals()) + return beam.Row(**as_dict) + + return pcoll | beam.Map(augment_row).with_output_types( + row_type.RowTypeConstraint.from_fields(existing_fields + new_fields)) + + def create_mapping_providers(): # These are MetaInlineProviders because their expansion is in terms of other # YamlTransforms, but in a way that needs to be deferred until the input @@ -852,6 +944,7 @@ def create_mapping_providers(): 'AssignTimestamps-javascript': _AssignTimestamps, 'AssignTimestamps-generic': _AssignTimestamps, 'Explode': _Explode, + 'ExtractWindowingInfo': _ExtractWindowingInfo, 'Filter-python': _PyJsFilter, 'Filter-javascript': _PyJsFilter, 'Filter-generic': _PyJsFilter, diff --git a/sdks/python/apache_beam/yaml/yaml_mapping_test.py b/sdks/python/apache_beam/yaml/yaml_mapping_test.py index 2c5feec18278..bb0b1d43e1d7 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping_test.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping_test.py @@ -16,6 +16,7 @@ # import logging +import typing import unittest import numpy as np @@ -25,6 +26,8 @@ from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to from apache_beam.typehints import schemas +from apache_beam.utils.timestamp import Timestamp +from apache_beam.yaml import yaml_mapping from apache_beam.yaml.yaml_transform import YamlTransform DATA = [ @@ -457,6 +460,67 @@ def test_append_type_inference(self): (('label', str), ('conductor', np.int64), ('rank', np.int64), ('new_label', str))) + def test_extract_windowing_info(self): + T = typing.TypeVar('T') + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + elements = ( + p + | beam.Create( + [beam.Row(value=1), beam.Row(value=2), beam.Row(value=11)]) + | beam.Map( + lambda x: beam.transforms.window.TimestampedValue( + x, timestamp=x.value)).with_input_types(T).with_output_types( + T) + | beam.WindowInto(beam.transforms.window.FixedWindows(10))) + result = elements | YamlTransform( + ''' + type: ExtractWindowingInfo + config: + fields: + timestamp: timestamp + window_start: window_start + window_end: window_end + window_string: window_string + window_type: window_type + window_object: window_object + pane_info_field: pane_info + ''') + assert_that( + result, + equal_to([ + beam.Row( + value=1, + timestamp=Timestamp(1), + window_start=Timestamp(0), + window_end=Timestamp(10), + window_string='[0.0, 10.0)', + window_type='IntervalWindow', + window_object=beam.transforms.window.IntervalWindow(0, 10), + pane_info_field=yaml_mapping.PaneInfoTuple( + True, True, 'UNKNOWN', 0, 0)), + beam.Row( + value=2, + timestamp=Timestamp(2), + window_start=Timestamp(0), + window_end=Timestamp(10), + window_string='[0.0, 10.0)', + window_type='IntervalWindow', + window_object=beam.transforms.window.IntervalWindow(0, 10), + pane_info_field=yaml_mapping.PaneInfoTuple( + True, True, 'UNKNOWN', 0, 0)), + beam.Row( + value=11, + timestamp=Timestamp(11), + window_start=Timestamp(10), + window_end=Timestamp(20), + window_string='[10.0, 20.0)', + window_type='IntervalWindow', + window_object=beam.transforms.window.IntervalWindow(10, 20), + pane_info_field=yaml_mapping.PaneInfoTuple( + True, True, 'UNKNOWN', 0, 0)), + ])) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) From 8534f2570c34795aafcaa4b04fcad861795a0a54 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 21 Feb 2025 13:56:35 -0800 Subject: [PATCH 2/5] Add default and iterable syntactic sugar. --- sdks/python/apache_beam/yaml/yaml_mapping.py | 12 ++++++++ .../apache_beam/yaml/yaml_mapping_test.py | 30 +++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 7a1bc2e71c69..f59a27034b34 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -903,11 +903,23 @@ def _ExtractWindowingInfo(pcoll, fields: Optional[Mapping[str, str]] = None): * `pane_info`: A schema'd representation of the current pane info, including its index, whether it was the last firing, etc. + As a convenience, a list rather than a mapping of fields may be provide, + in which case the fields will be named according to the requested values. + Args: fields: A mapping of new field names to various windowing parameters, as documente above. If omitted, defaults to `[timestamp, window_start, window_end]`. """ + if fields is None: + fields = ['timestamp', 'window_start', 'window_end'] + if not isinstance(fields, Mapping): + if isinstance(fields, Iterable) and not isinstance(fields, str): + fields = {fld: fld for fld in fields} + else: + raise TypeError( + 'Fields must be a mapping or iterable of strings, got {fields}') + existing_fields = named_fields_from_element_type(pcoll.element_type) new_fields = [] for field, value in fields.items(): diff --git a/sdks/python/apache_beam/yaml/yaml_mapping_test.py b/sdks/python/apache_beam/yaml/yaml_mapping_test.py index bb0b1d43e1d7..1054f73dc130 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping_test.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping_test.py @@ -521,6 +521,36 @@ def test_extract_windowing_info(self): True, True, 'UNKNOWN', 0, 0)), ])) + def test_extract_windowing_info_iterable(self): + T = typing.TypeVar('T') + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + elements = ( + p + | beam.Create( + [beam.Row(value=1), beam.Row(value=2), beam.Row(value=11)]) + | beam.Map( + lambda x: beam.transforms.window.TimestampedValue( + x, timestamp=x.value)).with_input_types(T).with_output_types( + T)) + result = elements | YamlTransform( + ''' + type: ExtractWindowingInfo + config: + fields: [timestamp, window_type] + ''') + assert_that( + result, + equal_to([ + beam.Row( + value=1, timestamp=Timestamp(1), window_type='GlobalWindow'), + beam.Row( + value=2, timestamp=Timestamp(2), window_type='GlobalWindow'), + beam.Row( + value=11, timestamp=Timestamp(11), + window_type='GlobalWindow'), + ])) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) From b98e331b3ecd6ebe13225a41e63d6daac2b0eb5e Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Mon, 24 Feb 2025 17:00:29 -0800 Subject: [PATCH 3/5] Docstring cleanup. Co-authored-by: Danny McCormick --- sdks/python/apache_beam/yaml/yaml_mapping.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index f59a27034b34..08645773e225 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -886,7 +886,7 @@ def from_pane_info(cls, pane_info): @beam.ptransform.ptransform_fn -def _ExtractWindowingInfo(pcoll, fields: Optional[Mapping[str, str]] = None): +def _ExtractWindowingInfo(pcoll, fields: Optional[Union[Mapping[str, str], Iterable[str]]] = None): """ Extracts the implicit windowing information from an element and makes it explicit as field(s) in the element itself. @@ -903,12 +903,12 @@ def _ExtractWindowingInfo(pcoll, fields: Optional[Mapping[str, str]] = None): * `pane_info`: A schema'd representation of the current pane info, including its index, whether it was the last firing, etc. - As a convenience, a list rather than a mapping of fields may be provide, + As a convenience, a list rather than a mapping of fields may be provided, in which case the fields will be named according to the requested values. Args: fields: A mapping of new field names to various windowing parameters, - as documente above. If omitted, defaults to + as documented above. If omitted, defaults to `[timestamp, window_start, window_end]`. """ if fields is None: From 911a3becdf421dec2c9c8ca1b0cf656c86aea8fe Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Mon, 24 Feb 2025 17:11:24 -0800 Subject: [PATCH 4/5] Make mypy happy. --- sdks/python/apache_beam/yaml/yaml_mapping.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 08645773e225..7ffee5c2039b 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -24,8 +24,8 @@ from collections.abc import Iterable from collections.abc import Mapping from typing import Any -from typing import Optional from typing import NamedTuple +from typing import Optional from typing import TypeVar from typing import Union @@ -849,7 +849,7 @@ class PaneInfoTuple(NamedTuple): is_first: bool is_last: bool timing: str - index: int + index: int # type: ignore[assignment] nonspeculative_index: int @classmethod @@ -886,7 +886,8 @@ def from_pane_info(cls, pane_info): @beam.ptransform.ptransform_fn -def _ExtractWindowingInfo(pcoll, fields: Optional[Union[Mapping[str, str], Iterable[str]]] = None): +def _ExtractWindowingInfo( + pcoll, fields: Optional[Union[Mapping[str, str], Iterable[str]]] = None): """ Extracts the implicit windowing information from an element and makes it explicit as field(s) in the element itself. @@ -943,7 +944,8 @@ def augment_row( return beam.Row(**as_dict) return pcoll | beam.Map(augment_row).with_output_types( - row_type.RowTypeConstraint.from_fields(existing_fields + new_fields)) + row_type.RowTypeConstraint.from_fields( + existing_fields + new_fields)) # type: ignore[operator] def create_mapping_providers(): From 86795a0a94db5f0ceccbaf25ed6f89eeca05edac Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Mon, 24 Feb 2025 17:32:53 -0800 Subject: [PATCH 5/5] Add a yaml windowing info extraction example. --- .../aggregation/combine_sum_windowed.yaml | 92 +++++++++++++++++++ 1 file changed, 92 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/examples/transforms/aggregation/combine_sum_windowed.yaml diff --git a/sdks/python/apache_beam/yaml/examples/transforms/aggregation/combine_sum_windowed.yaml b/sdks/python/apache_beam/yaml/examples/transforms/aggregation/combine_sum_windowed.yaml new file mode 100644 index 000000000000..fb3bef248612 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/aggregation/combine_sum_windowed.yaml @@ -0,0 +1,92 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This is an example that illustrates how to use session windows and +# then extract windowing information for further processing. + +pipeline: + type: chain + transforms: + # Create some fake data. + - type: Create + name: CreateVisits + config: + elements: + - user: alice + timestamp: 1 + - user: alice + timestamp: 3 + - user: bob + timestamp: 7 + - user: bob + timestamp: 12 + - user: bob + timestamp: 20 + - user: alice + timestamp: 101 + - user: alice + timestamp: 109 + - user: alice + timestamp: 115 + + # Use the timestamp field as the element timestamp. + # (Typically this would be assigned by the source.) + - type: AssignTimestamps + config: + timestamp: timestamp + + # Group the data by user for each session window count the number of events + # in each per session. + # See https://beam.apache.org/documentation/programming-guide/#session-windows + - type: Combine + name: SumVisitsPerUser + config: + language: python + group_by: user + combine: + visits: + value: user + fn: count + windowing: + type: sessions + gap: 10s + + # Extract the implicit Beam windowing data (including what the final + # merged session values were) into explicit fields of our rows. + - type: ExtractWindowingInfo + config: + fields: [window_start, window_end, window_string] + + # Drop "short" sessions (in this case, Alice's first two visits.) + - type: Filter + config: + language: python + keep: window_end - window_start > 15 + + # Only keep a couple of fields. + - type: MapToFields + config: + fields: + user: user + window_string: window_string + + - type: LogForTesting + +# Expected: +# Row(user='bob', window_string='[7.0, 30.0)') +# Row(user='alice', window_string='[101.0, 125.0)')