Skip to content

Commit 53080f1

Browse files
authored
[java] BQ: Add avro schema to BQ TableSchema conversion (#33389)
* [java] BQ: Add avro schema to BQ TableSchema conversion * Use a name that avoids a compiler clash with the overload
1 parent 23ba9fc commit 53080f1

File tree

4 files changed

+457
-39
lines changed

4 files changed

+457
-39
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java

+137
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.Map;
3838
import java.util.Optional;
3939
import java.util.Set;
40+
import java.util.stream.Collectors;
4041
import org.apache.avro.Conversions;
4142
import org.apache.avro.LogicalType;
4243
import org.apache.avro.LogicalTypes;
@@ -530,4 +531,140 @@ private static Field convertField(
530531
bigQueryField.getDescription(),
531532
(Object) null /* Cast to avoid deprecated JsonNode constructor. */);
532533
}
534+
535+
static TableSchema fromGenericAvroSchema(Schema schema) {
536+
return fromGenericAvroSchema(schema, true);
537+
}
538+
539+
static TableSchema fromGenericAvroSchema(Schema schema, Boolean useAvroLogicalTypes) {
540+
verify(
541+
schema.getType() == Type.RECORD,
542+
"Expected Avro schema type RECORD, not %s",
543+
schema.getType());
544+
545+
List<TableFieldSchema> fields =
546+
schema.getFields().stream()
547+
.map(f -> fromAvroFieldSchema(f, useAvroLogicalTypes))
548+
.collect(Collectors.toList());
549+
return new TableSchema().setFields(fields);
550+
}
551+
552+
private static TableFieldSchema fromAvroFieldSchema(
553+
Schema.Field avrofield, Boolean useAvroLogicalTypes) {
554+
Schema fieldSchema = avrofield.schema();
555+
TableFieldSchema field;
556+
switch (fieldSchema.getType()) {
557+
case UNION:
558+
List<Schema> types = fieldSchema.getTypes();
559+
verify(
560+
types.size() == 2 && types.get(0).getType() == Type.NULL,
561+
"Avro union field %s should be of null and another type, not %s",
562+
avrofield.name(),
563+
fieldSchema);
564+
field = typedTableFieldSchema(types.get(1), useAvroLogicalTypes).setMode("NULLABLE");
565+
break;
566+
case ARRAY:
567+
field =
568+
typedTableFieldSchema(fieldSchema.getElementType(), useAvroLogicalTypes)
569+
.setMode("REPEATED");
570+
break;
571+
case MAP:
572+
TableFieldSchema key =
573+
new TableFieldSchema().setType("STRING").setName("key").setMode("REQUIRED");
574+
TableFieldSchema value =
575+
typedTableFieldSchema(fieldSchema.getValueType(), useAvroLogicalTypes)
576+
.setName("value")
577+
.setMode("REQUIRED");
578+
List<TableFieldSchema> mapTableSchema = new ArrayList<>();
579+
mapTableSchema.add(key);
580+
mapTableSchema.add(value);
581+
field =
582+
new TableFieldSchema().setType("RECORD").setFields(mapTableSchema).setMode("REPEATED");
583+
break;
584+
default:
585+
field = typedTableFieldSchema(fieldSchema, useAvroLogicalTypes).setMode("REQUIRED");
586+
}
587+
588+
return field.setName(avrofield.name()).setDescription(avrofield.doc());
589+
}
590+
591+
private static TableFieldSchema typedTableFieldSchema(Schema type, Boolean useAvroLogicalTypes) {
592+
TableFieldSchema fieldSchema = new TableFieldSchema();
593+
LogicalType logicalType = useAvroLogicalTypes ? type.getLogicalType() : null;
594+
String sqlType = useAvroLogicalTypes ? type.getProp("sqlType") : null;
595+
switch (type.getType()) {
596+
case INT:
597+
if (logicalType instanceof LogicalTypes.Date) {
598+
return fieldSchema.setType("DATE");
599+
} else if (logicalType instanceof LogicalTypes.TimeMillis) {
600+
return fieldSchema.setType("TIME");
601+
} else {
602+
return fieldSchema.setType("INTEGER");
603+
}
604+
case LONG:
605+
if (logicalType instanceof LogicalTypes.TimeMicros) {
606+
return fieldSchema.setType("TIME");
607+
} else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))
608+
&& (logicalType instanceof LogicalTypes.LocalTimestampMillis
609+
|| logicalType instanceof LogicalTypes.LocalTimestampMicros)) {
610+
return fieldSchema.setType("DATETIME");
611+
} else if (logicalType instanceof LogicalTypes.TimestampMillis
612+
|| logicalType instanceof LogicalTypes.TimestampMicros) {
613+
return fieldSchema.setType("TIMESTAMP");
614+
} else {
615+
return fieldSchema.setType("INTEGER");
616+
}
617+
case FLOAT:
618+
case DOUBLE:
619+
return fieldSchema.setType("FLOAT");
620+
case BOOLEAN:
621+
return fieldSchema.setType("BOOLEAN");
622+
case STRING:
623+
if ("GEOGRAPHY".equals(sqlType)) {
624+
return fieldSchema.setType("GEOGRAPHY");
625+
} else if ("JSON".equals(sqlType)) {
626+
return fieldSchema.setType("JSON");
627+
} else {
628+
return fieldSchema.setType("STRING");
629+
}
630+
case BYTES:
631+
if (logicalType instanceof LogicalTypes.Decimal) {
632+
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
633+
int precision = decimal.getPrecision();
634+
int scale = decimal.getScale();
635+
if (scale <= 9 && precision - scale <= 29) {
636+
fieldSchema.setType("NUMERIC");
637+
if (!(precision == 38 && scale == 9)) {
638+
fieldSchema.setPrecision((long) precision);
639+
if (scale != 0) {
640+
fieldSchema.setScale((long) scale);
641+
}
642+
}
643+
} else {
644+
fieldSchema.setType("BIGNUMERIC");
645+
if (!(precision == 77 && scale == 38)) {
646+
fieldSchema.setPrecision((long) precision);
647+
if (scale != 0) {
648+
fieldSchema.setScale((long) scale);
649+
}
650+
}
651+
}
652+
return fieldSchema;
653+
} else {
654+
return fieldSchema.setType("BYTES");
655+
}
656+
case ENUM:
657+
return fieldSchema.setType("STRING");
658+
case FIXED:
659+
return fieldSchema.setType("BYTES");
660+
case RECORD:
661+
List<TableFieldSchema> recordFields =
662+
type.getFields().stream()
663+
.map(f -> fromAvroFieldSchema(f, useAvroLogicalTypes))
664+
.collect(Collectors.toList());
665+
return new TableFieldSchema().setType("RECORD").setFields(recordFields);
666+
default:
667+
throw new IllegalArgumentException("Unknown Avro type: " + type.getType());
668+
}
669+
}
533670
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -456,12 +456,23 @@ public static Schema fromTableSchema(TableSchema tableSchema, SchemaConversionOp
456456
return fromTableFieldSchema(tableSchema.getFields(), options);
457457
}
458458

