Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an ExtractWindowingInfo transform. #34051

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from collections.abc import Mapping
from typing import Any
from typing import Optional
from typing import NamedTuple
from typing import TypeVar
from typing import Union

Expand All @@ -41,6 +42,8 @@
from apache_beam.typehints.schemas import schema_from_element_type
from apache_beam.typehints.schemas import typing_from_runner_api
from apache_beam.utils import python_callable
from apache_beam.utils import windowed_value
from apache_beam.utils.timestamp import Timestamp
from apache_beam.yaml import json_utils
from apache_beam.yaml import options
from apache_beam.yaml import yaml_provider
Expand Down Expand Up @@ -842,6 +845,107 @@ def _AssignTimestamps(
).with_input_types(T).with_output_types(T)


class PaneInfoTuple(NamedTuple):
is_first: bool
is_last: bool
timing: str
index: int
nonspeculative_index: int

@classmethod
def from_pane_info(cls, pane_info):
return cls(
pane_info.is_first,
pane_info.is_last,
windowed_value.PaneInfoTiming.to_string(pane_info.timing),
pane_info.index,
pane_info.nonspeculative_index)


_WINDOWING_INFO_TYPES = {
'timestamp': Timestamp,
'window_start': Optional[Timestamp],
'window_end': Timestamp,
'window_string': str,
'window_type': str,
'window_object': Any,
'pane_info': PaneInfoTuple,
}
_WINDOWING_INFO_EXTRACTORS = {
'timestamp': lambda locals: locals['timestamp'],
'window_start': lambda locals: getattr(locals['window'], 'start', None),
'window_end': lambda locals: locals['window'].end,
'window_string': lambda locals: str(locals['window']),
'window_type': lambda locals: type(locals['window']).__name__,
'window_object': lambda locals: locals['window'],
'pane_info': lambda locals: PaneInfoTuple.from_pane_info(
locals['pane_info']),
}
assert set(_WINDOWING_INFO_TYPES.keys()) == set(
_WINDOWING_INFO_EXTRACTORS.keys())


@beam.ptransform.ptransform_fn
def _ExtractWindowingInfo(pcoll, fields: Optional[Mapping[str, str]] = None):
"""
Extracts the implicit windowing information from an element and makes it
explicit as field(s) in the element itself.

The following windowing parameter values are supported:

* `timestamp`: The event timestamp of the current element.
* `window_start`: The start of the window iff it is an interval window.
* `window_end`: The (exclusive) end of the window.
* `window_string`: The string representation of the window.
* `window_type`: The type of the window as a string.
* `winodw_object`: The actual window object itself,
as a Java or Python object.
* `pane_info`: A schema'd representation of the current pane info, including
its index, whether it was the last firing, etc.

As a convenience, a list rather than a mapping of fields may be provide,
in which case the fields will be named according to the requested values.

Args:
fields: A mapping of new field names to various windowing parameters,
as documente above. If omitted, defaults to
`[timestamp, window_start, window_end]`.
"""
if fields is None:
fields = ['timestamp', 'window_start', 'window_end']
if not isinstance(fields, Mapping):
if isinstance(fields, Iterable) and not isinstance(fields, str):
fields = {fld: fld for fld in fields}
else:
raise TypeError(
'Fields must be a mapping or iterable of strings, got {fields}')

existing_fields = named_fields_from_element_type(pcoll.element_type)
new_fields = []
for field, value in fields.items():
if value not in _WINDOWING_INFO_TYPES:
raise ValueError(
f'{value} is not a valid windowing parameter; '
f'must be one of {list(_WINDOWING_INFO_TYPES.keys())}')
elif field in existing_fields:
raise ValueError(f'Input schema already has a field named {field}.')
else:
new_fields.append((field, _WINDOWING_INFO_TYPES[value]))

def augment_row(
row,
timestamp=beam.DoFn.TimestampParam,
window=beam.DoFn.WindowParam,
pane_info=beam.DoFn.PaneInfoParam):
as_dict = row._asdict()
for field, value in fields.items():
as_dict[field] = _WINDOWING_INFO_EXTRACTORS[value](locals())
return beam.Row(**as_dict)

