Skip to content

Commit 0bfe06b

Browse files
authored
Merge pull request #30587 Properly handle timestamp prefixing of unknown window types.
2 parents da26f5a + 0ba3573 commit 0bfe06b

File tree

8 files changed

+217
-5
lines changed

8 files changed

+217
-5
lines changed

sdks/python/apache_beam/coders/coder_impl.py

+59-4
Original file line numberDiff line numberDiff line change
@@ -1641,10 +1641,65 @@ def decode_from_stream(self, stream, nested):
16411641
return self._window_coder_impl.decode_from_stream(stream, nested)
16421642

16431643
def estimate_size(self, value: Any, nested: bool = False) -> int:
1644-
estimated_size = 0
1645-
estimated_size += TimestampCoderImpl().estimate_size(value)
1646-
estimated_size += self._window_coder_impl.estimate_size(value, nested)
1647-
return estimated_size
1644+
return (
1645+
TimestampCoderImpl().estimate_size(value.max_timestamp()) +
1646+
self._window_coder_impl.estimate_size(value, nested))
1647+
1648+
1649+
# Holds the lazily defined window class; stays None until the first call to
# _create_opaque_window.
_OpaqueWindow = None


def _create_opaque_window(end, encoded_window):
  """Returns a window object that wraps the encoded bytes of another window.

  The backing class is defined on the first call and cached in the
  module-level ``_OpaqueWindow`` global. The ``BoundedWindow`` import is
  deferred until then to avoid circular import issues.

  Args:
    end: forwarded to ``BoundedWindow.__init__``.
    encoded_window: the encoded form of the original window. It takes part in
      equality together with ``end`` and is the only input to the hash.
  """
  global _OpaqueWindow
  if _OpaqueWindow is not None:
    return _OpaqueWindow(end, encoded_window)

  # This is lazy to avoid circular import issues.
  from apache_beam.transforms.window import BoundedWindow

  class _OpaqueWindow(BoundedWindow):
    def __init__(self, end, encoded_window):
      super().__init__(end)
      self.encoded_window = encoded_window

    def __eq__(self, other):
      if type(self) != type(other):
        return False
      return (
          self.end == other.end and
          self.encoded_window == other.encoded_window)

    def __hash__(self):
      return hash(self.encoded_window)

    def __repr__(self):
      return 'OpaqueWindow(%s, %s)' % (self.end, self.encoded_window)

  return _OpaqueWindow(end, encoded_window)
1675+
1676+
1677+
class TimestampPrefixingOpaqueWindowCoderImpl(StreamCoderImpl):
  """For internal use only; no backwards-compatibility guarantees.

  A coder for unknown window types. It never interprets the window itself: it
  carries the window's already-encoded bytes and prefixes them with the
  window's max_timestamp.

  The coder encodes and decodes custom window types with the following format:
    window's max_timestamp()
    length prefixed encoded window
  """
  def __init__(self) -> None:
    pass

  def encode_to_stream(self, value, stream, nested):
    """Writes value.max_timestamp() followed by value.encoded_window.

    The ``nested`` argument is not consulted; both parts are always written
    with nested=True.
    """
    TimestampCoderImpl().encode_to_stream(value.max_timestamp(), stream, True)
    stream.write(value.encoded_window, True)

  def decode_from_stream(self, stream, nested):
    """Reads a timestamp and the encoded window bytes into an opaque window.

    The window end is rebuilt as the successor of the decoded max_timestamp.
    The ``nested`` argument is not consulted.
    """
    max_timestamp = TimestampCoderImpl().decode_from_stream(stream, True)
    return _create_opaque_window(
        max_timestamp.successor(), stream.read_all(True))

  def estimate_size(self, value: Any, nested: bool = False) -> int:
    """Estimates the encoded size of ``value`` in bytes.

    Counts the timestamp, the length prefix and the encoded window bytes, to
    match what encode_to_stream writes.
    """
    payload_size = len(value.encoded_window)
    # encode_to_stream writes the payload length-prefixed. This assumes the
    # prefix is a base-128 varint (7 payload bits per byte, at least one
    # byte) -- TODO confirm against the stream implementation.
    prefix_size = max(1, (payload_size.bit_length() + 6) // 7)
    return (
        TimestampCoderImpl().estimate_size(value.max_timestamp()) +
        prefix_size + payload_size)
16481703

16491704

16501705
row_coders_registered = False

sdks/python/apache_beam/coders/coders.py

+28
Original file line numberDiff line numberDiff line change
@@ -1628,6 +1628,34 @@ def __hash__(self):
16281628
common_urns.coders.CUSTOM_WINDOW.urn, TimestampPrefixingWindowCoder)
16291629

