Skip to content

Commit eed6dc7

Browse files
authored
Flink: Backport add StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT_EXCLUSIVE to Flink 1.19 (#12899)
backports #12839
1 parent 68b016c commit eed6dc7

File tree

4 files changed

+91
-16
lines changed

4 files changed

+91
-16
lines changed

flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/StreamingStartingStrategy.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,21 @@ public enum StreamingStartingStrategy {
3030
/**
3131
* Start incremental mode from the latest snapshot inclusive.
3232
*
33-
* <p>If it is an empty map, all future append snapshots should be discovered.
33+
* <p>If it is an empty table, all future append snapshots should be discovered.
3434
*/
3535
INCREMENTAL_FROM_LATEST_SNAPSHOT,
3636

37+
/**
38+
* Start incremental mode from the latest snapshot exclusive.
39+
*
40+
* <p>If it is an empty table, all future append snapshots should be discovered.
41+
*/
42+
INCREMENTAL_FROM_LATEST_SNAPSHOT_EXCLUSIVE,
43+
3744
/**
3845
* Start incremental mode from the earliest snapshot inclusive.
3946
*
40-
* <p>If it is an empty map, all future append snapshots should be discovered.
47+
* <p>If it is an empty table, all future append snapshots should be discovered.
4148
*/
4249
INCREMENTAL_FROM_EARLIEST_SNAPSHOT,
4350

flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ private ContinuousEnumerationResult discoverInitialSplits() {
165165
"Get starting snapshot id {} based on strategy {}",
166166
startSnapshot.snapshotId(),
167167
scanContext.streamingStartingStrategy());
168-
List<IcebergSourceSplit> splits;
168+
List<IcebergSourceSplit> splits = Collections.emptyList();
169169
IcebergEnumeratorPosition toPosition;
170170
if (scanContext.streamingStartingStrategy()
171171
== StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) {
@@ -180,10 +180,17 @@ private ContinuousEnumerationResult discoverInitialSplits() {
180180
// For TABLE_SCAN_THEN_INCREMENTAL, incremental mode starts exclusive from the startSnapshot
181181
toPosition =
182182
IcebergEnumeratorPosition.of(startSnapshot.snapshotId(), startSnapshot.timestampMillis());
183+
} else if (scanContext.streamingStartingStrategy()
184+
== StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT_EXCLUSIVE) {
185+
toPosition =
186+
IcebergEnumeratorPosition.of(startSnapshot.snapshotId(), startSnapshot.timestampMillis());
187+
LOG.info(
188+
"Start incremental scan with start snapshot (exclusive): id = {}, timestamp = {}",
189+
startSnapshot.snapshotId(),
190+
startSnapshot.timestampMillis());
183191
} else {
184192
// For all other modes, starting snapshot should be consumed inclusively.
185193
// Use parentId to achieve the inclusive behavior. It is fine if parentId is null.
186-
splits = Collections.emptyList();
187194
Long parentSnapshotId = startSnapshot.parentId();
188195
if (parentSnapshotId != null) {
189196
Snapshot parentSnapshot = table.snapshot(parentSnapshotId);
@@ -216,6 +223,7 @@ static Optional<Snapshot> startSnapshot(Table table, ScanContext scanContext) {
216223
switch (scanContext.streamingStartingStrategy()) {
217224
case TABLE_SCAN_THEN_INCREMENTAL:
218225
case INCREMENTAL_FROM_LATEST_SNAPSHOT:
226+
case INCREMENTAL_FROM_LATEST_SNAPSHOT_EXCLUSIVE:
219227
return Optional.ofNullable(table.currentSnapshot());
220228
case INCREMENTAL_FROM_EARLIEST_SNAPSHOT:
221229
return Optional.ofNullable(SnapshotUtil.oldestAncestor(table));

flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java

+47-6
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444
import org.junit.jupiter.api.Test;
4545
import org.junit.jupiter.api.extension.RegisterExtension;
4646
import org.junit.jupiter.api.io.TempDir;
47+
import org.junit.jupiter.params.ParameterizedTest;
48+
import org.junit.jupiter.params.provider.EnumSource;
4749

4850
public class TestContinuousSplitPlannerImpl {
4951
@TempDir protected Path temporaryFolder;
@@ -173,13 +175,14 @@ public void testTableScanThenIncrementalWithNonEmptyTable() throws Exception {
173175
}
174176
}
175177

176-
@Test
177-
public void testIncrementalFromLatestSnapshotWithEmptyTable() throws Exception {
178+
@ParameterizedTest
179+
@EnumSource(
180+
value = StreamingStartingStrategy.class,
181+
names = {"INCREMENTAL_FROM_LATEST_SNAPSHOT", "INCREMENTAL_FROM_LATEST_SNAPSHOT_EXCLUSIVE"})
182+
public void testIncrementalFromLatestSnapshotWithEmptyTable(
183+
StreamingStartingStrategy startingStrategy) throws Exception {
178184
ScanContext scanContext =
179-
ScanContext.builder()
180-
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
181-
.splitSize(1L)
182-
.build();
185+
ScanContext.builder().startingStrategy(startingStrategy).splitSize(1L).build();
183186
ContinuousSplitPlannerImpl splitPlanner =
184187
new ContinuousSplitPlannerImpl(TABLE_RESOURCE.tableLoader().clone(), scanContext, null);
185188

@@ -256,6 +259,44 @@ public void testIncrementalFromLatestSnapshotWithNonEmptyTable() throws Exceptio
256259
}
257260
}
258261

262+
@Test
263+
public void testIncrementalFromLatestSnapshotExclusiveWithNonEmptyTable() throws Exception {
264+
appendTwoSnapshots();
265+
266+
ScanContext scanContext =
267+
ScanContext.builder()
268+
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT_EXCLUSIVE)
269+
.build();
270+
ContinuousSplitPlannerImpl splitPlanner =
271+
new ContinuousSplitPlannerImpl(TABLE_RESOURCE.tableLoader().clone(), scanContext, null);
272+
273+
ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null);
274+
assertThat(initialResult.splits()).isEmpty();
275+
assertThat(initialResult.fromPosition()).isNull();
276+
// For exclusive behavior, the initial result should point to snapshot2
277+
assertThat(initialResult.toPosition().snapshotId().longValue())
278+
.isEqualTo(snapshot2.snapshotId());
279+
assertThat(initialResult.toPosition().snapshotTimestampMs().longValue())
280+
.isEqualTo(snapshot2.timestampMillis());
281+
282+
// Then the next incremental scan shall discover no files
283+
ContinuousEnumerationResult secondResult = splitPlanner.planSplits(initialResult.toPosition());
284+
assertThat(initialResult.splits()).isEmpty();
285+
assertThat(secondResult.fromPosition().snapshotId().longValue())
286+
.isEqualTo(snapshot2.snapshotId());
287+
assertThat(secondResult.fromPosition().snapshotTimestampMs().longValue())
288+
.isEqualTo(snapshot2.timestampMillis());
289+
assertThat(secondResult.toPosition().snapshotId().longValue())
290+
.isEqualTo(snapshot2.snapshotId());
291+
assertThat(secondResult.toPosition().snapshotTimestampMs().longValue())
292+
.isEqualTo(snapshot2.timestampMillis());
293+
294+
IcebergEnumeratorPosition lastPosition = secondResult.toPosition();
295+
for (int i = 0; i < 3; ++i) {
296+
lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition;
297+
}
298+
}
299+
259300
@Test
260301
public void testIncrementalFromEarliestSnapshotWithEmptyTable() throws Exception {
261302
ScanContext scanContext =

flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java

+25-6
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import org.junit.jupiter.api.Test;
3838
import org.junit.jupiter.api.extension.RegisterExtension;
3939
import org.junit.jupiter.api.io.TempDir;
40+
import org.junit.jupiter.params.ParameterizedTest;
41+
import org.junit.jupiter.params.provider.EnumSource;
4042

4143
public class TestContinuousSplitPlannerImplStartStrategy {
4244
private static final FileFormat FILE_FORMAT = FileFormat.PARQUET;
@@ -88,13 +90,14 @@ public void testTableScanThenIncrementalStrategy() throws IOException {
8890
assertThat(startSnapshot.snapshotId()).isEqualTo(snapshot3.snapshotId());
8991
}
9092

91-
@Test
92-
public void testForLatestSnapshotStrategy() throws IOException {
93+
@ParameterizedTest
94+
@EnumSource(
95+
value = StreamingStartingStrategy.class,
96+
names = {"INCREMENTAL_FROM_LATEST_SNAPSHOT", "INCREMENTAL_FROM_LATEST_SNAPSHOT_EXCLUSIVE"})
97+
public void testForLatestSnapshotStrategyWithEmptyTable(
98+
StreamingStartingStrategy startingStrategy) throws IOException {
9399
ScanContext scanContext =
94-
ScanContext.builder()
95-
.streaming(true)
96-
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
97-
.build();
100+
ScanContext.builder().streaming(true).startingStrategy(startingStrategy).build();
98101

99102
assertThat(ContinuousSplitPlannerImpl.startSnapshot(TABLE_RESOURCE.table(), scanContext))
100103
.isNotPresent();
@@ -105,6 +108,22 @@ public void testForLatestSnapshotStrategy() throws IOException {
105108
assertThat(startSnapshot.snapshotId()).isEqualTo(snapshot3.snapshotId());
106109
}
107110

111+
@ParameterizedTest
112+
@EnumSource(
113+
value = StreamingStartingStrategy.class,
114+
names = {"INCREMENTAL_FROM_LATEST_SNAPSHOT", "INCREMENTAL_FROM_LATEST_SNAPSHOT_EXCLUSIVE"})
115+
public void testForLatestSnapshotStrategyWithNonEmptyTable(
116+
StreamingStartingStrategy startingStrategy) throws IOException {
117+
appendThreeSnapshots();
118+
119+
ScanContext scanContext =
120+
ScanContext.builder().streaming(true).startingStrategy(startingStrategy).build();
121+
122+
Snapshot startSnapshot =
123+
ContinuousSplitPlannerImpl.startSnapshot(TABLE_RESOURCE.table(), scanContext).get();
124+
assertThat(startSnapshot.snapshotId()).isEqualTo(snapshot3.snapshotId());
125+
}
126+
108127
@Test
109128
public void testForEarliestSnapshotStrategy() throws IOException {
110129
ScanContext scanContext =

0 commit comments

Comments
 (0)