return pcoll | beam.Map(augment_row).with_output_types(
row_type.RowTypeConstraint.from_fields(existing_fields + new_fields))


def create_mapping_providers():
# These are MetaInlineProviders because their expansion is in terms of other
# YamlTransforms, but in a way that needs to be deferred until the input
Expand All @@ -852,6 +956,7 @@ def create_mapping_providers():
'AssignTimestamps-javascript': _AssignTimestamps,
'AssignTimestamps-generic': _AssignTimestamps,
'Explode': _Explode,
'ExtractWindowingInfo': _ExtractWindowingInfo,
'Filter-python': _PyJsFilter,
'Filter-javascript': _PyJsFilter,
'Filter-generic': _PyJsFilter,
Expand Down
94 changes: 94 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_mapping_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#

import logging
import typing
import unittest

import numpy as np
Expand All @@ -25,6 +26,8 @@
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.typehints import schemas
from apache_beam.utils.timestamp import Timestamp
from apache_beam.yaml import yaml_mapping
from apache_beam.yaml.yaml_transform import YamlTransform

DATA = [
Expand Down Expand Up @@ -457,6 +460,97 @@ def test_append_type_inference(self):
(('label', str), ('conductor', np.int64), ('rank', np.int64),
('new_label', str)))

def test_extract_windowing_info(self):
T = typing.TypeVar('T')
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p:
elements = (
p
| beam.Create(
[beam.Row(value=1), beam.Row(value=2), beam.Row(value=11)])
| beam.Map(
lambda x: beam.transforms.window.TimestampedValue(
x, timestamp=x.value)).with_input_types(T).with_output_types(
T)
| beam.WindowInto(beam.transforms.window.FixedWindows(10)))
result = elements | YamlTransform(
'''
type: ExtractWindowingInfo
config:
fields:
timestamp: timestamp
window_start: window_start
window_end: window_end
window_string: window_string
window_type: window_type
window_object: window_object
pane_info_field: pane_info
''')
assert_that(
result,
equal_to([
beam.Row(
value=1,
timestamp=Timestamp(1),
window_start=Timestamp(0),
window_end=Timestamp(10),
window_string='[0.0, 10.0)',
window_type='IntervalWindow',
window_object=beam.transforms.window.IntervalWindow(0, 10),
pane_info_field=yaml_mapping.PaneInfoTuple(
True, True, 'UNKNOWN', 0, 0)),
beam.Row(
value=2,
timestamp=Timestamp(2),
window_start=Timestamp(0),
window_end=Timestamp(10),
window_string='[0.0, 10.0)',
window_type='IntervalWindow',
window_object=beam.transforms.window.IntervalWindow(0, 10),
pane_info_field=yaml_mapping.PaneInfoTuple(
True, True, 'UNKNOWN', 0, 0)),
beam.Row(
value=11,
timestamp=Timestamp(11),
window_start=Timestamp(10),
window_end=Timestamp(20),
window_string='[10.0, 20.0)',
window_type='IntervalWindow',
window_object=beam.transforms.window.IntervalWindow(10, 20),
pane_info_field=yaml_mapping.PaneInfoTuple(
True, True, 'UNKNOWN', 0, 0)),
]))

def test_extract_windowing_info_iterable(self):
T = typing.TypeVar('T')
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p:
elements = (
p
| beam.Create(
[beam.Row(value=1), beam.Row(value=2), beam.Row(value=11)])
| beam.Map(
lambda x: beam.transforms.window.TimestampedValue(
x, timestamp=x.value)).with_input_types(T).with_output_types(
T))
result = elements | YamlTransform(
'''
type: ExtractWindowingInfo
config:
fields: [timestamp, window_type]
''')
assert_that(
result,
equal_to([
beam.Row(
value=1, timestamp=Timestamp(1), window_type='GlobalWindow'),
beam.Row(
value=2, timestamp=Timestamp(2), window_type='GlobalWindow'),
beam.Row(
value=11, timestamp=Timestamp(11),
window_type='GlobalWindow'),
]))


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down
Loading