Skip to content

Commit 4fc5c86

Browse files
authored
[#31438] WindowingStrategy Plumbing + AllowedLateness + Fixing Session Windows (#33542)
1 parent a19466a commit 4fc5c86

File tree

9 files changed

+129
-167
lines changed

9 files changed

+129
-167
lines changed

runners/prism/java/build.gradle

-15
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,6 @@ def sickbayTests = [
9696
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerEarly',
9797
'org.apache.beam.sdk.transforms.ParDoTest$BundleInvariantsTests.testWatermarkUpdateMidBundle',
9898
'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton',
99-
// Requires Allowed Lateness, among others.
100-
'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerSetWithinAllowedLateness',
101-
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',
10299
'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate',
103100
'org.apache.beam.sdk.testing.TestStreamTest.testDiscardingMode',
104101
'org.apache.beam.sdk.testing.TestStreamTest.testEarlyPanesOfWindow',
@@ -116,10 +113,6 @@ def sickbayTests = [
116113
// Coding error somehow: short write: reached end of stream after reading 5 bytes; 98 bytes expected
117114
'org.apache.beam.sdk.testing.TestStreamTest.testMultiStage',
118115

119-
// Prism not firing sessions correctly (seems to be merging inapppropriately)
120-
'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombine',
121-
'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombineWithContext',
122-
123116
// Java side dying during execution.
124117
// https://github.com/apache/beam/issues/32930
125118
'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders',
@@ -161,14 +154,6 @@ def sickbayTests = [
161154
// TODO(https://github.com/apache/beam/issues/31231)
162155
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata',
163156

164-
165-
// These tests fail once Late Data was being precisely dropped.
166-
// They set a single element to be late data, and expect it (correctly) to be preserved.
167-
// Since presently, these are treated as No-ops, the fix is to disable the
168-
// dropping behavior when a stage's input is a Reshuffle/Redistribute transform.
169-
'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming',
170-
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeWithTimestampsStreaming',
171-
172157
// Prism isn't handling Java's side input views properly.
173158
// https://github.com/apache/beam/issues/32932
174159
// java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.

sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go

+21-23
Original file line numberDiff line numberDiff line change
@@ -261,8 +261,10 @@ func (em *ElementManager) AddStage(ID string, inputIDs, outputIDs []string, side
261261

262262
// StageAggregates marks the given stage as an aggregation, which
263263
// means elements will only be processed based on windowing strategies.
264-
func (em *ElementManager) StageAggregates(ID string) {
265-
em.stages[ID].aggregate = true
264+
func (em *ElementManager) StageAggregates(ID string, strat WinStrat) {
265+
ss := em.stages[ID]
266+
ss.aggregate = true
267+
ss.strat = strat
266268
}
267269

268270
// StageStateful marks the given stage as stateful, which means elements are
@@ -1095,7 +1097,7 @@ type stageState struct {
10951097
// Special handling bits
10961098
stateful bool // whether this stage uses state or timers, and needs keyed processing.
10971099
aggregate bool // whether this stage needs to block for aggregation.
1098-
strat winStrat // Windowing Strategy for aggregation fireings.
1100+
strat WinStrat // Windowing Strategy for aggregation fireings.
10991101
processingTimeTimersFamilies map[string]bool // Indicates which timer families use the processing time domain.
11001102

11011103
// onWindowExpiration management
@@ -1154,7 +1156,6 @@ func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *st
11541156
ID: ID,
11551157
outputIDs: outputIDs,
11561158
sides: sides,
1157-
strat: defaultStrat{},
11581159
state: map[LinkID]map[typex.Window]map[string]StateData{},
11591160
watermarkHolds: newHoldTracker(),
11601161

@@ -1179,18 +1180,21 @@ func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *st
11791180
func (ss *stageState) AddPending(newPending []element) int {
11801181
ss.mu.Lock()
11811182
defer ss.mu.Unlock()
1182-
// TODO(#https://github.com/apache/beam/issues/31438):
1183-
// Adjust with AllowedLateness
1184-
// Data that arrives after the *output* watermark is late.
1185-
threshold := ss.output
1186-
origPending := make([]element, 0, ss.pending.Len())
1187-
for _, e := range newPending {
1188-
if e.window.MaxTimestamp() < threshold {
1189-
continue
1183+
if ss.aggregate {
1184+
// Late Data is data that has arrived after that window has expired.
1185+
// We only need to drop late data before aggregations.
1186+
// TODO - handle for side inputs too.
1187+
threshold := ss.output
1188+
origPending := make([]element, 0, ss.pending.Len())
1189+
for _, e := range newPending {
1190+
if ss.strat.EarliestCompletion(e.window) < threshold {
1191+
// TODO: figure out Pane and trigger firings.
1192+
continue
1193+
}
1194+
origPending = append(origPending, e)
11901195
}
1191-
origPending = append(origPending, e)
1196+
newPending = origPending
11921197
}
1193-
newPending = origPending
11941198
if ss.stateful {
11951199
if ss.pendingByKeys == nil {
11961200
ss.pendingByKeys = map[string]*dataAndTimers{}
@@ -1626,10 +1630,8 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
16261630
// They'll never be read in again.
16271631
for _, wins := range ss.sideInputs {
16281632
for win := range wins {
1629-
// TODO(#https://github.com/apache/beam/issues/31438):
1630-
// Adjust with AllowedLateness
16311633
// Clear out anything we've already used.
1632-
if win.MaxTimestamp() < newOut {
1634+
if ss.strat.EarliestCompletion(win) < newOut {
16331635
// If the expiry is in progress, skip this window.
16341636
if ss.inProgressExpiredWindows[win] > 0 {
16351637
continue
@@ -1640,9 +1642,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
16401642
}
16411643
for _, wins := range ss.state {
16421644
for win := range wins {
1643-
// TODO(#https://github.com/apache/beam/issues/31438):
1644-
// Adjust with AllowedLateness
1645-
if win.MaxTimestamp() < newOut {
1645+
if ss.strat.EarliestCompletion(win) < newOut {
16461646
// If the expiry is in progress, skip collecting this window.
16471647
if ss.inProgressExpiredWindows[win] > 0 {
16481648
continue
@@ -1685,9 +1685,7 @@ func (ss *stageState) createOnWindowExpirationBundles(newOut mtime.Time, em *Ele
16851685
var preventDownstreamUpdate bool
16861686
for win, keys := range ss.keysToExpireByWindow {
16871687
// Check if the window has expired.
1688-
// TODO(#https://github.com/apache/beam/issues/31438):
1689-
// Adjust with AllowedLateness
1690-
if win.MaxTimestamp() >= newOut {
1688+
if ss.strat.EarliestCompletion(win) >= newOut {
16911689
continue
16921690
}
16931691
// We can't advance the output watermark if there's garbage to collect.

sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go

+8-20
Original file line numberDiff line numberDiff line change
@@ -23,28 +23,16 @@ import (
2323
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
2424
)
2525

26-
type winStrat interface {
27-
EarliestCompletion(typex.Window) mtime.Time
26+
// WinStrat configures the windowing strategy for the stage, based on the
27+
// stage's input PCollection.
28+
type WinStrat struct {
29+
AllowedLateness time.Duration // Used to extend duration
2830
}
2931

30-
type defaultStrat struct{}
31-
32-
func (ws defaultStrat) EarliestCompletion(w typex.Window) mtime.Time {
33-
return w.MaxTimestamp()
34-
}
35-
36-
func (defaultStrat) String() string {
37-
return "default"
38-
}
39-
40-
type sessionStrat struct {
41-
GapSize time.Duration
42-
}
43-
44-
func (ws sessionStrat) EarliestCompletion(w typex.Window) mtime.Time {
45-
return w.MaxTimestamp().Add(ws.GapSize)
32+
func (ws WinStrat) EarliestCompletion(w typex.Window) mtime.Time {
33+
return w.MaxTimestamp().Add(ws.AllowedLateness)
4634
}
4735

48-
func (ws sessionStrat) String() string {
49-
return fmt.Sprintf("session[GapSize:%v]", ws.GapSize)
36+
func (ws WinStrat) String() string {
37+
return fmt.Sprintf("WinStrat[AllowedLateness:%v]", ws.AllowedLateness)
5038
}

sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,16 @@ import (
2626

2727
func TestEarliestCompletion(t *testing.T) {
2828
tests := []struct {
29-
strat winStrat
29+
strat WinStrat
3030
input typex.Window
3131
want mtime.Time
3232
}{
33-
{defaultStrat{}, window.GlobalWindow{}, mtime.EndOfGlobalWindowTime},
34-
{defaultStrat{}, window.IntervalWindow{Start: 0, End: 4}, 3},
35-
{defaultStrat{}, window.IntervalWindow{Start: mtime.MinTimestamp, End: mtime.MaxTimestamp}, mtime.MaxTimestamp - 1},
36-
{sessionStrat{}, window.IntervalWindow{Start: 0, End: 4}, 3},
37-
{sessionStrat{GapSize: 3 * time.Millisecond}, window.IntervalWindow{Start: 0, End: 4}, 6},
33+
{WinStrat{}, window.GlobalWindow{}, mtime.EndOfGlobalWindowTime},
34+
{WinStrat{}, window.IntervalWindow{Start: 0, End: 4}, 3},
35+
{WinStrat{}, window.IntervalWindow{Start: mtime.MinTimestamp, End: mtime.MaxTimestamp}, mtime.MaxTimestamp - 1},
36+
{WinStrat{AllowedLateness: 5 * time.Second}, window.GlobalWindow{}, mtime.EndOfGlobalWindowTime.Add(5 * time.Second)},
37+
{WinStrat{AllowedLateness: 5 * time.Millisecond}, window.IntervalWindow{Start: 0, End: 4}, 8},
38+
{WinStrat{AllowedLateness: 5 * time.Second}, window.IntervalWindow{Start: mtime.MinTimestamp, End: mtime.MaxTimestamp}, mtime.MaxTimestamp.Add(5 * time.Second)},
3839
}
3940

4041
for _, test := range tests {

sdks/go/pkg/beam/runners/prism/internal/execute.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,10 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
223223
KeyDec: kd,
224224
}
225225
}
226-
em.StageAggregates(stage.ID)
226+
ws := windowingStrategy(comps, tid)
227+
em.StageAggregates(stage.ID, engine.WinStrat{
228+
AllowedLateness: time.Duration(ws.GetAllowedLateness()) * time.Millisecond,
229+
})
227230
case urns.TransformImpulse:
228231
impulses = append(impulses, stage.ID)
229232
em.AddStage(stage.ID, nil, []string{getOnlyValue(t.GetOutputs())}, nil)
@@ -266,11 +269,11 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
266269
case *pipepb.TestStreamPayload_Event_ElementEvent:
267270
var elms []engine.TestStreamElement
268271
for _, e := range ev.ElementEvent.GetElements() {
269-
elms = append(elms, engine.TestStreamElement{Encoded: mayLP(e.GetEncodedElement()), EventTime: mtime.Time(e.GetTimestamp())})
272+
elms = append(elms, engine.TestStreamElement{Encoded: mayLP(e.GetEncodedElement()), EventTime: mtime.FromMilliseconds(e.GetTimestamp())})
270273
}
271274
tsb.AddElementEvent(ev.ElementEvent.GetTag(), elms)
272275
case *pipepb.TestStreamPayload_Event_WatermarkEvent:
273-
tsb.AddWatermarkEvent(ev.WatermarkEvent.GetTag(), mtime.Time(ev.WatermarkEvent.GetNewWatermark()))
276+
tsb.AddWatermarkEvent(ev.WatermarkEvent.GetTag(), mtime.FromMilliseconds(ev.WatermarkEvent.GetNewWatermark()))
274277
case *pipepb.TestStreamPayload_Event_ProcessingTimeEvent:
275278
if ev.ProcessingTimeEvent.GetAdvanceDuration() == int64(mtime.MaxTimestamp) {
276279
// TODO: Determine the SDK common formalism for setting processing time to infinity.

0 commit comments

Comments
 (0)