Skip to content

Commit 05e939e

Browse files
authored
Fix coder parsing for any non-standard composite coder types (#34606)
* Fix coder for Python tuple and potentially any non-standard composite types
* Re-enable test_pack_combiners
* Exclude CoderLengthPrefix from knownCompositeCoders so leaf and knownComposite are now mutually exclusive.
1 parent 46c3f26 commit 05e939e

File tree

2 files changed

+35
-10
lines changed

2 files changed

+35
-10
lines changed

sdks/go/pkg/beam/runners/prism/internal/coders.go

+35-5
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ import (
3535
// leafCoders lists coder urns the runner knows how to manipulate.
3636
// In particular, ones that won't be a problem to parse, in general
3737
// because they have a known total size.
38+
//
39+
// Important: The 'leafCoders' and 'knownCompositeCoders' do not necessarily
40+
// cover all possible standard coder types.
41+
// For example, coderRow is neither a leaf coder nor a composite coder (so it
42+
// will have to be LP'd).
3843
var leafCoders = map[string]struct{}{
3944
urns.CoderBytes: {},
4045
urns.CoderStringUTF8: {},
@@ -51,6 +56,28 @@ func isLeafCoder(c *pipepb.Coder) bool {
5156
return ok
5257
}
5358

59+
// knownCompositeCoders lists coder urns that we expect to see components in
60+
// their spec.
61+
var knownCompositeCoders = map[string]struct{}{
62+
urns.CoderKV: {},
63+
urns.CoderIterable: {},
64+
urns.CoderTimer: {},
65+
urns.CoderWindowedValue: {},
66+
urns.CoderParamWindowedValue: {},
67+
urns.CoderStateBackedIterable: {},
68+
urns.CoderCustomWindow: {},
69+
urns.CoderShardedKey: {},
70+
urns.CoderNullable: {},
71+
// Exclude CoderLengthPrefix from the list. Even though it is a composite coder,
72+
// we never need to introspect its component.
73+
// urns.CoderLengthPrefix: {},
74+
}
75+
76+
func isKnownCompositeCoder(c *pipepb.Coder) bool {
77+
_, ok := knownCompositeCoders[c.GetSpec().GetUrn()]
78+
return ok
79+
}
80+
5481
// makeWindowedValueCoder gets the coder for the PCollection, renders it safe, and adds it to the coders map.
5582
//
5683
// PCollection coders are not inherently WindowValueCoder wrapped, and they are added by the runner
@@ -123,10 +150,10 @@ func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) (string,
123150
}
124151
// Add the original coder to the coders map.
125152
bundle[cID] = c
126-
// If we don't know this coder, and it has no sub components,
127-
// we must LP it, and we return the LP'd version.
153+
// If we don't know this coder, we must LP it, and we return the LP'd version.
128154
leaf := isLeafCoder(c)
129-
if len(c.GetComponentCoderIds()) == 0 && !leaf {
155+
knownComposite := isKnownCompositeCoder(c)
156+
if !leaf && !knownComposite {
130157
lpc := &pipepb.Coder{
131158
Spec: &pipepb.FunctionSpec{
132159
Urn: urns.CoderLengthPrefix,
@@ -136,15 +163,18 @@ func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) (string,
136163
bundle[lpcID] = lpc
137164
return lpcID, nil
138165
}
139-
// We know we have a composite, so if we count this as a leaf, move everything to
140-
// the coders map.
166+
// If it is a leaf, move its components (if any) to the coders map.
141167
if leaf {
142168
// Copy the components from the base.
143169
for _, cc := range c.GetComponentCoderIds() {
144170
bundle[cc] = base[cc]
145171
}
146172
return cID, nil
147173
}
174+
175+
// Now we have a known composite.
176+
// We may need to LP its components. If so, we make a new composite with the
177+
// LP'd components.
148178
var needNewComposite bool
149179
var comps []string
150180
for i, cc := range c.GetComponentCoderIds() {

sdks/python/apache_beam/runners/portability/prism_runner_test.py

-5
Original file line numberDiff line numberDiff line change
@@ -217,11 +217,6 @@ def test_custom_window_type(self):
217217
"Requires Prism to support Custom Window Coders." +
218218
" https://github.com/apache/beam/issues/31921")
219219

220-
def test_pack_combiners(self):
221-
raise unittest.SkipTest(
222-
"Requires Prism to support coder:" +
223-
" 'beam:coder:tuple:v1'. https://github.com/apache/beam/issues/32636")
224-
225220
def test_metrics(self):
226221
super().test_metrics(check_bounded_trie=False)
227222

0 commit comments

Comments
 (0)