16301630

1631+
class TimestampPrefixingOpaqueWindowCoder(FastCoder):
  """For internal use only; no backwards-compatibility guarantees.

  Coder which decodes windows as bytes. The work is delegated to
  ``coder_impl.TimestampPrefixingOpaqueWindowCoderImpl``. The coder carries no
  state, so all instances of the same type compare and hash equal.
  """
  def __init__(self) -> None:
    pass

  def _create_impl(self):
    return coder_impl.TimestampPrefixingOpaqueWindowCoderImpl()

  def is_deterministic(self) -> bool:
    return True

  def __eq__(self, other):
    # Stateless coder: equality is decided by the exact type only.
    return type(self) == type(other)

  def __hash__(self):
    return hash(type(self))

  def __repr__(self):
    return 'TimestampPrefixingOpaqueWindowCoder'


# Associate the Python-specific opaque window coder URN with this class.
Coder.register_structured_urn(
    python_urns.TIMESTAMP_PREFIXED_OPAQUE_WINDOW_CODER,
    TimestampPrefixingOpaqueWindowCoder)
1657+
1658+
16311659
class BigIntegerCoder(FastCoder):
16321660
def _create_impl(self):
16331661
return coder_impl.BigIntegerCoderImpl()

sdks/python/apache_beam/coders/coders_test_common.py

+10
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ def tearDownClass(cls):
164164
coders.SinglePrecisionFloatCoder,
165165
coders.ToBytesCoder,
166166
coders.BigIntegerCoder, # tested in DecimalCoder
167+
coders.TimestampPrefixingOpaqueWindowCoder,
167168
])
168169
cls.seen_nested -= set(
169170
[coders.ProtoCoder, coders.ProtoPlusCoder, CustomCoder])
@@ -739,6 +740,15 @@ def test_timestamp_prefixing_window_coder(self):
739740
coders.IntervalWindowCoder()), )),
740741
(window.IntervalWindow(0, 10), ))
741742

743+
def test_timestamp_prefixing_opaque_window_coder(self):
  """Windows survive SDK-encode -> opaque decode/encode -> SDK-decode."""
  sdk_coder = coders.TimestampPrefixingWindowCoder(
      coders.LengthPrefixCoder(coders.PickleCoder()))
  safe_coder = coders.TimestampPrefixingOpaqueWindowCoder()
  for original in [window.IntervalWindow(1, 123), window.GlobalWindow()]:
    # Pass the SDK encoding through the opaque coder, which does not know
    # the window type.
    sdk_bytes = sdk_coder.encode(original)
    opaque_window = safe_coder.decode(sdk_bytes)
    opaque_bytes = safe_coder.encode(opaque_window)
    self.assertEqual(original, sdk_coder.decode(opaque_bytes))
751+
742752
def test_decimal_coder(self):
743753
test_coder = coders.DecimalCoder()
744754

sdks/python/apache_beam/portability/python_urns.py

+4
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@
4040
# Components: The coders for the tuple elements, in order.
4141
TUPLE_CODER = "beam:coder:tuple:v1"
4242

43+
# This allows us to decode TimestampPrefixed(LengthPrefixed(AnyWindowCoder)).
44+
TIMESTAMP_PREFIXED_OPAQUE_WINDOW_CODER = (
45+
"beam:timestamp_prefixed_opaque_window_coder:v1")
46+
4347
# Invoke UserFns in process, via direct function calls.
4448
# Payload: None.
4549
EMBEDDED_PYTHON = "beam:env:embedded_python:v1"

sdks/python/apache_beam/runners/portability/flink_runner_test.py

+3
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,9 @@ def test_register_finalizations(self):
314314
def test_custom_merging_window(self):
315315
raise unittest.SkipTest("https://github.com/apache/beam/issues/20641")
316316

317+
def test_custom_window_type(self):
  # Unconditionally skipped; the linked issue is given as the skip reason.
  skip_reason = "https://github.com/apache/beam/issues/20641"
  raise unittest.SkipTest(skip_reason)
319+
317320
# Inherits all other tests.
318321

319322

sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py

+54
Original file line numberDiff line numberDiff line change
@@ -1117,6 +1117,19 @@ def test_custom_merging_window(self):
11171117
from apache_beam.runners.portability.fn_api_runner.execution import GenericMergingWindowFn
11181118
self.assertEqual(GenericMergingWindowFn._HANDLES, {})
11191119

