Skip to content

Commit

Permalink
Merge pull request apache#34051 Add an ExtractWindowingInfo transform.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb authored Feb 25, 2025
2 parents 1d2b9c5 + 86795a0 commit 82d3d8d
Show file tree
Hide file tree
Showing 3 changed files with 293 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# This is an example that illustrates how to use session windows and
# then extract windowing information for further processing.

pipeline:
type: chain
transforms:
# Create some fake data.
- type: Create
name: CreateVisits
config:
elements:
- user: alice
timestamp: 1
- user: alice
timestamp: 3
- user: bob
timestamp: 7
- user: bob
timestamp: 12
- user: bob
timestamp: 20
- user: alice
timestamp: 101
- user: alice
timestamp: 109
- user: alice
timestamp: 115

# Use the timestamp field as the element timestamp.
# (Typically this would be assigned by the source.)
- type: AssignTimestamps
config:
timestamp: timestamp

# Group the data by user for each session window count the number of events
# in each per session.
# See https://beam.apache.org/documentation/programming-guide/#session-windows
- type: Combine
name: SumVisitsPerUser
config:
language: python
group_by: user
combine:
visits:
value: user
fn: count
windowing:
type: sessions
gap: 10s

# Extract the implicit Beam windowing data (including what the final
# merged session values were) into explicit fields of our rows.
- type: ExtractWindowingInfo
config:
fields: [window_start, window_end, window_string]

# Drop "short" sessions (in this case, Alice's first two visits.)
- type: Filter
config:
language: python
keep: window_end - window_start > 15

# Only keep a couple of fields.
- type: MapToFields
config:
fields:
user: user
window_string: window_string

- type: LogForTesting

# Expected:
# Row(user='bob', window_string='[7.0, 30.0)')
# Row(user='alice', window_string='[101.0, 125.0)')
107 changes: 107 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from collections.abc import Iterable
from collections.abc import Mapping
from typing import Any
from typing import NamedTuple
from typing import Optional
from typing import TypeVar
from typing import Union
Expand All @@ -41,6 +42,8 @@
from apache_beam.typehints.schemas import schema_from_element_type
from apache_beam.typehints.schemas import typing_from_runner_api
from apache_beam.utils import python_callable
from apache_beam.utils import windowed_value
from apache_beam.utils.timestamp import Timestamp
from apache_beam.yaml import json_utils
from apache_beam.yaml import options
from apache_beam.yaml import yaml_provider
Expand Down Expand Up @@ -842,6 +845,109 @@ def _AssignTimestamps(
).with_input_types(T).with_output_types(T)


class PaneInfoTuple(NamedTuple):
is_first: bool
is_last: bool
timing: str
index: int # type: ignore[assignment]
nonspeculative_index: int

@classmethod
def from_pane_info(cls, pane_info):
return cls(
pane_info.is_first,
pane_info.is_last,
windowed_value.PaneInfoTiming.to_string(pane_info.timing),
pane_info.index,
pane_info.nonspeculative_index)


_WINDOWING_INFO_TYPES = {
'timestamp': Timestamp,
'window_start': Optional[Timestamp],
'window_end': Timestamp,
'window_string': str,
'window_type': str,
'window_object': Any,
'pane_info': PaneInfoTuple,
}
_WINDOWING_INFO_EXTRACTORS = {
'timestamp': lambda locals: locals['timestamp'],
'window_start': lambda locals: getattr(locals['window'], 'start', None),
'window_end': lambda locals: locals['window'].end,
'window_string': lambda locals: str(locals['window']),
'window_type': lambda locals: type(locals['window']).__name__,
'window_object': lambda locals: locals['window'],
'pane_info': lambda locals: PaneInfoTuple.from_pane_info(
locals['pane_info']),
}
assert set(_WINDOWING_INFO_TYPES.keys()) == set(
_WINDOWING_INFO_EXTRACTORS.keys())


@beam.ptransform.ptransform_fn
def _ExtractWindowingInfo(
pcoll, fields: Optional[Union[Mapping[str, str], Iterable[str]]] = None):
"""
Extracts the implicit windowing information from an element and makes it
explicit as field(s) in the element itself.
The following windowing parameter values are supported:
* `timestamp`: The event timestamp of the current element.
* `window_start`: The start of the window iff it is an interval window.
* `window_end`: The (exclusive) end of the window.
* `window_string`: The string representation of the window.
* `window_type`: The type of the window as a string.
* `winodw_object`: The actual window object itself,
as a Java or Python object.
* `pane_info`: A schema'd representation of the current pane info, including
its index, whether it was the last firing, etc.
As a convenience, a list rather than a mapping of fields may be provided,
in which case the fields will be named according to the requested values.
Args:
fields: A mapping of new field names to various windowing parameters,
as documented above. If omitted, defaults to
`[timestamp, window_start, window_end]`.
"""
if fields is None:
fields = ['timestamp', 'window_start', 'window_end']
if not isinstance(fields, Mapping):
if isinstance(fields, Iterable) and not isinstance(fields, str):
fields = {fld: fld for fld in fields}
else:
raise TypeError(
'Fields must be a mapping or iterable of strings, got {fields}')

