Skip to content

Commit f910184

Browse files
committed
Properly propagate schemas of Beam YAML Partition transform.
1 parent 781917a commit f910184

File tree

2 files changed

+4
-0
lines changed

2 files changed

+4
-0
lines changed

sdks/python/apache_beam/yaml/yaml_mapping.py

+3
Original file line numberDiff line numberDiff line change
@@ -814,6 +814,9 @@ def split(element):
814814
mapping_transform = mapping_transform.with_outputs(*output_set)
815815
splits = pcoll | mapping_transform.with_input_types(T).with_output_types(T)
816816
result = {out: getattr(splits, out) for out in output_set}
817+
for tag, out in result.items():
818+
if tag != error_output:
819+
out.element_type = pcoll.element_type
817820
if error_output:
818821
result[error_output] = result[error_output] | map_errors_to_standard_format(
819822
pcoll.element_type)

sdks/python/apache_beam/yaml/yaml_mapping_test.py

+1
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ def test_partition(self):
212212
language: python
213213
outputs: [even, odd]
214214
''')
215+
self.assertEqual(result['even'].element_type, elements.element_type)
215216
assert_that(
216217
result['even'] | beam.Map(lambda x: x.element),
217218
equal_to(['banana', 'orange']),

0 commit comments

Comments
 (0)