Skip to content

Commit c298da5

Browse files
authored
Refactor commit logic out of StreamingDataflowWorker (#30312)
1 parent 49c7864 commit c298da5

File tree

14 files changed

+1105
-205
lines changed

14 files changed

+1105
-205
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java

+42-195
Large diffs are not rendered by default.

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillComputationKey.java

+5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.beam.runners.dataflow.worker;
1919

2020
import com.google.auto.value.AutoValue;
21+
import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey;
2122
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
2223
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.TextFormat;
2324

@@ -29,6 +30,10 @@ public static WindmillComputationKey create(
2930
return new AutoValue_WindmillComputationKey(computationId, key, shardingKey);
3031
}
3132

33+
public static WindmillComputationKey create(String computationId, ShardedKey shardedKey) {
34+
return create(computationId, shardedKey.key(), shardedKey.shardingKey());
35+
}
36+
3237
public abstract String computationId();
3338

3439
public abstract ByteString key();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.dataflow.worker.windmill.client;
19+
20+
import com.google.auto.value.AutoValue;
21+
import org.apache.beam.sdk.annotations.Internal;
22+
23+
/**
24+
* Wrapper for a {@link WindmillStream} that allows callers to tie an action after the stream is
25+
* finished being used. Has an option for closing code to be a no-op.
26+
*/
27+
@Internal
28+
@AutoValue
29+
public abstract class CloseableStream<StreamT extends WindmillStream> implements AutoCloseable {
30+
public static <StreamT extends WindmillStream> CloseableStream<StreamT> create(
31+
StreamT stream, Runnable onClose) {
32+
return new AutoValue_CloseableStream<>(stream, onClose);
33+
}
34+
35+
public abstract StreamT stream();
36+
37+
abstract Runnable onClose();
38+
39+
@Override
40+
public void close() throws Exception {
41+
onClose().run();
42+
}
43+
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPool.java

+7
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.function.Supplier;
2626
import javax.annotation.concurrent.GuardedBy;
2727
import javax.annotation.concurrent.ThreadSafe;
28+
import org.apache.beam.sdk.annotations.Internal;
2829
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
2930
import org.checkerframework.checker.nullness.qual.Nullable;
3031
import org.joda.time.Duration;
@@ -36,6 +37,7 @@
3637
* <p>The pool holds a fixed total number of streams, and keeps each stream open for a specified
3738
* time to allow for better load-balancing.
3839
*/
40+
@Internal
3941
@ThreadSafe
4042
public class WindmillStreamPool<StreamT extends WindmillStream> {
4143

@@ -131,6 +133,11 @@ public StreamT getStream() {
131133
}
132134
}
133135

136+
public CloseableStream<StreamT> getCloseableStream() {
137+
StreamT stream = getStream();
138+
return CloseableStream.create(stream, () -> releaseStream(stream));
139+
}
140+
134141
private synchronized WindmillStreamPool.StreamData<StreamT> createAndCacheStream(int cacheKey) {
135142
WindmillStreamPool.StreamData<StreamT> newStreamData =
136143
new WindmillStreamPool.StreamData<>(streamSupplier.get());
+9-1
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,17 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18-
package org.apache.beam.runners.dataflow.worker.streaming;
18+
package org.apache.beam.runners.dataflow.worker.windmill.client.commits;
1919

2020
import com.google.auto.value.AutoValue;
21+
import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
22+
import org.apache.beam.runners.dataflow.worker.streaming.Work;
2123
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
24+
import org.apache.beam.sdk.annotations.Internal;
2225
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
2326

2427
/** Value class for a queued commit. */
28+
@Internal
2529
@AutoValue
2630
public abstract class Commit {
2731

@@ -31,6 +35,10 @@ public static Commit create(
3135
return new AutoValue_Commit(request, computationState, work);
3236
}
3337

38+
public final String computationId() {
39+
return computationState().getComputationId();
40+
}
41+
3442
public abstract WorkItemCommitRequest request();
3543

3644
public abstract ComputationState computationState();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.dataflow.worker.windmill.client.commits;
19+
20+
import com.google.auto.value.AutoValue;
21+
import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey;
22+
import org.apache.beam.runners.dataflow.worker.streaming.WorkId;
23+
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
24+
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitStatus;
25+
import org.apache.beam.sdk.annotations.Internal;
26+
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
27+
28+
/**
29+
* A {@link Commit} is marked as complete when it has been attempted to be committed back to
30+
* Streaming Engine/Appliance via {@link
31+
* org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub#commitWorkStream(StreamObserver)}
32+
* for Streaming Engine or {@link
33+
* org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub#commitWork(Windmill.CommitWorkRequest,
34+
* StreamObserver)} for Streaming Appliance.
35+
*/
36+
@Internal
37+
@AutoValue
38+
public abstract class CompleteCommit {
39+
40+
public static CompleteCommit create(Commit commit, CommitStatus commitStatus) {
41+
return new AutoValue_CompleteCommit(
42+
commit.computationId(),
43+
ShardedKey.create(commit.request().getKey(), commit.request().getShardingKey()),
44+
WorkId.builder()
45+
.setWorkToken(commit.request().getWorkToken())
46+
.setCacheToken(commit.request().getCacheToken())
47+
.build(),
48+
commitStatus);
49+
}
50+
51+
public static CompleteCommit create(
52+
String computationId, ShardedKey shardedKey, WorkId workId, CommitStatus status) {
53+
return new AutoValue_CompleteCommit(computationId, shardedKey, workId, status);
54+
}
55+
56+
public static CompleteCommit forFailedWork(Commit commit) {
57+
return create(commit, CommitStatus.ABORTED);
58+
}
59+
60+
public abstract String computationId();
61+
62+
public abstract ShardedKey shardedKey();
63+
64+
public abstract WorkId workId();
65+
66+
public abstract CommitStatus status();
67+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.dataflow.worker.windmill.client.commits;
19+
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
import java.util.concurrent.ExecutorService;
23+
import java.util.concurrent.Executors;
24+
import java.util.concurrent.atomic.AtomicLong;
25+
import java.util.function.Consumer;
26+
import javax.annotation.concurrent.ThreadSafe;
27+
import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
28+
import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey;
29+
import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue;
30+
import org.apache.beam.runners.dataflow.worker.streaming.Work;
31+
import org.apache.beam.runners.dataflow.worker.streaming.WorkId;
32+
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
33+
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest;
34+
import org.apache.beam.sdk.annotations.Internal;
35+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
38+
39+
/** Streaming appliance implementation of {@link WorkCommitter}. */
40+
@Internal
41+
@ThreadSafe
42+
public final class StreamingApplianceWorkCommitter implements WorkCommitter {
43+
private static final Logger LOG = LoggerFactory.getLogger(StreamingApplianceWorkCommitter.class);
44+
private static final long TARGET_COMMIT_BUNDLE_BYTES = 32 << 20;
45+
private static final int MAX_COMMIT_QUEUE_BYTES = 500 << 20; // 500MB
46+
47+
private final Consumer<CommitWorkRequest> commitWorkFn;
48+
private final WeightedBoundedQueue<Commit> commitQueue;
49+
private final ExecutorService commitWorkers;
50+
private final AtomicLong activeCommitBytes;
51+
private final Consumer<CompleteCommit> onCommitComplete;
52+
53+
private StreamingApplianceWorkCommitter(
54+
Consumer<CommitWorkRequest> commitWorkFn, Consumer<CompleteCommit> onCommitComplete) {
55+
this.commitWorkFn = commitWorkFn;
56+
this.commitQueue =
57+
WeightedBoundedQueue.create(
58+
MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, commit.getSize()));
59+
this.commitWorkers =
60+
Executors.newSingleThreadScheduledExecutor(
61+
new ThreadFactoryBuilder()
62+
.setDaemon(true)
63+
.setPriority(Thread.MAX_PRIORITY)
64+
.setNameFormat("CommitThread-%d")
65+
.build());
66+
this.activeCommitBytes = new AtomicLong();
67+
this.onCommitComplete = onCommitComplete;
68+
}
69+
70+
public static StreamingApplianceWorkCommitter create(
71+
Consumer<CommitWorkRequest> commitWork, Consumer<CompleteCommit> onCommitComplete) {
72+
return new StreamingApplianceWorkCommitter(commitWork, onCommitComplete);
73+
}
74+
75+
@Override
76+
@SuppressWarnings("FutureReturnValueIgnored")
77+
public void start() {
78+
if (!commitWorkers.isShutdown()) {
79+
commitWorkers.submit(this::commitLoop);
80+
}
81+
}
82+
83+
@Override
84+
public void commit(Commit commit) {
85+
commitQueue.put(commit);
86+
}
87+
88+
@Override
89+
public long currentActiveCommitBytes() {
90+
return activeCommitBytes.get();
91+
}
92+
93+
@Override
94+
public void stop() {
95+
commitWorkers.shutdownNow();
96+
}
97+
98+
@Override
99+
public int parallelism() {
100+
return 1;
101+
}
102+
103+
private void commitLoop() {
104+
Map<ComputationState, Windmill.ComputationCommitWorkRequest.Builder> computationRequestMap =
105+
new HashMap<>();
106+
while (true) {
107+
computationRequestMap.clear();
108+
CommitWorkRequest.Builder commitRequestBuilder = CommitWorkRequest.newBuilder();
109+
long commitBytes = 0;
110+
// Block until we have a commit, then batch with additional commits.
111+
Commit commit;
112+
try {
113+
commit = commitQueue.take();
114+
} catch (InterruptedException e) {
115+
Thread.currentThread().interrupt();
116+
continue;
117+
}
118+
while (commit != null) {
119+
ComputationState computationState = commit.computationState();
120+
commit.work().setState(Work.State.COMMITTING);
121+
Windmill.ComputationCommitWorkRequest.Builder computationRequestBuilder =
122+
computationRequestMap.get(computationState);
123+
if (computationRequestBuilder == null) {
124+
computationRequestBuilder = commitRequestBuilder.addRequestsBuilder();
125+
computationRequestBuilder.setComputationId(computationState.getComputationId());
126+
computationRequestMap.put(computationState, computationRequestBuilder);
127+
}
128+
computationRequestBuilder.addRequests(commit.request());
129+
// Send the request if we've exceeded the bytes or there is no more
130+
// pending work. commitBytes is a long, so this cannot overflow.
131+
commitBytes += commit.getSize();
132+
if (commitBytes >= TARGET_COMMIT_BUNDLE_BYTES) {
133+
break;
134+
}
135+
commit = commitQueue.poll();
136+
}
137+
commitWork(commitRequestBuilder.build(), commitBytes);
138+
completeWork(computationRequestMap);
139+
}
140+
}
141+
142+
private void commitWork(CommitWorkRequest commitRequest, long commitBytes) {
143+
LOG.trace("Commit: {}", commitRequest);
144+
activeCommitBytes.addAndGet(commitBytes);
145+
commitWorkFn.accept(commitRequest);
146+
activeCommitBytes.addAndGet(-commitBytes);
147+
}
148+
149+
private void completeWork(
150+
Map<ComputationState, Windmill.ComputationCommitWorkRequest.Builder> committedWork) {
151+
for (Map.Entry<ComputationState, Windmill.ComputationCommitWorkRequest.Builder> entry :
152+
committedWork.entrySet()) {
153+
for (Windmill.WorkItemCommitRequest workRequest : entry.getValue().getRequestsList()) {
154+
// Appliance errors are propagated by exception on entire batch.
155+
onCommitComplete.accept(
156+
CompleteCommit.create(
157+
entry.getKey().getComputationId(),
158+
ShardedKey.create(workRequest.getKey(), workRequest.getShardingKey()),
159+
WorkId.builder()
160+
.setCacheToken(workRequest.getCacheToken())
161+
.setWorkToken(workRequest.getWorkToken())
162+
.build(),
163+
Windmill.CommitStatus.OK));
164+
}
165+
}
166+
}
167+
}

0 commit comments

Comments
 (0)