existing_fields = named_fields_from_element_type(pcoll.element_type)
new_fields = []
for field, value in fields.items():
if value not in _WINDOWING_INFO_TYPES:
raise ValueError(
f'{value} is not a valid windowing parameter; '
f'must be one of {list(_WINDOWING_INFO_TYPES.keys())}')
elif field in existing_fields:
raise ValueError(f'Input schema already has a field named {field}.')
else:
new_fields.append((field, _WINDOWING_INFO_TYPES[value]))

def augment_row(
row,
timestamp=beam.DoFn.TimestampParam,
window=beam.DoFn.WindowParam,
pane_info=beam.DoFn.PaneInfoParam):
as_dict = row._asdict()
for field, value in fields.items():
as_dict[field] = _WINDOWING_INFO_EXTRACTORS[value](locals())
return beam.Row(**as_dict)

return pcoll | beam.Map(augment_row).with_output_types(
row_type.RowTypeConstraint.from_fields(
existing_fields + new_fields)) # type: ignore[operator]


def create_mapping_providers():
# These are MetaInlineProviders because their expansion is in terms of other
# YamlTransforms, but in a way that needs to be deferred until the input
Expand All @@ -852,6 +958,7 @@ def create_mapping_providers():
'AssignTimestamps-javascript': _AssignTimestamps,
'AssignTimestamps-generic': _AssignTimestamps,
'Explode': _Explode,
'ExtractWindowingInfo': _ExtractWindowingInfo,
'Filter-python': _PyJsFilter,
'Filter-javascript': _PyJsFilter,
'Filter-generic': _PyJsFilter,
Expand Down
94 changes: 94 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_mapping_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#

import logging
import typing
import unittest

import numpy as np
Expand All @@ -25,6 +26,8 @@
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.typehints import schemas
from apache_beam.utils.timestamp import Timestamp
from apache_beam.yaml import yaml_mapping
from apache_beam.yaml.yaml_transform import YamlTransform

DATA = [
Expand Down Expand Up @@ -457,6 +460,97 @@ def test_append_type_inference(self):
(('label', str), ('conductor', np.int64), ('rank', np.int64),
('new_label', str)))

def test_extract_windowing_info(self):
T = typing.TypeVar('T')
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p:
elements = (
p
| beam.Create(
[beam.Row(value=1), beam.Row(value=2), beam.Row(value=11)])
| beam.Map(
lambda x: beam.transforms.window.TimestampedValue(
x, timestamp=x.value)).with_input_types(T).with_output_types(
T)
| beam.WindowInto(beam.transforms.window.FixedWindows(10)))
result = elements | YamlTransform(
'''
type: ExtractWindowingInfo
config:
fields:
timestamp: timestamp
window_start: window_start
window_end: window_end
window_string: window_string
window_type: window_type
window_object: window_object
pane_info_field: pane_info
''')
assert_that(
result,
equal_to([
beam.Row(
value=1,
timestamp=Timestamp(1),
window_start=Timestamp(0),
window_end=Timestamp(10),
window_string='[0.0, 10.0)',
window_type='IntervalWindow',
window_object=beam.transforms.window.IntervalWindow(0, 10),
pane_info_field=yaml_mapping.PaneInfoTuple(
True, True, 'UNKNOWN', 0, 0)),
beam.Row(
value=2,
timestamp=Timestamp(2),
window_start=Timestamp(0),
window_end=Timestamp(10),
window_string='[0.0, 10.0)',
window_type='IntervalWindow',
window_object=beam.transforms.window.IntervalWindow(0, 10),
pane_info_field=yaml_mapping.PaneInfoTuple(
True, True, 'UNKNOWN', 0, 0)),
beam.Row(
value=11,
timestamp=Timestamp(11),
window_start=Timestamp(10),
window_end=Timestamp(20),
window_string='[10.0, 20.0)',
window_type='IntervalWindow',
window_object=beam.transforms.window.IntervalWindow(10, 20),
pane_info_field=yaml_mapping.PaneInfoTuple(
True, True, 'UNKNOWN', 0, 0)),
]))

def test_extract_windowing_info_iterable(self):
T = typing.TypeVar('T')
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p:
elements = (
p
| beam.Create(
[beam.Row(value=1), beam.Row(value=2), beam.Row(value=11)])
| beam.Map(
lambda x: beam.transforms.window.TimestampedValue(
x, timestamp=x.value)).with_input_types(T).with_output_types(
T))
result = elements | YamlTransform(
'''
type: ExtractWindowingInfo
config:
fields: [timestamp, window_type]
''')
assert_that(
result,
equal_to([
beam.Row(
value=1, timestamp=Timestamp(1), window_type='GlobalWindow'),
beam.Row(
value=2, timestamp=Timestamp(2), window_type='GlobalWindow'),
beam.Row(
value=11, timestamp=Timestamp(11),
window_type='GlobalWindow'),
]))


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down

0 comments on commit 82d3d8d

Please sign in to comment.