1120+
def test_custom_window_type(self):
  """Groups by key under a WindowFn that uses a custom window class."""
  # One group per (10-second bucket, color) window produced by
  # EvenOddWindows.
  expected = [('k', [1]), ('k', [2]), ('k', [101]), ('k', [100, 102])]
  with self.create_pipeline() as p:
    timestamped = (
        p
        | beam.Create([1, 2, 100, 101, 102])
        | beam.Map(lambda t: window.TimestampedValue(('k', t), t)))
    grouped = (
        timestamped
        | beam.WindowInto(EvenOddWindows())
        | beam.GroupByKey())
    res = grouped | beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1])))
    assert_that(res, equal_to(expected))
1132+
11201133
@unittest.skip('BEAM-9119: test is flaky')
11211134
def test_large_elements(self):
11221135
with self.create_pipeline() as p:
@@ -2379,6 +2392,47 @@ def get_window_coder(self):
23792392
return coders.IntervalWindowCoder()
23802393

23812394

2395+
class ColoredFixedWindow(window.BoundedWindow):
  """A bounded window that also carries a ``color`` attribute.

  Two windows are equal only if they have the same exact type, the same
  ``end`` and the same ``color``. The hash is built from ``end`` and
  ``color``.
  """
  def __init__(self, end, color):
    super().__init__(end)
    self.color = color

  def __eq__(self, other):
    if type(self) != type(other):
      return False
    return self.end == other.end and self.color == other.color

  def __hash__(self):
    return hash((self.end, self.color))
2407+
2408+
2409+
class ColoredFixedWindowCoder(beam.coders.Coder):
  """Encodes a ColoredFixedWindow as an (end, color) tuple."""

  # Shared tuple coder: a timestamp for ``end``, a UTF-8 string for ``color``.
  kv_coder = beam.coders.TupleCoder(
      [beam.coders.TimestampCoder(), beam.coders.StrUtf8Coder()])

  def encode(self, colored_window):
    end_and_color = (colored_window.end, colored_window.color)
    return self.kv_coder.encode(end_and_color)

  def decode(self, encoded_window):
    end, color = self.kv_coder.decode(encoded_window)
    return ColoredFixedWindow(end, color)

  def is_deterministic(self):
    return True
2421+
2422+
2423+
class EvenOddWindows(window.NonMergingWindowFn):
  """Assigns each element to a single ColoredFixedWindow.

  The window end is ``timestamp - timestamp % 10 + 10``. The color is 'red'
  when the timestamp's whole-second count is odd and 'black' otherwise.
  """
  def assign(self, context):
    timestamp = context.timestamp
    window_end = timestamp - timestamp % 10 + 10
    odd_second = timestamp.micros // 1000000 % 2
    color = 'red' if odd_second else 'black'
    return [ColoredFixedWindow(window_end, color)]

  def get_window_coder(self):
    return ColoredFixedWindowCoder()
2434+
2435+
23822436
class ExpectingSideInputsFn(beam.DoFn):
23832437
def __init__(self, name):
23842438
self._name = name

sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py

