Inject SDK-side flattens while handling input/output coder mismatch in flattens. #34641
Assigning reviewers. If you would like to opt out of this review, comment R: @jrmccluskey for label go. Available commands:
The PR bot will only process comments in the main thread (not review comments).
…the fix. The test is also included in the test suites of Flink, Samza, and Spark, but without transcoding until their corresponding FRs are resolved.
9d6ec0a to 369859b
I like how clean this ended up. Great work.
@@ -88,8 +88,7 @@ func (h *runner) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipep
 }

 func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.Components) prepareResult {
-	if !h.config.SDKFlatten {
-		t.EnvironmentId = "" // force the flatten to be a runner transform due to configuration.
+	if !h.config.SDKFlatten && !strings.HasPrefix(tid, "ft_") {
I'll note that there's no user serviceable way to do these configurations at the moment, and it really was a hard binary. It would be acceptable to remove the SDKFlatten option in favour of just a single approach that biases to runner flattens, but does the SDK flatten to get around these issues.
@@ -492,6 +492,9 @@ func finalizeStage(stg *stage, comps *pipepb.Components, pipelineFacts *fusionFa
 	}

 	stg.internalCols = internal
+	// Sort the keys of internal producers (from stageFacts.PcolProducers)
+	// to ensure deterministic order for stable tests.
+	sort.Strings(stg.internalCols)
Good find! I thought I had everything deterministic already.
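To illustrate why the added `sort.Strings` call matters: Go deliberately randomizes map iteration order, so a slice built by ranging over a map can come out in a different order on each run. The sketch below is a minimal standalone illustration, not the Prism code; the `sortedKeys` helper, the `map[string]bool` type, and the PCollection IDs are hypothetical stand-ins for the stage's internal producers.

```go
package main

import (
	"fmt"
	"sort"
)

// sortedKeys returns the keys of m in ascending order. Go randomizes map
// iteration order, so collecting keys without sorting can differ between runs.
// (Hypothetical helper for illustration; not part of the Prism runner.)
func sortedKeys(m map[string]bool) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	// Hypothetical PCollection IDs standing in for a stage's internal producers.
	internal := map[string]bool{"pcol_c": true, "pcol_a": true, "pcol_b": true}
	fmt.Println(sortedKeys(internal)) // always [pcol_a pcol_b pcol_c]
}
```

Without the sort, a test that compares the resulting slice against a fixed expected value can pass or fail depending on the iteration order of that particular run.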
Following the idea of an identity transform (#32930 (comment)) and valuable discussions with @lostluck, I have developed a "simpler and more general fix" for Prism runner issues involving `Flatten`. This approach works not only for `Flatten->Flatten` scenarios, but also for others like `GroupBy->Flatten`.

The key observations are:

- `Flatten` only passes data through from its input PCollections.
- … `Flatten` output.

Previous approaches (including #34602) attempted to fix this by overwriting the upstream coders with the `Flatten` output coder. This works in many scenarios, especially when the upstream transform is an SDK-side transform or another `Flatten` (where #34602 ensured coder propagation).

However, modifying upstream PCollection coders can cause side effects.

- In the `groupby->flatten` example (`org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2`), changing `GroupBy`'s coder has no effect on the actual encoded data, because `GroupBy` always generates `K,Iterable<V>`. Downstream transforms expecting data encoded with the `Flatten`'s coder then fail during decoding.
- `C = [A, B] | Flatten(), D = A | GroupByKey()`. If `Flatten` overwrites the coder associated with PCollection `A`, the `GroupByKey` on `A` may fail due to coder incompatibility.

The proposed solution avoids the pitfalls of modifying upstream coders. Instead, it inserts an identity transform (specifically, an SDK-side Flatten) before the Runner `Flatten`. This identity transform implicitly converts the input PCollection's elements to use the same output coder as the target Runner Flatten, ensuring that correctly encoded data is emitted from it.

fixes #32930
fixes #34643

The PR also fixes the flaky test `Test_preprocessor_preProcessGraph/ignoreEmptyAndIdentityTransform` under https://github.com/apache/beam/actions/workflows/beam_PreCommit_Go.yml.
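As a rough illustration of the coder-mismatch reasoning in this description, here is a self-contained toy model. It does not use the Beam or Prism APIs: the `coder` struct, the two `int64` coders, `runnerFlatten`, and `transcode` are all hypothetical names invented for this sketch. It assumes a runner-side flatten that forwards encoded bytes untouched, and models the inserted identity step as "decode with the input coder, re-encode with the output coder".

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// coder is a toy stand-in for a Beam coder (hypothetical; not the Beam API).
type coder struct {
	encode func(int64) []byte
	decode func([]byte) (int64, error)
}

// varintCoder plays the role of an upstream PCollection's coder.
var varintCoder = coder{
	encode: func(v int64) []byte {
		buf := make([]byte, binary.MaxVarintLen64)
		return buf[:binary.PutVarint(buf, v)]
	},
	decode: func(b []byte) (int64, error) {
		v, n := binary.Varint(b)
		if n <= 0 || n != len(b) {
			return 0, errors.New("varint: malformed input")
		}
		return v, nil
	},
}

// fixedCoder plays the role of the Flatten output coder.
var fixedCoder = coder{
	encode: func(v int64) []byte {
		buf := make([]byte, 8)
		binary.BigEndian.PutUint64(buf, uint64(v))
		return buf
	},
	decode: func(b []byte) (int64, error) {
		if len(b) != 8 {
			return 0, errors.New("fixed64: want 8 bytes")
		}
		return int64(binary.BigEndian.Uint64(b)), nil
	},
}

// runnerFlatten models a runner-side flatten: bytes pass through unchanged.
func runnerFlatten(inputs ...[][]byte) [][]byte {
	var out [][]byte
	for _, in := range inputs {
		out = append(out, in...)
	}
	return out
}

// transcode models the inserted identity step: decode each element with the
// input coder and re-encode it with the output coder. The input is not mutated.
func transcode(in [][]byte, from, to coder) ([][]byte, error) {
	out := make([][]byte, 0, len(in))
	for _, b := range in {
		v, err := from.decode(b)
		if err != nil {
			return nil, err
		}
		out = append(out, to.encode(v))
	}
	return out, nil
}

// decodeAll models a downstream transform reading the flatten output.
func decodeAll(in [][]byte, c coder) ([]int64, error) {
	vals := make([]int64, 0, len(in))
	for _, b := range in {
		v, err := c.decode(b)
		if err != nil {
			return nil, err
		}
		vals = append(vals, v)
	}
	return vals, nil
}

func main() {
	a := [][]byte{varintCoder.encode(1), varintCoder.encode(300)} // PCollection A
	b := [][]byte{fixedCoder.encode(7)}                           // PCollection B

	// Pass-through only: A's bytes do not match the output coder.
	_, err := decodeAll(runnerFlatten(a, b), fixedCoder)
	fmt.Println("without transcoding:", err)

	// Identity step in front of the flatten re-encodes A; A itself is untouched.
	fixedA, err := transcode(a, varintCoder, fixedCoder)
	if err != nil {
		panic(err)
	}
	vals, err := decodeAll(runnerFlatten(fixedA, b), fixedCoder)
	fmt.Println("with transcoding:", vals, err)
}
```

In this toy model the first decode fails because the varint-encoded elements are not 8 bytes long, while the second yields 1, 300, and 7. Because `transcode` produces a new slice, another consumer of `a` (the `GroupByKey` on `A` in the example above) would still see elements in their original encoding.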