Skip to content

Commit c3b3fa6

Browse files
authored
Include byte size of stateKey in estimated weight of WindmillBag, WindmillValue, and WindmillWatermarkHold (#30654)
* Update WindmillBag.java Include byte size of the stateKey on the BagState weight used to estimate and limit the total state cache size * Update WindmillValue.java Include stateKey size in the byte size of a WidnmillValue * Update WindmillWatermarkHold.java Include keyState size in the WatermarkHold estimated byte size * Fix formatting issue * Fix expected cache item weights in WindmillStateInternalsTest
1 parent 63aff7e commit c3b3fa6

File tree

4 files changed

+13
-12
lines changed

4 files changed

+13
-12
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillBag.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ public Windmill.WorkItemCommitRequest persistDirectly(WindmillStateCache.ForKeyA
193193
}
194194
// We now know the complete bag contents, and any read on it will yield a
195195
// cached value, so cache it for future reads.
196-
cache.put(namespace, address, this, encodedSize);
196+
cache.put(namespace, address, this, encodedSize + stateKey.size());
197197
}
198198

199199
// Don't reuse the localAdditions object; we don't want future changes to it to

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillValue.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ protected Windmill.WorkItemCommitRequest persistDirectly(WindmillStateCache.ForK
124124
coder.encode(value, stream, Coder.Context.OUTER);
125125
}
126126
encoded = stream.toByteString();
127-
cachedSize = encoded.size();
127+
cachedSize = (long) encoded.size() + stateKey.size();
128128
}
129129

130130
// Place in cache to avoid a future read.

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -175,13 +175,14 @@ public Future<Windmill.WorkItemCommitRequest> persist(
175175
throw new IllegalStateException("Unreachable condition");
176176
}
177177

178+
final int estimatedByteSize = ENCODED_SIZE + stateKey.size();
178179
return Futures.lazyTransform(
179180
result,
180181
result1 -> {
181182
cleared = false;
182183
localAdditions = null;
183184
if (cachedValue != null) {
184-
cache.put(namespace, address, WindmillWatermarkHold.this, ENCODED_SIZE);
185+
cache.put(namespace, address, WindmillWatermarkHold.this, estimatedByteSize);
185186
}
186187
return result1;
187188
});

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -3043,15 +3043,15 @@ public void testCachedValue() throws Exception {
30433043
value.write("Hi");
30443044
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
30453045

3046-
assertEquals(132, cache.getWeight());
3046+
assertEquals(141, cache.getWeight());
30473047

30483048
resetUnderTest();
30493049
value = underTest.state(NAMESPACE, addr);
30503050
assertEquals("Hi", value.read());
30513051
value.clear();
30523052
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
30533053

3054-
assertEquals(130, cache.getWeight());
3054+
assertEquals(139, cache.getWeight());
30553055

30563056
resetUnderTest();
30573057
value = underTest.state(NAMESPACE, addr);
@@ -3083,7 +3083,7 @@ public void testCachedBag() throws Exception {
30833083

30843084
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
30853085

3086-
assertEquals(140, cache.getWeight());
3086+
assertEquals(147, cache.getWeight());
30873087

30883088
resetUnderTest();
30893089
bag = underTest.state(NAMESPACE, addr);
@@ -3103,7 +3103,7 @@ public void testCachedBag() throws Exception {
31033103

31043104
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
31053105

3106-
assertEquals(133, cache.getWeight());
3106+
assertEquals(140, cache.getWeight());
31073107

31083108
resetUnderTest();
31093109
bag = underTest.state(NAMESPACE, addr);
@@ -3114,7 +3114,7 @@ public void testCachedBag() throws Exception {
31143114

31153115
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
31163116

3117-
assertEquals(134, cache.getWeight());
3117+
assertEquals(141, cache.getWeight());
31183118

31193119
resetUnderTest();
31203120
bag = underTest.state(NAMESPACE, addr);
@@ -3145,7 +3145,7 @@ public void testCachedWatermarkHold() throws Exception {
31453145

31463146
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
31473147

3148-
assertEquals(138, cache.getWeight());
3148+
assertEquals(151, cache.getWeight());
31493149

31503150
resetUnderTest();
31513151
hold = underTest.state(NAMESPACE, addr);
@@ -3154,7 +3154,7 @@ public void testCachedWatermarkHold() throws Exception {
31543154

31553155
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
31563156

3157-
assertEquals(138, cache.getWeight());
3157+
assertEquals(151, cache.getWeight());
31583158

31593159
resetUnderTest();
31603160
hold = underTest.state(NAMESPACE, addr);
@@ -3185,7 +3185,7 @@ public void testCachedCombining() throws Exception {
31853185

31863186
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
31873187

3188-
assertEquals(131, cache.getWeight());
3188+
assertEquals(144, cache.getWeight());
31893189

31903190
resetUnderTest();
31913191
value = underTest.state(NAMESPACE, COMBINING_ADDR);
@@ -3196,7 +3196,7 @@ public void testCachedCombining() throws Exception {
31963196

31973197
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
31983198

3199-
assertEquals(130, cache.getWeight());
3199+
assertEquals(143, cache.getWeight());
32003200

32013201
resetUnderTest();
32023202
value = underTest.state(NAMESPACE, COMBINING_ADDR);

0 commit comments

Comments
 (0)