Skip to content

Commit 6509e51

Browse files
authored
[#33513][prism]Handle Time sorted requirement and drop late data. (#33515)
1 parent 6618968 commit 6509e51

File tree

5 files changed

+33
-11
lines changed

5 files changed

+33
-11
lines changed

runners/prism/java/build.gradle

+9-7
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ def sickbayTests = [
9898
'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton',
9999
// Requires Allowed Lateness, among others.
100100
'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerSetWithinAllowedLateness',
101+
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',
101102
'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate',
102103
'org.apache.beam.sdk.testing.TestStreamTest.testDiscardingMode',
103104
'org.apache.beam.sdk.testing.TestStreamTest.testEarlyPanesOfWindow',
@@ -160,6 +161,14 @@ def sickbayTests = [
160161
// TODO(https://github.com/apache/beam/issues/31231)
161162
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata',
162163

164+
165+
// These tests fail once Late Data was being precisely dropped.
166+
// They set a single element to be late data, and expect it (correctly) to be preserved.
167+
// Since presently, these are treated as No-ops, the fix is to disable the
168+
// dropping behavior when a stage's input is a Reshuffle/Redistribute transform.
169+
'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming',
170+
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeWithTimestampsStreaming',
171+
163172
// Prism isn't handling Java's side input views properly.
164173
// https://github.com/apache/beam/issues/32932
165174
// java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.
@@ -177,13 +186,6 @@ def sickbayTests = [
177186
// java.lang.IllegalStateException: java.io.EOFException
178187
'org.apache.beam.sdk.transforms.ViewTest.testSideInputWithNestedIterables',
179188

180-
// Requires Time Sorted Input
181-
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInput',
182-
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithTestStream',
183-
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',
184-
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testTwoRequiresTimeSortedInputWithLateData',
185-
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateData',
186-
187189
// Missing output due to processing time timer skew.
188190
'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew',
189191

sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go

+12
Original file line numberDiff line numberDiff line change
@@ -1179,6 +1179,18 @@ func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *st
11791179
func (ss *stageState) AddPending(newPending []element) int {
11801180
ss.mu.Lock()
11811181
defer ss.mu.Unlock()
1182+
// TODO(#https://github.com/apache/beam/issues/31438):
1183+
// Adjust with AllowedLateness
1184+
// Data that arrives after the *output* watermark is late.
1185+
threshold := ss.output
1186+
origPending := make([]element, 0, ss.pending.Len())
1187+
for _, e := range newPending {
1188+
if e.window.MaxTimestamp() < threshold {
1189+
continue
1190+
}
1191+
origPending = append(origPending, e)
1192+
}
1193+
newPending = origPending
11821194
if ss.stateful {
11831195
if ss.pendingByKeys == nil {
11841196
ss.pendingByKeys = map[string]*dataAndTimers{}

sdks/go/pkg/beam/runners/prism/internal/handlepardo.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,12 @@ func (h *pardo) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb
8484

8585
// At their simplest, we don't need to do anything special at pre-processing time, and simply pass through as normal.
8686

87-
// StatefulDoFns need to be marked as being roots.
87+
// ForceRoots cause fusion breaks in the optimized graph.
88+
// StatefulDoFns need to be marked as being roots, for correct per-key state handling.
89+
// Prism already sorts input elements for a stage by EventTime, so a fusion break enables the sorted behavior.
8890
var forcedRoots []string
89-
if len(pdo.StateSpecs)+len(pdo.TimerFamilySpecs) > 0 {
91+
if len(pdo.GetStateSpecs())+len(pdo.GetTimerFamilySpecs()) > 0 ||
92+
pdo.GetRequiresTimeSortedInput() {
9093
forcedRoots = append(forcedRoots, tid)
9194
}
9295

sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go

+1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ var supportedRequirements = map[string]struct{}{
4747
urns.RequirementStatefulProcessing: {},
4848
urns.RequirementBundleFinalization: {},
4949
urns.RequirementOnWindowExpiration: {},
50+
urns.RequirementTimeSortedInput: {},
5051
}
5152

5253
// TODO, move back to main package, and key off of executor handlers?

sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -3764,7 +3764,9 @@ public void testRequiresTimeSortedInputWithLateData() {
37643764
if (stamp == 100) {
37653765
// advance watermark when we have 100 remaining elements
37663766
// all the rest are going to be late elements
3767-
input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp));
3767+
input =
3768+
input.advanceWatermarkTo(
3769+
GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.standardSeconds(1)));
37683770
}
37693771
}
37703772
testTimeSortedInput(
@@ -3796,7 +3798,9 @@ public void testTwoRequiresTimeSortedInputWithLateData() {
37963798
if (stamp == 100) {
37973799
// advance watermark when we have 100 remaining elements
37983800
// all the rest are going to be late elements
3799-
input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp));
3801+
input =
3802+
input.advanceWatermarkTo(
3803+
GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.standardSeconds(1)));
38003804
}
38013805
}
38023806
// apply the sorted function for the first time

0 commit comments

Comments
 (0)