
[ManagedIO] Fail expansion when encountering extra or unknown configuration #34525


Open · wants to merge 11 commits into master
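To illustrate the user-facing effect of this change, below is a minimal sketch (not code from this PR) of a Managed read configured with a misspelled key. The source, keys, and values (Managed.ICEBERG, "db.users", "catalog_nmae") are hypothetical examples; with this change, the unknown key makes expansion fail with an IllegalArgumentException instead of being silently dropped.

import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

public class UnknownConfigExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // Hypothetical config: "catalog_nmae" is a typo for "catalog_name".
    Map<String, Object> config =
        ImmutableMap.of(
            "table", "db.users",
            "catalog_nmae", "my_catalog");

    // With this PR, expansion throws IllegalArgumentException, e.g.
    // "Invalid config for transform '...': Contains unknown fields: [catalog_nmae]."
    PCollectionRowTuple.empty(pipeline)
        .apply(Managed.read(Managed.ICEBERG).withConfig(config));
  }
}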
@@ -331,7 +331,8 @@ public void testBuildTransformWithManaged() {
@Test
public void testManagedMappings() {
KafkaReadSchemaTransformProvider provider = new KafkaReadSchemaTransformProvider();
Map<String, String> mapping = ManagedTransformConstants.MAPPINGS.get(provider.identifier());
Map<String, String> mapping =
ManagedTransformConstants.CONFIG_NAME_OVERRIDES.get(provider.identifier());

assertNotNull(mapping);

@@ -232,7 +232,8 @@ public void testBuildTransformWithManaged() {
@Test
public void testManagedMappings() {
KafkaWriteSchemaTransformProvider provider = new KafkaWriteSchemaTransformProvider();
Map<String, String> mapping = ManagedTransformConstants.MAPPINGS.get(provider.identifier());
Map<String, String> mapping =
ManagedTransformConstants.CONFIG_NAME_OVERRIDES.get(provider.identifier());

assertNotNull(mapping);

@@ -17,20 +17,23 @@
*/
package org.apache.beam.sdk.managed;

import static org.apache.beam.sdk.managed.ManagedTransformConstants.MAPPINGS;
import static org.apache.beam.sdk.managed.ManagedTransformConstants.CONFIG_NAME_OVERRIDES;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
@@ -160,15 +163,8 @@ static class ManagedSchemaTransform extends SchemaTransform {
ManagedConfig managedConfig, SchemaTransformProvider underlyingTransformProvider) {
// parse config before expansion to check if it matches underlying transform's config schema
Schema transformConfigSchema = underlyingTransformProvider.configurationSchema();
Row underlyingRowConfig;
try {
underlyingRowConfig = getRowConfig(managedConfig, transformConfigSchema);
} catch (Exception e) {
throw new IllegalArgumentException(
"Encountered an error when retrieving a Row configuration", e);
}

this.underlyingRowConfig = underlyingRowConfig;
this.underlyingRowConfig = getRowConfig(managedConfig, transformConfigSchema);
this.underlyingTransformProvider = underlyingTransformProvider;
this.managedConfig = managedConfig;
}
@@ -183,6 +179,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
return input.apply(underlyingTransformProvider.from(underlyingRowConfig));
}

@VisibleForTesting
public ManagedConfig getManagedConfig() {
return this.managedConfig;
}
@@ -205,25 +202,51 @@ Row getConfigurationRow() {
// May return an empty row (perhaps the underlying transform doesn't have any required
// parameters)
@VisibleForTesting
static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
static Row getRowConfig(ManagedConfig config, Schema transformConfigSchema) {
Map<String, Object> configMap = config.resolveUnderlyingConfig();
// Build a config Row that will be used to build the underlying SchemaTransform.
// If a mapping for the SchemaTransform exists, we use it to update parameter names to align
// with the underlying SchemaTransform config schema
Map<String, String> mapping = MAPPINGS.get(config.getTransformIdentifier());
if (mapping != null && configMap != null) {
Map<String, String> namingOverride = CONFIG_NAME_OVERRIDES.get(config.getTransformIdentifier());
if (namingOverride != null && configMap != null) {
Map<String, Object> remappedConfig = new HashMap<>();
for (Map.Entry<String, Object> entry : configMap.entrySet()) {
String paramName = entry.getKey();
if (mapping.containsKey(paramName)) {
paramName = mapping.get(paramName);
if (namingOverride.containsKey(paramName)) {
paramName = namingOverride.get(paramName);
}
remappedConfig.put(paramName, entry.getValue());
}
configMap = remappedConfig;
}

return YamlUtils.toBeamRow(configMap, transformSchema, false);
validateUserConfig(
config.getTransformIdentifier(), new HashSet<>(configMap.keySet()), transformConfigSchema);

return YamlUtils.toBeamRow(configMap, transformConfigSchema, false);
}

static void validateUserConfig(
String transformId, Set<String> userParams, Schema transformConfigSchema) {
List<String> missingRequiredFields = new ArrayList<>();
for (Schema.Field field : transformConfigSchema.getFields()) {
boolean inUserConfig = userParams.remove(field.getName());
if (!field.getType().getNullable() && !inUserConfig) {
missingRequiredFields.add(field.getName());
}
}

if (!missingRequiredFields.isEmpty() || !userParams.isEmpty()) {
String msg = "Invalid config for transform '" + transformId + "':";
if (!missingRequiredFields.isEmpty()) {
msg += " Missing required fields: " + missingRequiredFields + ".";
}
if (!userParams.isEmpty()) {
msg += " Contains unknown fields: " + userParams + ".";
Contributor:

I remember when I worked with SchemaIO, I encountered a similar scenario due to a Beam version mismatch between Python and the expansion service. This could be a valid use case in development, e.g. the Python code is at presubmit while the Java expansion service has not yet been regenerated.

Could this happen with managed transforms as well? Say a user is on a newer version of Beam with added configurations while the managed backend hasn't been rolled out yet.

If so, shall we issue a warning here instead?

Contributor Author:

After some offline conversations, we settled on a new API, .skipConfigValidation(), that will let users opt out of this validation if they need to (see the usage sketch after this file's diff).

}

throw new IllegalArgumentException(msg);
}
}

// We load providers separately, after construction, to prevent the
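Regarding the review thread above: the .skipConfigValidation() opt-out mentioned in the author's reply is a proposal rather than part of this diff, so the following is only a hypothetical sketch of how it might be used (the method's exact placement and signature are assumptions, and the Kafka config values are illustrative); it assumes the same imports as the earlier sketch.

Map<String, Object> config =
    ImmutableMap.of(
        "topic", "my_topic",
        "bootstrap_servers", "localhost:9092",
        // Parameter added in a newer SDK that an older expansion service doesn't know about.
        "new_param", true);

PCollectionRowTuple.empty(pipeline)
    .apply(
        Managed.read(Managed.KAFKA)
            .withConfig(config)
            // Assumed API from the review discussion: opt out of the strict
            // unknown-field check introduced by this PR.
            .skipConfigValidation());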
@@ -20,7 +20,7 @@
import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;

import java.util.Map;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.model.pipeline.v1.ExternalTransforms.ManagedTransforms.Urns;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

/**
@@ -62,15 +62,11 @@ public class ManagedTransformConstants {
.put("triggering_frequency", "triggering_frequency_seconds")
.build();

public static final Map<String, Map<String, String>> MAPPINGS =
public static final Map<String, Map<String, String>> CONFIG_NAME_OVERRIDES =
ImmutableMap.<String, Map<String, String>>builder()
.put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ), KAFKA_READ_MAPPINGS)
.put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE), KAFKA_WRITE_MAPPINGS)
.put(
getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ),
BIGQUERY_READ_MAPPINGS)
.put(
getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE),
BIGQUERY_WRITE_MAPPINGS)
.put(getUrn(Urns.KAFKA_READ), KAFKA_READ_MAPPINGS)
.put(getUrn(Urns.KAFKA_WRITE), KAFKA_WRITE_MAPPINGS)
.put(getUrn(Urns.BIGQUERY_READ), BIGQUERY_READ_MAPPINGS)
.put(getUrn(Urns.BIGQUERY_WRITE), BIGQUERY_WRITE_MAPPINGS)
.build();
}
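For reference, here is a standalone sketch (not code from this PR) of how getRowConfig applies one of these per-transform override maps before building the config Row. The "triggering_frequency" to "triggering_frequency_seconds" rename mirrors the mapping shown above; the other keys and values are hypothetical, and the loop is a simplified equivalent of the remapping in ManagedSchemaTransformProvider.

Map<String, String> overrides =
    ImmutableMap.of("triggering_frequency", "triggering_frequency_seconds");
Map<String, Object> userConfig =
    ImmutableMap.of("table", "project:dataset.table", "triggering_frequency", 30);

// Rename user-facing keys to match the underlying SchemaTransform's config schema.
Map<String, Object> remapped = new HashMap<>();
for (Map.Entry<String, Object> entry : userConfig.entrySet()) {
  remapped.put(overrides.getOrDefault(entry.getKey(), entry.getKey()), entry.getValue());
}
// remapped -> {table=project:dataset.table, triggering_frequency_seconds=30}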
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.managed.testing;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.AutoValueSchema;
@@ -31,6 +33,7 @@
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.checkerframework.checker.nullness.qual.Nullable;

@AutoService(SchemaTransformProvider.class)
public class TestSchemaTransformProvider
@@ -52,19 +55,27 @@ public static Builder builder() {
@SchemaFieldDescription("Integer to add to each row element.")
public abstract Integer getExtraInteger();

@SchemaFieldDescription("If true, will upper case the extra string. Default is false.")
public abstract @Nullable Boolean getToggleUppercase();

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setExtraString(String extraString);

public abstract Builder setExtraInteger(Integer extraInteger);

public abstract Builder setToggleUppercase(Boolean toggleUppercase);

public abstract Config build();
}
}

@Override
public SchemaTransform from(Config config) {
String extraString = config.getExtraString();
String extraString =
firstNonNull(config.getToggleUppercase(), false)
? config.getExtraString().toUpperCase()
: config.getExtraString();
Integer extraInteger = config.getExtraInteger();
return new SchemaTransform() {
@Override
@@ -24,9 +24,12 @@
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Map;
import org.apache.beam.sdk.managed.testing.TestSchemaTransformProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.YamlUtils;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -49,6 +52,61 @@ public void testFailWhenNoConfigSpecified() {
config.validate();
}

@Test
public void testFailWhenUnknownFieldsSpecified() {
Map<String, Object> config =
ImmutableMap.of(
"extra_string",
"str",
"extra_integer",
123,
"toggle_uppercase",
true,
"unknown_field",
"unknown");
ManagedSchemaTransformProvider.ManagedConfig managedConfig =
ManagedSchemaTransformProvider.ManagedConfig.builder()
.setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
.setConfig(YamlUtils.yamlStringFromMap(config))
.build();

thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Invalid config for transform");
thrown.expectMessage(TestSchemaTransformProvider.IDENTIFIER);
thrown.expectMessage("Contains unknown fields");
thrown.expectMessage("unknown_field");
new ManagedSchemaTransformProvider(null).from(managedConfig);
}

@Test
public void testFailWhenMissingRequiredFields() {
Map<String, Object> config = ImmutableMap.of("extra_string", "str", "toggle_uppercase", true);
ManagedSchemaTransformProvider.ManagedConfig managedConfig =
ManagedSchemaTransformProvider.ManagedConfig.builder()
.setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
.setConfig(YamlUtils.yamlStringFromMap(config))
.build();

thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Invalid config for transform");
thrown.expectMessage(TestSchemaTransformProvider.IDENTIFIER);
thrown.expectMessage("Missing required fields");
thrown.expectMessage("extra_integer");
new ManagedSchemaTransformProvider(null).from(managedConfig);
}

@Test
public void testPassWhenMissingNullableFields() {
Map<String, Object> config = ImmutableMap.of("extra_string", "str", "extra_integer", 123);
ManagedSchemaTransformProvider.ManagedConfig managedConfig =
ManagedSchemaTransformProvider.ManagedConfig.builder()
.setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
.setConfig(YamlUtils.yamlStringFromMap(config))
.build();

new ManagedSchemaTransformProvider(null).from(managedConfig);
}

@Test
public void testGetConfigRowFromYamlString() {
String yamlString = "extra_string: abc\n" + "extra_integer: 123";