Refactor commit logic out of StreamingDataflowWorker #30312
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
5260d57 to 4a70e02
R: @scwhittle still working on the tests but curious about initial thoughts. Thanks!
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
74f8a1a to c9aab72
@scwhittle ready for a look! thanks
76fd384 to edf03c0
Didn't get very far through but given round-trip time thought I'd send the comment I did have.
public WindmillStreamPoolCloseableStreamFactory(Supplier<StreamT> streamFactory) {
  this.streamPool =
      WindmillStreamPool.create(NUM_COMMIT_STREAMS, COMMIT_STREAM_TIMEOUT, streamFactory);
Can we just change StreamPool to have a method returning CloseableStreams? Not sure we need this new class. This class also hard-codes parameters that we might want to change for different stream pools.
done! good idea
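A minimal sketch of the suggestion in this thread, assuming a much simplified pool. `SketchStreamPool`, `SketchCloseableStream`, `getCloseableStream`, and `idleCount` are hypothetical names for illustration only, not the WindmillStreamPool API. The pool hands out a closeable handle directly and takes its size as a constructor parameter instead of hard-coding it.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

/** Hypothetical handle pairing a pooled stream with the action that returns it to the pool. */
final class SketchCloseableStream<StreamT> implements AutoCloseable {
  private final StreamT stream;
  private final Runnable onClose;

  SketchCloseableStream(StreamT stream, Runnable onClose) {
    this.stream = stream;
    this.onClose = onClose;
  }

  StreamT stream() {
    return stream;
  }

  @Override
  public void close() {
    onClose.run();
  }
}

/** Hypothetical pool: the idle limit is configurable per pool rather than a constant. */
final class SketchStreamPool<StreamT> {
  private final Deque<StreamT> idle = new ArrayDeque<>();
  private final int maxIdle;
  private final Supplier<StreamT> streamFactory;

  SketchStreamPool(int maxIdle, Supplier<StreamT> streamFactory) {
    this.maxIdle = maxIdle;
    this.streamFactory = streamFactory;
  }

  /** Returns a handle whose close() gives the stream back to this pool. */
  synchronized SketchCloseableStream<StreamT> getCloseableStream() {
    StreamT stream = idle.isEmpty() ? streamFactory.get() : idle.pop();
    return new SketchCloseableStream<>(stream, () -> release(stream));
  }

  private synchronized void release(StreamT stream) {
    if (idle.size() < maxIdle) {
      idle.push(stream);
    }
  }

  synchronized int idleCount() {
    return idle.size();
  }
}
```

With this shape a caller can write `try (SketchCloseableStream<S> s = pool.getCloseableStream()) { ... }` and no separate factory class is needed.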
        this::onStreamingCommitComplete,
        started)
    : WorkCommitters.createApplianceWorkCommitter(
        windmillServer::commitWork, numCommitThreads, running::get, started);
The number of commit threads is always 1 for appliance; should we remove the param?
done
() -> {
  while (true) {
    try {
      ready.await();
Can we get rid of this CountDownLatch?
It seems that we'd just fall through to blocking on the commitQueue, which is contained in this class. Alternatively, if that doesn't work for some reason, having an explicit start method on the committer interface seems better than passing in this latch, which does the same thing out-of-band.
Ditto for the SE impl.
done
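A rough sketch of the alternative discussed in this thread, under the assumption that commits can simply wait in the queue until the loop is started. `SketchCommitter` and `QueueBackedSketchCommitter` are hypothetical names, and commits are plain strings. The sentinel-based `stop()` is a choice made for this example so that it terminates deterministically; it is not taken from the PR.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical committer interface with an explicit lifecycle instead of an injected latch. */
interface SketchCommitter {
  void start();

  void commit(String commit);

  void stop();
}

final class QueueBackedSketchCommitter implements SketchCommitter {
  // Unique sentinel instance, compared by identity, used only to end the loop in this sketch.
  private static final String STOP = new String("stop-sentinel");

  private final BlockingQueue<String> commitQueue = new LinkedBlockingQueue<>();
  private final List<String> sent = new CopyOnWriteArrayList<>();
  private final Thread worker = new Thread(this::commitLoop, "sketch-commit-loop");

  @Override
  public void start() {
    worker.start();
  }

  @Override
  public void commit(String commit) {
    // Commits queued before start() are not lost; they wait in the queue.
    commitQueue.add(commit);
  }

  @Override
  public void stop() {
    commitQueue.add(STOP);
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  private void commitLoop() {
    try {
      while (true) {
        // Blocks until a commit is available, so no separate "ready" latch is needed.
        String next = commitQueue.take();
        if (next == STOP) {
          return;
        }
        sent.add(next);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  List<String> sent() {
    return sent;
  }
}
```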
  return create(stream, () -> {});
}

public abstract Supplier<StreamT> stream();
seems like this should be StreamT, otherwise we could bind a supplier that creates distinct streams on each call, and we're only closing it once.
done
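To illustrate the concern in this thread, here is a small sketch with two hypothetical holder classes (`HolderWithSupplier` and `HolderWithInstance`, neither of which is the PR's CloseableStream). It assumes only that a supplier may build a new object per call, which is the case the reviewer describes.

```java
import java.util.function.Supplier;

/** Hypothetical holder that stores a supplier: each stream() call may create a new stream. */
final class HolderWithSupplier<StreamT> implements AutoCloseable {
  private final Supplier<StreamT> stream;
  private final Runnable onClose;

  HolderWithSupplier(Supplier<StreamT> stream, Runnable onClose) {
    this.stream = stream;
    this.onClose = onClose;
  }

  StreamT stream() {
    return stream.get();
  }

  @Override
  public void close() {
    // Runs once, no matter how many distinct streams stream() handed out.
    onClose.run();
  }
}

/** Hypothetical holder that stores the stream itself: one stream, one close. */
final class HolderWithInstance<StreamT> implements AutoCloseable {
  private final StreamT stream;
  private final Runnable onClose;

  HolderWithInstance(StreamT stream, Runnable onClose) {
    this.stream = stream;
    this.onClose = onClose;
  }

  StreamT stream() {
    return stream;
  }

  @Override
  public void close() {
    onClose.run();
  }
}
```

Binding `Object::new` as the supplier in the first holder yields a different object on every `stream()` call, while the second holder always returns the instance it was built with.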
private boolean tryAddToCommitStream(Commit commit, CommitWorkStream commitStream) {
  Preconditions.checkNotNull(commit);
  // Drop commits for failed work. Such commits will be dropped by Windmill anyway.
  if (commit.work().isFailed()) {
Seems better to do this when getting the initial commit in the streamingCommitLoop and in batchCommitsToStream.
Otherwise we go on to batchCommitsToStream in a weird state where no commit was added to the stream, or we may count commits in batchCommitsToStream that weren't actually added.
done
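One way to read the suggestion in this thread is to filter failed work at the point where a commit is pulled off the queue, so later batching code only ever sees commits that will be sent. The sketch below assumes that reading; `SketchQueuedCommit` and `nextValidCommit` are hypothetical names and do not appear in the PR.

```java
import java.util.List;
import java.util.Queue;

/** Hypothetical commit with a flag standing in for commit.work().isFailed(). */
final class SketchQueuedCommit {
  final String id;
  final boolean failed;

  SketchQueuedCommit(String id, boolean failed) {
    this.id = id;
    this.failed = failed;
  }
}

final class SkipFailedBeforeBatchSketch {
  /**
   * Polls until a non-failed commit is found. Failed commits are recorded in {@code dropped} and
   * never reach the batching code. Returns null when the queue is empty.
   */
  static SketchQueuedCommit nextValidCommit(
      Queue<SketchQueuedCommit> queue, List<SketchQueuedCommit> dropped) {
    SketchQueuedCommit next;
    while ((next = queue.poll()) != null) {
      if (!next.failed) {
        return next;
      }
      dropped.add(next);
    }
    return null;
  }
}
```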
    activeCommitBytes.addAndGet(-size);
    return false;
  }
  private void onStreamingCommitFailed(Commit commit) {
can we get rid of the separate failed handler? Can we just create a CompletedCommit with appropriate error status and use single onCompletedCommit handler?
done
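A minimal sketch of the single-handler shape suggested in this thread. `SketchCommitStatus`, `SketchCompleteCommit`, and `SingleCompletionHandlerSketch` are hypothetical stand-ins; the status values are placeholders rather than Windmill's actual enum, and which status the PR uses for failed work is not shown in the conversation.

```java
import java.util.function.Consumer;

/** Hypothetical status values, standing in for the real commit status enum. */
enum SketchCommitStatus {
  OK,
  ABORTED
}

/** Hypothetical result type pairing a commit identifier with its final status. */
final class SketchCompleteCommit {
  final String commitId;
  final SketchCommitStatus status;

  SketchCompleteCommit(String commitId, SketchCommitStatus status) {
    this.commitId = commitId;
    this.status = status;
  }
}

final class SingleCompletionHandlerSketch {
  // One callback covers success and failure; the status carries the difference.
  private final Consumer<SketchCompleteCommit> onCommitComplete;

  SingleCompletionHandlerSketch(Consumer<SketchCompleteCommit> onCommitComplete) {
    this.onCommitComplete = onCommitComplete;
  }

  void commitSucceeded(String commitId) {
    onCommitComplete.accept(new SketchCompleteCommit(commitId, SketchCommitStatus.OK));
  }

  void commitFailed(String commitId) {
    // Assumption for this sketch: failed work is reported with a non-OK status.
    onCommitComplete.accept(new SketchCompleteCommit(commitId, SketchCommitStatus.ABORTED));
  }
}
```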
private static final Logger LOG = LoggerFactory.getLogger(StreamingApplianceWorkCommitter.class);
private static final long TARGET_COMMIT_BUNDLE_BYTES = 32 << 20;

private final Consumer<CommitWorkRequest> commitWork;
Add some suffix to make it clearer that this is a function/consumer? It makes reading the code below a little confusing, since there is a function of the same name.
done
210b847 to 540acdb
@ThreadSafe
final class StreamingEngineWorkCommitter implements WorkCommitter {
  private static final Logger LOG = LoggerFactory.getLogger(StreamingEngineWorkCommitter.class);
  private static final int COMMIT_BATCH_SIZE = 5;
I think you might have changed the wrong one; the bytes value is still a max since that is the queue limit. I meant for what is now COMMIT_BATCH_SIZE to become TARGET_COMMIT_BATCH_SIZE (could use KEYS instead of SIZE as well).
@@ -310,6 +311,10 @@ public void invalidate(ByteString processingKey, long shardingKey) {
    keyIndex.remove(key);
  }

  public void invalidate(ShardedKey shardedKey) {
final
done
fakeWindmillServer.waitForAndGetCommits(commits.size() / 2);

for (Commit commit : commits) {
  if (commit.work().isFailed()) {
verify that non-failed ones are in committed
done
WorkItemCommitRequest request = committed.get(commit.work().getWorkItem().getWorkToken());
assertNotNull(request);
assertThat(request).isEqualTo(commit.request());
assertThat(completeCommits).contains(asCompleteCommit(commit));
Add a test where some particular keys fail in Windmill, verifying that the non-OK commit status is plumbed through correctly.
done; had to make some changes to FakeWindmillServer
@Test
public void testParallelism() {
  workCommitter = createWorkCommitter(ignored -> {});
doesn't seem worth testing IMO
removed
FYI, today is the release cut. What is the status of this PR? I see the comments have all been replied to. CC: @scwhittle @m-trieu
There are still some open comments. This is part of the work to support direct path, which won't be ready for the cut, so I don't think this should hold up the cut.
540acdb to 7e28f3c
ready for another look @scwhittle thanks!
The presubmit failure is due to a flaky test (working on a PR to fix it) and is not related to this change.
Attempted fix for the test flakiness: #30572
        .setRemainingBytesForWorkItem(0)
        .build());
StreamingCommitResponse response =
    streamingCommitsToOffer.getOrDefault(streamingRequestBuilder.build());
You are specifying the default both here and when creating the queue; it seems like you could remove it here.
I think instead of request/response pair injection you could set a commit status for each work token. Then you wouldn't need useInjectableStreamingCommitResponses; you could always look up the status and default to OK.
The full request/response isn't adding any additional testing since both sides are fake. And we need to call onDone with exactly one commit status for the request.
done, using a map instead of a response queue
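A small sketch of the per-work-token lookup described in this thread, assuming work tokens are `long` values. `FakeCommitStatus` and `FakeCommitStatusRegistry` are hypothetical names and are not the actual FakeWindmillServer API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical status values, standing in for the real commit status enum. */
enum FakeCommitStatus {
  OK,
  INVALID_TOKEN
}

/** Hypothetical helper a fake server could consult when answering a commit. */
final class FakeCommitStatusRegistry {
  private final Map<Long, FakeCommitStatus> statusByWorkToken = new ConcurrentHashMap<>();

  /** Tests call this only for the work tokens that should fail. */
  void setStatus(long workToken, FakeCommitStatus status) {
    statusByWorkToken.put(workToken, status);
  }

  /** Always answers; anything not explicitly configured defaults to OK. */
  FakeCommitStatus statusFor(long workToken) {
    return statusByWorkToken.getOrDefault(workToken, FakeCommitStatus.OK);
  }
}
```

Because the lookup always defaults to OK, no separate flag is needed to switch the fake between injected and default responses.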
@@ -122,6 +131,11 @@ public void setDropStreamingCommits(boolean dropStreamingCommits) {
    this.dropStreamingCommits = dropStreamingCommits;
  }

  public void setUseInjectableStreamingCommitResponses(
add a comment on what this means
removed
}

@Test
public void testCommit_handlesCompleteCommits_commitStatusOK() {
merge with testCommit_sendsCommitsToStreamingEngine?
done
commits.add(Commit.create(commitRequest, createComputationState("computationId-" + i), work));
fakeWindmillServer
    .whenCommitWorkStreamCalled()
    .thenAnswer(request -> toResponse(request, INVALID_TOKEN));
would be better to have some ok and some different commit status to verify
added
@scwhittle any more comments here? Thank you!
mostly done
  commit.work().setState(Work.State.COMMIT_QUEUED);
}

activeCommitBytes.addAndGet(-commit.getSize());
activeCommitBytes is no longer tracking the bytes until they complete, just until they are added to the stream.
You should:
- move the decrement into the commitWorkItem onDone fn above. That handles the case where the commit was added successfully, tracking bytes until the commit response is received
- decrement here only if !isCommitSuccessful, to cover the case where we're keeping it until next time.
done
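The accounting described in this thread can be sketched as follows. This assumes the byte counter is incremented just before the commit is offered to the stream; `SketchCommitStream`, `tryAdd`, and `ActiveBytesSketch` are hypothetical and only mimic the shape of the stream call, not its real signature.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a commit stream that may refuse a commit (for example, batch full). */
interface SketchCommitStream {
  /** Returns true if accepted; onDone runs later, when the commit response arrives. */
  boolean tryAdd(long commitSize, Runnable onDone);
}

final class ActiveBytesSketch {
  private final AtomicLong activeCommitBytes = new AtomicLong();

  boolean addToStream(long commitSize, SketchCommitStream stream) {
    // Assumption for this sketch: bytes become "active" when the commit is offered.
    activeCommitBytes.addAndGet(commitSize);
    boolean isCommitSuccessful =
        stream.tryAdd(
            commitSize,
            // Accepted commits stay counted until the response is received.
            () -> activeCommitBytes.addAndGet(-commitSize));
    if (!isCommitSuccessful) {
      // Since the commit was not accepted, revert the increment made above.
      activeCommitBytes.addAndGet(-commitSize);
    }
    return isCommitSuccessful;
  }

  long activeCommitBytes() {
    return activeCommitBytes.get();
  }
}
```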
}

workCommitter.start();
commits.forEach(workCommitter::commit);
Use a parallelized forEach to cover thread-safety a little?
done
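One possible form of the parallelized forEach, sketched with a hypothetical `ParallelCommitSketch` class that stands in for the committer under test and integer commit ids in place of real commits. It uses the standard `Collection.parallelStream()`; whether the PR's test uses this exact call is not shown in the conversation.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class ParallelCommitSketch {
  // Thread-safe sink standing in for the committer's internal queue.
  private final Queue<Integer> received = new ConcurrentLinkedQueue<>();

  void commit(int commitId) {
    received.add(commitId);
  }

  int receivedCount() {
    return received.size();
  }

  /** Submits commits from multiple threads and returns the committer for inspection. */
  static ParallelCommitSketch commitAllInParallel(int numCommits) {
    List<Integer> commits = IntStream.range(0, numCommits).boxed().collect(Collectors.toList());
    ParallelCommitSketch committer = new ParallelCommitSketch();
    // forEach on a parallel stream returns only after every element has been processed.
    commits.parallelStream().forEach(committer::commit);
    return committer;
  }
}
```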
Added a drain (fail work and clear queues) when we close the SEWorkCommitter; we will need this for direct path. In non-direct-path mode we never call stop (except when we call stop on the entire harness in StreamingDataflowWorker), so it will not affect that code path. @scwhittle
}

public void clear() {
  queue.clear();
This is not updating the limit. To do so properly it seems easiest to poll() until null, but perhaps a better alternative is to remove stream() and clear() and just do that loop on poll() until null in StreamingEngineWorkCommitter.
That is less API of this class to test, and in general, if the queue is still in use, stream()+clear() may process something that is removed by poll()/take() in between the calls.
done
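A sketch of the poll-until-null drain discussed in this thread, assuming a weight-limited queue whose `poll()` releases the weight it took. `SketchWeightedQueue` and `DrainByPollSketch` are hypothetical and use integer weights as elements; the real queue class and its limit handling may differ.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

/** Hypothetical weight-limited queue: each element is its own weight. */
final class SketchWeightedQueue {
  private final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
  private final Semaphore limit;

  SketchWeightedQueue(int maxWeight) {
    this.limit = new Semaphore(maxWeight);
  }

  void put(int weight) {
    limit.acquireUninterruptibly(weight);
    queue.add(weight);
  }

  /** Removes the head and releases its weight, keeping the limit in sync. */
  Integer poll() {
    Integer weight = queue.poll();
    if (weight != null) {
      limit.release(weight);
    }
    return weight;
  }

  int availableWeight() {
    return limit.availablePermits();
  }
}

final class DrainByPollSketch {
  /** Fails every queued element by polling until the queue reports empty. */
  static int drain(SketchWeightedQueue queue, Consumer<Integer> failCommit) {
    int drained = 0;
    Integer next;
    while ((next = queue.poll()) != null) {
      failCommit.accept(next);
      drained++;
    }
    return drained;
  }
}
```

Each element is handed to `failCommit` exactly once even if another thread polls concurrently, because removal and processing happen through the same `poll()` call.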
}

private void drainCommitQueue() {
  commitQueue.stream().forEach(this::failCommit);
see other comment, how about loop on poll instead
done
    (commitStatus) -> {
      onCommitComplete.accept(CompleteCommit.create(commit, commitStatus));
      activeCommitBytes.addAndGet(-commit.getSize());
    });

if (!isCommitSuccessful) {
// Since the commit was not accepted, revert the changes made above.
added
Run Java PreCommit
Test failures should be fixed in #30572, since it removes arbitrary waiting from the tests (replaced with CountDownLatch event triggers).
The other PR is merged; would like to verify this passes now.
Run Java PreCommit
…he StreamingEngineWorkCommitter; add test to make sure drain is working
dfb5341 to cfa5039
wooh! looks good
StreamingDataflowWorker is a rather large class.
For maintainability, it would be helpful in the long run to encapsulate different pieces of the worker harness logic into their own classes.