Add reshuffle before triggering load jobs. #34657
base: master
Conversation
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment:
Run Python_Coverage PreCommit
self.reshuffle_before_load = not util.is_compat_version_prior_to(
    p.options, "2.65.0")
if self.reshuffle_before_load:
  # Ensure that TriggerLoadJob retry inputs are deterministic by breaking
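For context, a minimal sketch of how a compat-gated reshuffle along these lines could be wired up; the function, step names, and inputs below are illustrative, not the actual _load_data body:

```python
import apache_beam as beam

def maybe_reshuffle_then_load(partitioned_files, trigger_load_jobs_dofn,
                              reshuffle_before_load):
  """Sketch: gate a fusion-breaking Reshuffle on the compat flag."""
  if reshuffle_before_load:
    # Checkpoint the (destination, files) pairs so a retry of
    # TriggerLoadJobs replays the same inputs instead of re-deriving them.
    partitioned_files = (
        partitioned_files | 'ReshuffleBeforeLoad' >> beam.Reshuffle())
  return (
      partitioned_files
      | 'TriggerLoadJobs' >> beam.ParDo(trigger_load_jobs_dofn))
```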
Where does the non-determinism currently come from? If I'm reading things correctly, the preceding transform (PartitionFiles) is the only thing between this and the last fusion break (the GBK), and I think that should be deterministic since it's operating per-element, but it is possible I'm missing something.
Good question. I am not sure exactly where the non-determinism comes from, but we have seen cases where the number of files uploaded differs between retries during autoscaling, and this is the only plausible explanation I could come up with.

that should be deterministic since it's operating per-element, but it is possible I'm missing something

Can you elaborate on this?
Does GroupByKey guarantee determinism for the inputs to PartitionFiles? Without a Reshuffle, it looks like part of GroupFilesByTableDestinations (it's listed as being part of 3 stages?), PartitionFiles, and TriggerLoadJobs are fused into a single stage.
Adding a reshuffle puts the TriggerLoadJobs* steps in their own stages, but it is less obvious what is happening with just the GBK.
Java has this precaution:
Line 822 in 38192de
.apply("MultiPartitionsReshuffle", Reshuffle.of())
Line 865 in 38192de
.apply("SinglePartitionsReshuffle", Reshuffle.of())
https://cloud.google.com/dataflow/docs/concepts/exactly-once#output-delivery mentions that the best practice for IOs is to add a reshuffle before doing a write with side effects.
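Sketched in Beam Python, the pattern that doc recommends looks roughly like this (StartLoadJob and start_bigquery_load are illustrative stand-ins, not the real transform):

```python
import apache_beam as beam

def start_bigquery_load(destination, files):
  """Hypothetical stand-in for the real side-effecting load-job call."""
  return '%s: %d files' % (destination, len(files))

class StartLoadJob(beam.DoFn):
  """Illustrative side-effecting write, standing in for TriggerLoadJobs."""
  def process(self, element):
    destination, files = element
    # Non-idempotent external call: re-running it with different inputs
    # on a retry could create duplicate or inconsistent load jobs.
    yield start_bigquery_load(destination, files)

with beam.Pipeline() as p:
  results = (
      p
      | beam.Create([('proj:ds.table', ['f1', 'f2'])])
      # Checkpoint first, so the side effect's input is stable across retries.
      | 'StableInput' >> beam.Reshuffle()
      | 'StartLoadJobs' >> beam.ParDo(StartLoadJob()))
```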
Thinking about it more, does Reshuffle force determinism by grouping by unique ids?
Without a reshuffle, if more elements destined for a given destination (the key for GroupFilesByTableDestinations) arrived between retries, is there a chance these new files could be materialized for the key, and therefore more files would be read by GroupFilesByTableDestinations.read?
Mentioned offline:
The contents of GroupByKey should be deterministic unless there was a trigger that fired twice (in which case everything gets recomputed, and there's no determinism at all).
The order may not be deterministic; I wonder if that's the issue here. That could potentially be solved by sorting the files in PartitionFiles, thus making GBK + PartitionFiles collectively deterministic (sketched below).
What are the symptoms you're seeing?

Without a reshuffle, if more elements destined for a given destination (the key for GroupFilesByTableDestinations) arrived between retries, is there a chance these new files could be materialized for the key, and therefore more files would be read by GroupFilesByTableDestinations.read?

The GBK operation shouldn't happen until the first trigger has fired. If more elements arrived late, that indicates that another trigger fired (which would impact the Reshuffle case as well).

https://cloud.google.com/dataflow/docs/concepts/exactly-once#output-delivery mentions that the best practice for IOs is to add a reshuffle before doing a write with side effects.

Functionally, we have a reshuffle as part of our GBK; adding another one will make this more expensive (obviously if we need it for correctness we need it, but we should understand why first).
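A minimal sketch of the sorting idea above, assuming PartitionFiles receives (destination, file names) pairs out of the GBK and chunks purely by file count (a simplification of the real transform):

```python
import apache_beam as beam

class PartitionFilesSorted(beam.DoFn):
  """Sketch: sort grouped file names so GBK + PartitionFiles is
  collectively deterministic even when the GBK's value iteration
  order is not."""
  def process(self, element, max_files_per_partition=10000):
    destination, file_names = element
    ordered = sorted(file_names)  # fix the order before chunking
    for start in range(0, len(ordered), max_files_per_partition):
      yield destination, ordered[start:start + max_files_per_partition]
```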
Ok - after chatting a bit offline, my take is that I'm fine with this change if we can have confidence it will fix the issue (the performance cost is vanishingly small since it is just file names being shuffled, not records), but we should have that confidence either empirically or theoretically.
@kennknowles may have ideas on what is going on here since it may be related to triggering semantics -
def _window_fn(self):
I will be AFK for the next 2 weeks, so don't block on me going forward :)
Sounds good. I did more testing and have been able to consistently reproduce the issue without this fix, and have not been able to repro it with this fix.
Let us update CHANGES.md to mention this graph change.
Thinking about it more, does Reshuffle force determinism by grouping by unique ids?

Reshuffle checkpoints on Dataflow but not on other runners. Otherwise, any randomly generated ids in a fused stage will be generated again on retry. Hence this behavior getting the name RequiresStableInput, which is the "right" way to express this (but unfortunately I don't think it is complete enough to use here).
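A toy example of the retry hazard being described here (not Beam internals): a fused stage that attaches random ids will produce different ids when re-executed, unless a Reshuffle materializes its output first.

```python
import uuid
import apache_beam as beam

class AttachRandomId(beam.DoFn):
  def process(self, element):
    # Without a downstream checkpoint, a retry of the fused stage re-runs
    # this and hands the consumer different ids for the same elements.
    yield (uuid.uuid4().hex, element)

with beam.Pipeline() as p:
  stable = (
      p
      | beam.Create(['a', 'b', 'c'])
      | 'AttachIds' >> beam.ParDo(AttachRandomId())
      # On Dataflow, Reshuffle checkpoints its input, so downstream retries
      # replay the same (id, element) pairs instead of regenerating them.
      | 'Checkpoint' >> beam.Reshuffle())
```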
@@ -1101,6 +1101,18 @@ def _load_data(
    of the load jobs would fail but not other. If any of them fails, then
    copy jobs are not triggered.
    """
self.reshuffle_before_load = not util.is_compat_version_prior_to( |
Structural comments:
- This reshuffle should be added outside of this transform. Make it the responsibility of the caller to ensure stable inputs.
- It would be nicer to fully fork expand whenever we move to a new update-incompatible version. Basically freeze the old one and leave it behind (like return expand_2_53_0(input)), keeping expand itself straight-line code except for the top-level decision of which version to expand.
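As a sketch, the freeze-and-fork pattern being suggested might look like this (the class, method bodies, and version numbers are illustrative, modeled on the expand_2_53_0 example above; is_compat_version_prior_to stands in for the helper used in this PR):

```python
import apache_beam as beam

def is_compat_version_prior_to(options, version):
  """Stand-in for the compat-version helper introduced in this PR."""
  return False  # sketch: pretend we are on the current version

class SomeUpdateCompatibleTransform(beam.PTransform):
  """Illustrative only; each expand_* branch is straight-line code."""

  def _expand_2_53_0(self, pcoll):
    # Frozen: the exact graph shipped in the old version, never edited again.
    return pcoll | 'OldGraph' >> beam.Map(lambda x: x)

  def _expand_current(self, pcoll):
    # Current graph, free to evolve until the next incompatible change.
    return (
        pcoll
        | 'Stabilize' >> beam.Reshuffle()
        | 'NewGraph' >> beam.Map(lambda x: x))

  def expand(self, pcoll):
    # expand stays straight-line except for the top-level version decision.
    if is_compat_version_prior_to(pcoll.pipeline.options, '2.54.0'):
      return self._expand_2_53_0(pcoll)
    return self._expand_current(pcoll)
```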
This reshuffle should be added outside of this transform. Make it the responsibility of the caller to ensure stable inputs.

Apologies, but I don't understand 100%.
class BigQueryBatchFileLoads(beam.PTransform):
def _load_data(
class TriggerLoadJobs(beam.DoFn):
So I am adding the reshuffle right before the DoFn that requires stable inputs (in _load_data).
Are you saying to create a new PTransform that wraps TriggerLoadJobs, with expand_2_64_0() that just returns ParDo(TriggerLoadJobs) and expand() returning Reshuffle() | ParDo(TriggerLoadJobs)?
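That is, something along these lines (a hedged sketch of the wrapper being asked about; the class and method names are hypothetical):

```python
import apache_beam as beam

class TriggerLoadJobsTransform(beam.PTransform):
  """Hypothetical wrapper owning the stable-input guarantee for the DoFn."""

  def __init__(self, trigger_dofn, reshuffle_before_load):
    self._trigger_dofn = trigger_dofn
    self._reshuffle_before_load = reshuffle_before_load

  def _expand_2_64_0(self, pcoll):
    # Frozen pre-2.65.0 graph: no fusion break before the side effect.
    return pcoll | 'TriggerLoadJobs' >> beam.ParDo(self._trigger_dofn)

  def expand(self, pcoll):
    if not self._reshuffle_before_load:
      return self._expand_2_64_0(pcoll)
    return (
        pcoll
        | 'StableInput' >> beam.Reshuffle()
        | 'TriggerLoadJobs' >> beam.ParDo(self._trigger_dofn))
```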
param(compat_version=None),
param(compat_version="2.64.0"),
])
def test_reshuffle_before_load(self, compat_version):
I don't think this really tests what you want to test. You want:
- correctness of the transform with both settings of the flag (see the sketch below)
- if possible, some way to reproduce the issue you had before, one that is red before this change and green after
You also want to build an internal Dataflow test of update compatibility with a requested 2.64.0 version. I can show you a CL that does that if you haven't already seen one.
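For the first bullet, a correctness check under both settings of the flag might look roughly like this (a sketch; run_fake_load is a hypothetical harness helper, not an existing API):

```python
import unittest
from parameterized import param, parameterized

SAMPLE_ROWS = [{'name': 'a'}, {'name': 'b'}]  # illustrative fixture

def run_fake_load(rows, compat_version):
  """Hypothetical helper: would execute the file-load transform against a
  fake BigQuery service with the given compat version and return the rows
  that ended up in the table."""
  raise NotImplementedError  # stand-in for test harness wiring

class ReshuffleBeforeLoadTest(unittest.TestCase):
  @parameterized.expand([
      param(compat_version=None),      # new graph: reshuffle present
      param(compat_version="2.64.0"),  # old graph: reshuffle absent
  ])
  def test_load_correctness(self, compat_version):
    # Same input rows must land in the table under both graph shapes.
    rows = run_fake_load(SAMPLE_ROWS, compat_version=compat_version)
    self.assertCountEqual(rows, SAMPLE_ROWS)
```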