Commit 40151ab

Add Iceberg support for name-based mapping schema (#33315)
* Add Iceberg support for name-based mapping schema
* Add nullable annotation
* Add nested field
* iceberg-gcp already as a runtimeOnly
* Trigger IT tests
1 parent 6509e51 commit 40151ab
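
For context: Iceberg's name-based mapping lets a table read data files that were written without Iceberg field IDs (for example, plain Parquet files, as in the new test below) by mapping each field ID to a column name. The sketch below is illustrative and not part of this commit; it shows how such a mapping could be built and stored under the table property that ScanTaskReader now consults (TableProperties.DEFAULT_NAME_MAPPING, i.e. schema.name-mapping.default). The `table` handle and the field IDs/names are assumptions chosen to match the test schema.

// Illustrative sketch (not part of this commit): attach a name mapping to an
// existing Iceberg table so data files without field IDs can be read by name.
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.mapping.MappedField;
import org.apache.iceberg.mapping.MappedFields;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;

class NameMappingExample {
  static void attachNameMapping(Table table) {
    // Field IDs and column names here are assumptions matching the test schema.
    NameMapping mapping =
        NameMapping.of(
            MappedField.of(1, "id"),
            MappedField.of(2, "data"),
            MappedField.of(3, "metadata", MappedFields.of(MappedField.of(4, "source"))));

    // Store the mapping as the table's default name mapping; ScanTaskReader reads
    // this property and passes it to the ORC/Parquet/Avro read builders.
    table
        .updateProperties()
        .set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(mapping))
        .commit();
  }
}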

File tree

6 files changed: +244 -10 lines
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 3
+  "modification": 4
 }

sdks/java/io/iceberg/build.gradle

+2 lines

@@ -65,6 +65,8 @@ dependencies {
   testImplementation library.java.bigdataoss_gcsio
   testImplementation library.java.bigdataoss_gcs_connector
   testImplementation library.java.bigdataoss_util_hadoop
+  testImplementation "org.apache.parquet:parquet-avro:$parquet_version"
+  testImplementation "org.apache.parquet:parquet-common:$parquet_version"
   testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version"
   testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
   testImplementation project(":sdks:java:extensions:google-cloud-platform-core")

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java

+28 -9 lines

@@ -35,6 +35,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.IdentityPartitionConverters;
 import org.apache.iceberg.data.Record;
@@ -47,6 +48,7 @@
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.types.Type;
@@ -98,6 +100,8 @@ public boolean advance() throws IOException {
     // which are not null-safe.
     @SuppressWarnings("nullness")
     org.apache.iceberg.@NonNull Schema project = this.project;
+    @Nullable
+    String nameMapping = source.getTable().properties().get(TableProperties.DEFAULT_NAME_MAPPING);

     do {
       // If our current iterator is working... do that.
@@ -129,37 +133,52 @@ public boolean advance() throws IOException {
       switch (file.format()) {
         case ORC:
           LOG.info("Preparing ORC input");
-          iterable =
+          ORC.ReadBuilder orcReader =
               ORC.read(input)
                   .split(fileTask.start(), fileTask.length())
                   .project(project)
                   .createReaderFunc(
                       fileSchema ->
                           GenericOrcReader.buildReader(project, fileSchema, idToConstants))
-                  .filter(fileTask.residual())
-                  .build();
+                  .filter(fileTask.residual());
+
+          if (nameMapping != null) {
+            orcReader.withNameMapping(NameMappingParser.fromJson(nameMapping));
+          }
+
+          iterable = orcReader.build();
           break;
         case PARQUET:
           LOG.info("Preparing Parquet input.");
-          iterable =
+          Parquet.ReadBuilder parquetReader =
               Parquet.read(input)
                   .split(fileTask.start(), fileTask.length())
                   .project(project)
                   .createReaderFunc(
                       fileSchema ->
                           GenericParquetReaders.buildReader(project, fileSchema, idToConstants))
-                  .filter(fileTask.residual())
-                  .build();
+                  .filter(fileTask.residual());
+
+          if (nameMapping != null) {
+            parquetReader.withNameMapping(NameMappingParser.fromJson(nameMapping));
+          }
+
+          iterable = parquetReader.build();
           break;
         case AVRO:
           LOG.info("Preparing Avro input.");
-          iterable =
+          Avro.ReadBuilder avroReader =
               Avro.read(input)
                   .split(fileTask.start(), fileTask.length())
                   .project(project)
                   .createReaderFunc(
-                      fileSchema -> DataReader.create(project, fileSchema, idToConstants))
-                  .build();
+                      fileSchema -> DataReader.create(project, fileSchema, idToConstants));
+
+          if (nameMapping != null) {
+            avroReader.withNameMapping(NameMappingParser.fromJson(nameMapping));
+          }
+
+          iterable = avroReader.build();
           break;
         default:
           throw new UnsupportedOperationException("Cannot read format: " + file.format());

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java

+202 lines

@@ -21,13 +21,17 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;

+import java.io.File;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
@@ -36,13 +40,31 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.MappedField;
+import org.apache.iceberg.mapping.MappedFields;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -210,4 +232,184 @@ public void testIdentityColumnScan() throws Exception {

     testPipeline.run();
   }
+
+  @Test
+  public void testNameMappingScan() throws Exception {
+    org.apache.avro.Schema metadataSchema =
+        org.apache.avro.Schema.createRecord(
+            "metadata",
+            null,
+            null,
+            false,
+            ImmutableList.of(
+                new org.apache.avro.Schema.Field(
+                    "source",
+                    org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING),
+                    null,
+                    null)));
+
+    org.apache.avro.Schema avroSchema =
+        org.apache.avro.Schema.createRecord(
+            "test",
+            null,
+            null,
+            false,
+            ImmutableList.of(
+                new org.apache.avro.Schema.Field(
+                    "data",
+                    org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING),
+                    null,
+                    null),
+                new org.apache.avro.Schema.Field(
+                    "id",
+                    org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG),
+                    null,
+                    null),
+                new org.apache.avro.Schema.Field("metadata", metadataSchema, null, null)));
+
+    List<Map<String, Object>> recordData =
+        ImmutableList.<Map<String, Object>>builder()
+            .add(
+                ImmutableMap.of(
+                    "id",
+                    0L,
+                    "data",
+                    "clarification",
+                    "metadata",
+                    ImmutableMap.of("source", "systemA")))
+            .add(
+                ImmutableMap.of(
+                    "id", 1L, "data", "risky", "metadata", ImmutableMap.of("source", "systemB")))
+            .add(
+                ImmutableMap.of(
+                    "id", 2L, "data", "falafel", "metadata", ImmutableMap.of("source", "systemC")))
+            .build();
+
+    List<GenericRecord> avroRecords =
+        recordData.stream()
+            .map(data -> avroGenericRecord(avroSchema, data))
+            .collect(Collectors.toList());
+
+    Configuration hadoopConf = new Configuration();
+    String path = createParquetFile(avroSchema, avroRecords);
+    HadoopInputFile inputFile = HadoopInputFile.fromLocation(path, hadoopConf);
+
+    NameMapping defaultMapping =
+        NameMapping.of(
+            MappedField.of(1, "id"),
+            MappedField.of(2, "data"),
+            MappedField.of(3, "metadata", MappedFields.of(MappedField.of(4, "source"))));
+    ImmutableMap<String, String> tableProperties =
+        ImmutableMap.<String, String>builder()
+            .put(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(defaultMapping))
+            .build();
+
+    TableIdentifier tableId =
+        TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
+    Table simpleTable =
+        warehouse
+            .buildTable(tableId, TestFixtures.NESTED_SCHEMA)
+            .withProperties(tableProperties)
+            .withPartitionSpec(PartitionSpec.unpartitioned())
+            .create();
+
+    MetricsConfig metricsConfig = MetricsConfig.forTable(simpleTable);
+    Metrics metrics = ParquetUtil.fileMetrics(inputFile, metricsConfig);
+    DataFile dataFile =
+        DataFiles.builder(PartitionSpec.unpartitioned())
+            .withFormat(FileFormat.PARQUET)
+            .withInputFile(inputFile)
+            .withMetrics(metrics)
+            .build();
+
+    final Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.NESTED_SCHEMA);
+
+    simpleTable.newFastAppend().appendFile(dataFile).commit();
+
+    Map<String, String> catalogProps =
+        ImmutableMap.<String, String>builder()
+            .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+            .put("warehouse", warehouse.location)
+            .build();
+
+    IcebergCatalogConfig catalogConfig =
+        IcebergCatalogConfig.builder()
+            .setCatalogName("name")
+            .setCatalogProperties(catalogProps)
+            .build();
+
+    PCollection<Row> output =
+        testPipeline
+            .apply(IcebergIO.readRows(catalogConfig).from(tableId))
+            .apply(ParDo.of(new PrintRow()))
+            .setCoder(RowCoder.of(beamSchema));
+
+    final Row[] expectedRows =
+        recordData.stream()
+            .map(data -> icebergGenericRecord(TestFixtures.NESTED_SCHEMA.asStruct(), data))
+            .map(record -> IcebergUtils.icebergRecordToBeamRow(beamSchema, record))
+            .toArray(Row[]::new);
+
+    PAssert.that(output)
+        .satisfies(
+            (Iterable<Row> rows) -> {
+              assertThat(rows, containsInAnyOrder(expectedRows));
+              return null;
+            });
+
+    testPipeline.run();
+  }
+
+  @SuppressWarnings("unchecked")
+  public static GenericRecord avroGenericRecord(
+      org.apache.avro.Schema schema, Map<String, Object> values) {
+    GenericRecord record = new GenericData.Record(schema);
+    for (org.apache.avro.Schema.Field field : schema.getFields()) {
+      Object rawValue = values.get(field.name());
+      Object avroValue =
+          rawValue instanceof Map
+              ? avroGenericRecord(field.schema(), (Map<String, Object>) rawValue)
+              : rawValue;
+      record.put(field.name(), avroValue);
+    }
+    return record;
+  }
+
+  @SuppressWarnings("unchecked")
+  public static Record icebergGenericRecord(Types.StructType type, Map<String, Object> values) {
+    org.apache.iceberg.data.GenericRecord record =
+        org.apache.iceberg.data.GenericRecord.create(type);
+    for (Types.NestedField field : type.fields()) {
+      Object rawValue = values.get(field.name());
+      Object value =
+          rawValue instanceof Map
+              ? icebergGenericRecord(field.type().asStructType(), (Map<String, Object>) rawValue)
+              : rawValue;
+      record.setField(field.name(), value);
+    }
+    return record;
+  }
+
+  public static String createParquetFile(org.apache.avro.Schema schema, List<GenericRecord> records)
+      throws IOException {
+
+    File tempFile = createTempFile();
+    Path file = new Path(tempFile.getPath());
+
+    AvroParquetWriter.Builder<GenericRecord> builder = AvroParquetWriter.builder(file);
+    ParquetWriter<GenericRecord> parquetWriter = builder.withSchema(schema).build();
+    for (GenericRecord record : records) {
+      parquetWriter.write(record);
+    }
+    parquetWriter.close();
+
+    return tempFile.getPath();
+  }
+
+  private static File createTempFile() throws IOException {
+    File tempFile = File.createTempFile(ScanSourceTest.class.getSimpleName(), ".tmp");
+    tempFile.deleteOnExit();
+    boolean unused = tempFile.delete();
+    return tempFile;
+  }
 }

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java

+4 lines

@@ -163,6 +163,10 @@ public Table createTable(
     return catalog.createTable(tableId, schema, partitionSpec);
   }

+  public Catalog.TableBuilder buildTable(TableIdentifier tableId, Schema schema) {
+    return catalog.buildTable(tableId, schema);
+  }
+
   public Table loadTable(TableIdentifier tableId) {
     return catalog.loadTable(tableId);
   }

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java

+7 lines

@@ -36,6 +36,13 @@ public class TestFixtures {
       new Schema(
           required(1, "id", Types.LongType.get()), optional(2, "data", Types.StringType.get()));

+  public static final Schema NESTED_SCHEMA =
+      new Schema(
+          required(1, "id", Types.LongType.get()),
+          optional(2, "data", Types.StringType.get()),
+          optional(
+              3, "metadata", Types.StructType.of(optional(4, "source", Types.StringType.get()))));
+
   public static final List<Map<String, Object>> FILE1SNAPSHOT1_DATA =
       ImmutableList.of(
           ImmutableMap.of("id", 0L, "data", "clarification"),