459-
/** Convert a list of BigQuery {@link TableSchema} to Avro {@link org.apache.avro.Schema}. */
459+
/** Convert a BigQuery {@link TableSchema} to Avro {@link org.apache.avro.Schema}. */
460460
public static org.apache.avro.Schema toGenericAvroSchema(TableSchema tableSchema) {
461461
return toGenericAvroSchema(tableSchema, false);
462462
}
463463

464-
/** Convert a list of BigQuery {@link TableSchema} to Avro {@link org.apache.avro.Schema}. */
464+
/** Convert an Avro {@link org.apache.avro.Schema} to a BigQuery {@link TableSchema}. */
465+
public static TableSchema fromGenericAvroSchema(org.apache.avro.Schema schema) {
466+
return fromGenericAvroSchema(schema, false);
467+
}
468+
469+
/** Convert an Avro {@link org.apache.avro.Schema} to a BigQuery {@link TableSchema}. */
470+
public static TableSchema fromGenericAvroSchema(
471+
org.apache.avro.Schema schema, Boolean useAvroLogicalTypes) {
472+
return BigQueryAvroUtils.fromGenericAvroSchema(schema, useAvroLogicalTypes);
473+
}
474+
475+
/** Convert a BigQuery {@link TableSchema} to Avro {@link org.apache.avro.Schema}. */
465476
public static org.apache.avro.Schema toGenericAvroSchema(
466477
TableSchema tableSchema, Boolean useAvroLogicalTypes) {
467478
return toGenericAvroSchema("root", tableSchema.getFields(), useAvroLogicalTypes);

0 commit comments

Comments
 (0)