Skip to content

Commit 61cf27f

Browse files
authored
[IcebergIO] Apply delete/update filter when reading (#34607)
* apply delete filter * revert cdc changes * spotless * trigger integration tests
1 parent bbb6d83 commit 61cf27f

File tree

2 files changed

+5
-2
lines changed

2 files changed

+5
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
3-
"modification": 4
3+
"modification": 3
44
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.iceberg.Table;
3333
import org.apache.iceberg.TableProperties;
3434
import org.apache.iceberg.avro.Avro;
35+
import org.apache.iceberg.data.GenericDeleteFilter;
3536
import org.apache.iceberg.data.IdentityPartitionConverters;
3637
import org.apache.iceberg.data.Record;
3738
import org.apache.iceberg.data.avro.DataReader;
@@ -175,7 +176,9 @@ public boolean advance() throws IOException {
175176
default:
176177
throw new UnsupportedOperationException("Cannot read format: " + file.format());
177178
}
178-
currentIterator = iterable.iterator();
179+
GenericDeleteFilter deleteFilter =
180+
new GenericDeleteFilter(checkStateNotNull(io), fileTask, fileTask.schema(), project);
181+
currentIterator = deleteFilter.filter(iterable).iterator();
179182

180183
} while (true);
181184

0 commit comments

Comments
 (0)