+53-1
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ def __init__(
428428
self._known_coder_urns = set.union(
429429
# Those which are required.
430430
self._REQUIRED_CODER_URNS,
431-
# Those common coders which are understood by all environments.
431+
# Those common coders which are understood by many environments.
432432
self._COMMON_CODER_URNS.intersection(
433433
*(
434434
set(env.capabilities)
@@ -515,8 +515,40 @@ def maybe_length_prefixed_and_safe_coder(self, coder_id):
515515
# type: (str) -> Tuple[str, str]
516516
coder = self.components.coders[coder_id]
517517
if coder.spec.urn == common_urns.coders.LENGTH_PREFIX.urn:
518+
# If the coder is already length prefixed, we can use it as is, and
519+
# have the runner treat it as opaque bytes.
518520
return coder_id, self.bytes_coder_id
521+
elif (coder.spec.urn == common_urns.coders.WINDOWED_VALUE.urn and
522+
self.components.coders[coder.component_coder_ids[1]].spec.urn not in
523+
self._known_coder_urns):
524+
# A WindowedValue coder with an unknown window type.
525+
# This needs to be encoded in such a way that we still have access to its
526+
# timestamp.
527+
lp_elem_coder = self.maybe_length_prefixed_coder(
528+
coder.component_coder_ids[0])
529+
tp_window_coder = self.timestamped_prefixed_window_coder(
530+
coder.component_coder_ids[1])
531+
new_coder_id = unique_name(
532+
self.components.coders, coder_id + '_timestamp_prefixed')
533+
self.components.coders[new_coder_id].CopyFrom(
534+
beam_runner_api_pb2.Coder(
535+
spec=beam_runner_api_pb2.FunctionSpec(
536+
urn=common_urns.coders.WINDOWED_VALUE.urn),
537+
component_coder_ids=[lp_elem_coder, tp_window_coder]))
538+
safe_coder_id = unique_name(
539+
self.components.coders, coder_id + '_timestamp_prefixed_opaque')
540+
self.components.coders[safe_coder_id].CopyFrom(
541+
beam_runner_api_pb2.Coder(
542+
spec=beam_runner_api_pb2.FunctionSpec(
543+
urn=common_urns.coders.WINDOWED_VALUE.urn),
544+
component_coder_ids=[
545+
self.safe_coders[lp_elem_coder],
546+
self.safe_coders[tp_window_coder]
547+
]))
548+
return new_coder_id, safe_coder_id
519549
elif coder.spec.urn in self._known_coder_urns:
550+
# A known coder type, but its components may still need to be length
551+
# prefixed.
520552
new_component_ids = [
521553
self.maybe_length_prefixed_coder(c) for c in coder.component_coder_ids
522554
]
@@ -538,6 +570,7 @@ def maybe_length_prefixed_and_safe_coder(self, coder_id):
538570
spec=coder.spec, component_coder_ids=safe_component_ids))
539571
return new_coder_id, safe_coder_id
540572
else:
573+
# A completely unknown coder. Wrap the entire thing in a length prefix.
541574
new_coder_id = unique_name(
542575
self.components.coders, coder_id + '_length_prefixed')
543576
self.components.coders[new_coder_id].CopyFrom(
@@ -547,6 +580,25 @@ def maybe_length_prefixed_and_safe_coder(self, coder_id):
547580
component_coder_ids=[coder_id]))
548581
return new_coder_id, self.bytes_coder_id
549582

583+
@memoize_on_instance
def timestamped_prefixed_window_coder(self, coder_id):
  """Registers a timestamp-prefixed wrapper for the given window coder.

  Two coders are added to ``self.components.coders``:
    * a CUSTOM_WINDOW coder whose single component is the (possibly) length
      prefixed form of ``coder_id``;
    * a component-less coder with the
      TIMESTAMP_PREFIXED_OPAQUE_WINDOW_CODER urn, recorded in
      ``self.safe_coders`` as the safe coder for the first one.

  Returns:
    The id of the new CUSTOM_WINDOW coder.
  """
  length_prefixed = self.maybe_length_prefixed_coder(coder_id)
  all_coders = self.components.coders

  # The coder with the CUSTOM_WINDOW urn, wrapping the length-prefixed coder.
  new_coder_id = unique_name(all_coders, coder_id + '_timestamp_prefixed')
  custom_window_spec = beam_runner_api_pb2.FunctionSpec(
      urn=common_urns.coders.CUSTOM_WINDOW.urn)
  all_coders[new_coder_id].CopyFrom(
      beam_runner_api_pb2.Coder(
          spec=custom_window_spec, component_coder_ids=[length_prefixed]))

  # The coder recorded below as the safe coder for the one above.
  safe_coder_id = unique_name(
      all_coders, coder_id + '_timestamp_prefixed_opaque')
  opaque_spec = beam_runner_api_pb2.FunctionSpec(
      urn=python_urns.TIMESTAMP_PREFIXED_OPAQUE_WINDOW_CODER)
  all_coders[safe_coder_id].CopyFrom(
      beam_runner_api_pb2.Coder(spec=opaque_spec))

  self.safe_coders[new_coder_id] = safe_coder_id
  return new_coder_id
601+
550602
def length_prefix_pcoll_coders(self, pcoll_id):
551603
# type: (str) -> None
552604
self.components.pcollections[pcoll_id].coder_id = (

sdks/python/apache_beam/utils/timestamp.py

+6
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,12 @@ def predecessor(self):
146146
"""Returns the largest timestamp smaller than self."""
147147
return Timestamp(micros=self.micros - 1)
148148

149+
def successor(self):
  # type: () -> Timestamp

  """Returns the timestamp exactly one microsecond after self."""
  next_micros = self.micros + 1
  return Timestamp(micros=next_micros)
154+
149155
def __repr__(self):
150156
# type: () -> str
151157
micros = self.micros

0 commit comments

Comments
 (0)