67
67
import org .apache .beam .sdk .transforms .windowing .DefaultTrigger ;
68
68
import org .apache .beam .sdk .transforms .windowing .GlobalWindows ;
69
69
import org .apache .beam .sdk .transforms .windowing .Repeatedly ;
70
+ import org .apache .beam .sdk .transforms .windowing .TimestampCombiner ;
70
71
import org .apache .beam .sdk .transforms .windowing .Window ;
71
72
import org .apache .beam .sdk .util .Preconditions ;
72
73
import org .apache .beam .sdk .values .KV ;
@@ -340,7 +341,8 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
340
341
AfterProcessingTime .pastFirstElementInPane ()
341
342
.plusDelayOf (triggeringFrequency ),
342
343
AfterPane .elementCountAtLeast (FILE_TRIGGERING_RECORD_COUNT ))))
343
- .discardingFiredPanes ());
344
+ .discardingFiredPanes ()
345
+ .withTimestampCombiner (TimestampCombiner .EARLIEST ));
344
346
results = writeStaticallyShardedFiles (inputInGlobalWindow , tempFilePrefixView );
345
347
} else {
346
348
// In the case of dynamic sharding, however, we use a default trigger since the transform
@@ -364,7 +366,8 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
364
366
Repeatedly .forever (
365
367
AfterProcessingTime .pastFirstElementInPane ()
366
368
.plusDelayOf (triggeringFrequency )))
367
- .discardingFiredPanes ());
369
+ .discardingFiredPanes ()
370
+ .withTimestampCombiner (TimestampCombiner .EARLIEST ));
368
371
369
372
TupleTag <KV <ShardedKey <DestinationT >, WritePartition .Result >> multiPartitionsTag =
370
373
new TupleTag <>("multiPartitionsTag" );
0 commit comments