diff --git a/sdks/python/apache_beam/yaml/examples/transforms/aggregation/combine_sum_windowed.yaml b/sdks/python/apache_beam/yaml/examples/transforms/aggregation/combine_sum_windowed.yaml new file mode 100644 index 000000000000..fb3bef248612 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/aggregation/combine_sum_windowed.yaml @@ -0,0 +1,92 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This is an example that illustrates how to use session windows and +# then extract windowing information for further processing. + +pipeline: + type: chain + transforms: + # Create some fake data. + - type: Create + name: CreateVisits + config: + elements: + - user: alice + timestamp: 1 + - user: alice + timestamp: 3 + - user: bob + timestamp: 7 + - user: bob + timestamp: 12 + - user: bob + timestamp: 20 + - user: alice + timestamp: 101 + - user: alice + timestamp: 109 + - user: alice + timestamp: 115 + + # Use the timestamp field as the element timestamp. + # (Typically this would be assigned by the source.) + - type: AssignTimestamps + config: + timestamp: timestamp + + # Group the data by user for each session window count the number of events + # in each per session. + # See https://beam.apache.org/documentation/programming-guide/#session-windows + - type: Combine + name: SumVisitsPerUser + config: + language: python + group_by: user + combine: + visits: + value: user + fn: count + windowing: + type: sessions + gap: 10s + + # Extract the implicit Beam windowing data (including what the final + # merged session values were) into explicit fields of our rows. + - type: ExtractWindowingInfo + config: + fields: [window_start, window_end, window_string] + + # Drop "short" sessions (in this case, Alice's first two visits.) + - type: Filter + config: + language: python + keep: window_end - window_start > 15 + + # Only keep a couple of fields. + - type: MapToFields + config: + fields: + user: user + window_string: window_string + + - type: LogForTesting + +# Expected: +# Row(user='bob', window_string='[7.0, 30.0)') +# Row(user='alice', window_string='[101.0, 125.0)') diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 54a585611dca..7ffee5c2039b 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -24,6 +24,7 @@ from collections.abc import Iterable from collections.abc import Mapping from typing import Any +from typing import NamedTuple from typing import Optional from typing import TypeVar from typing import Union @@ -41,6 +42,8 @@ from apache_beam.typehints.schemas import schema_from_element_type from apache_beam.typehints.schemas import typing_from_runner_api from apache_beam.utils import python_callable +from apache_beam.utils import windowed_value +from apache_beam.utils.timestamp import Timestamp from apache_beam.yaml import json_utils from apache_beam.yaml import options from apache_beam.yaml import yaml_provider @@ -842,6 +845,109 @@ def _AssignTimestamps( ).with_input_types(T).with_output_types(T) +class PaneInfoTuple(NamedTuple): + is_first: bool + is_last: bool + timing: str + index: int # type: ignore[assignment] + nonspeculative_index: int + + @classmethod + def from_pane_info(cls, pane_info): + return cls( + pane_info.is_first, + pane_info.is_last, + windowed_value.PaneInfoTiming.to_string(pane_info.timing), + pane_info.index, + pane_info.nonspeculative_index) + + +_WINDOWING_INFO_TYPES = { + 'timestamp': Timestamp, + 'window_start': Optional[Timestamp], + 'window_end': Timestamp, + 'window_string': str, + 'window_type': str, + 'window_object': Any, + 'pane_info': PaneInfoTuple, +} +_WINDOWING_INFO_EXTRACTORS = { + 'timestamp': lambda locals: locals['timestamp'], + 'window_start': lambda locals: getattr(locals['window'], 'start', None), + 'window_end': lambda locals: locals['window'].end, + 'window_string': lambda locals: str(locals['window']), + 'window_type': lambda locals: type(locals['window']).__name__, + 'window_object': lambda locals: locals['window'], + 'pane_info': lambda locals: PaneInfoTuple.from_pane_info( + locals['pane_info']), +} +assert set(_WINDOWING_INFO_TYPES.keys()) == set( + _WINDOWING_INFO_EXTRACTORS.keys()) + + +@beam.ptransform.ptransform_fn +def _ExtractWindowingInfo( + pcoll, fields: Optional[Union[Mapping[str, str], Iterable[str]]] = None): + """ + Extracts the implicit windowing information from an element and makes it + explicit as field(s) in the element itself. + + The following windowing parameter values are supported: + + * `timestamp`: The event timestamp of the current element. + * `window_start`: The start of the window iff it is an interval window. + * `window_end`: The (exclusive) end of the window. + * `window_string`: The string representation of the window. + * `window_type`: The type of the window as a string. + * `winodw_object`: The actual window object itself, + as a Java or Python object. + * `pane_info`: A schema'd representation of the current pane info, including + its index, whether it was the last firing, etc. + + As a convenience, a list rather than a mapping of fields may be provided, + in which case the fields will be named according to the requested values. + + Args: + fields: A mapping of new field names to various windowing parameters, + as documented above. If omitted, defaults to + `[timestamp, window_start, window_end]`. + """ + if fields is None: + fields = ['timestamp', 'window_start', 'window_end'] + if not isinstance(fields, Mapping): + if isinstance(fields, Iterable) and not isinstance(fields, str): + fields = {fld: fld for fld in fields} + else: + raise TypeError( + 'Fields must be a mapping or iterable of strings, got {fields}') + + existing_fields = named_fields_from_element_type(pcoll.element_type) + new_fields = [] + for field, value in fields.items(): + if value not in _WINDOWING_INFO_TYPES: + raise ValueError( + f'{value} is not a valid windowing parameter; ' + f'must be one of {list(_WINDOWING_INFO_TYPES.keys())}') + elif field in existing_fields: + raise ValueError(f'Input schema already has a field named {field}.') + else: + new_fields.append((field, _WINDOWING_INFO_TYPES[value])) + + def augment_row( + row, + timestamp=beam.DoFn.TimestampParam, + window=beam.DoFn.WindowParam, + pane_info=beam.DoFn.PaneInfoParam): + as_dict = row._asdict() + for field, value in fields.items(): + as_dict[field] = _WINDOWING_INFO_EXTRACTORS[value](locals()) + return beam.Row(**as_dict) + + return pcoll | beam.Map(augment_row).with_output_types( + row_type.RowTypeConstraint.from_fields( + existing_fields + new_fields)) # type: ignore[operator] + + def create_mapping_providers(): # These are MetaInlineProviders because their expansion is in terms of other # YamlTransforms, but in a way that needs to be deferred until the input @@ -852,6 +958,7 @@ def create_mapping_providers(): 'AssignTimestamps-javascript': _AssignTimestamps, 'AssignTimestamps-generic': _AssignTimestamps, 'Explode': _Explode, + 'ExtractWindowingInfo': _ExtractWindowingInfo, 'Filter-python': _PyJsFilter, 'Filter-javascript': _PyJsFilter, 'Filter-generic': _PyJsFilter, diff --git a/sdks/python/apache_beam/yaml/yaml_mapping_test.py b/sdks/python/apache_beam/yaml/yaml_mapping_test.py index 2c5feec18278..1054f73dc130 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping_test.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping_test.py @@ -16,6 +16,7 @@ # import logging +import typing import unittest import numpy as np @@ -25,6 +26,8 @@ from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to from apache_beam.typehints import schemas +from apache_beam.utils.timestamp import Timestamp +from apache_beam.yaml import yaml_mapping from apache_beam.yaml.yaml_transform import YamlTransform DATA = [ @@ -457,6 +460,97 @@ def test_append_type_inference(self): (('label', str), ('conductor', np.int64), ('rank', np.int64), ('new_label', str))) + def test_extract_windowing_info(self): + T = typing.TypeVar('T') + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + elements = ( + p + | beam.Create( + [beam.Row(value=1), beam.Row(value=2), beam.Row(value=11)]) + | beam.Map( + lambda x: beam.transforms.window.TimestampedValue( + x, timestamp=x.value)).with_input_types(T).with_output_types( + T) + | beam.WindowInto(beam.transforms.window.FixedWindows(10))) + result = elements | YamlTransform( + ''' + type: ExtractWindowingInfo + config: + fields: + timestamp: timestamp + window_start: window_start + window_end: window_end + window_string: window_string + window_type: window_type + window_object: window_object + pane_info_field: pane_info + ''') + assert_that( + result, + equal_to([ + beam.Row( + value=1, + timestamp=Timestamp(1), + window_start=Timestamp(0), + window_end=Timestamp(10), + window_string='[0.0, 10.0)', + window_type='IntervalWindow', + window_object=beam.transforms.window.IntervalWindow(0, 10), + pane_info_field=yaml_mapping.PaneInfoTuple( + True, True, 'UNKNOWN', 0, 0)), + beam.Row( + value=2, + timestamp=Timestamp(2), + window_start=Timestamp(0), + window_end=Timestamp(10), + window_string='[0.0, 10.0)', + window_type='IntervalWindow', + window_object=beam.transforms.window.IntervalWindow(0, 10), + pane_info_field=yaml_mapping.PaneInfoTuple( + True, True, 'UNKNOWN', 0, 0)), + beam.Row( + value=11, + timestamp=Timestamp(11), + window_start=Timestamp(10), + window_end=Timestamp(20), + window_string='[10.0, 20.0)', + window_type='IntervalWindow', + window_object=beam.transforms.window.IntervalWindow(10, 20), + pane_info_field=yaml_mapping.PaneInfoTuple( + True, True, 'UNKNOWN', 0, 0)), + ])) + + def test_extract_windowing_info_iterable(self): + T = typing.TypeVar('T') + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + elements = ( + p + | beam.Create( + [beam.Row(value=1), beam.Row(value=2), beam.Row(value=11)]) + | beam.Map( + lambda x: beam.transforms.window.TimestampedValue( + x, timestamp=x.value)).with_input_types(T).with_output_types( + T)) + result = elements | YamlTransform( + ''' + type: ExtractWindowingInfo + config: + fields: [timestamp, window_type] + ''') + assert_that( + result, + equal_to([ + beam.Row( + value=1, timestamp=Timestamp(1), window_type='GlobalWindow'), + beam.Row( + value=2, timestamp=Timestamp(2), window_type='GlobalWindow'), + beam.Row( + value=11, timestamp=Timestamp(11), + window_type='GlobalWindow'), + ])) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO)