[ManagedIO] Fail expansion when encountering extra or unknown configuration #34525

Open · wants to merge 11 commits into master
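In short: Managed transforms now validate the user-supplied config against the underlying SchemaTransform's configuration schema at pipeline-construction time, instead of silently dropping unknown keys. A minimal sketch of the intended behavior (the config key, table name, and exact error text below are illustrative, not taken from the PR):

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;

Pipeline p = Pipeline.create();

Map<String, Object> config = new HashMap<>();
config.put("tble", "db.users"); // hypothetical typo for the required "table" field

// With this change, expansion fails at construction time, roughly:
//   IllegalArgumentException: Invalid config for transform '...':
//   Missing required fields: [table]. Contains unknown fields: [tble].
p.apply(Managed.read(Managed.ICEBERG).withConfig(config));

// Users who rely on the old lenient behavior can opt out explicitly:
p.apply(Managed.read(Managed.ICEBERG).withConfig(config).skipConfigValidation());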
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 2
+  "modification": 3
 }
@@ -138,7 +138,7 @@ public void testReadUsingManagedTransform() throws Exception {
    Map<String, Object> configMap = new Yaml().load(yamlConfig);
    PCollection<Row> output =
        testPipeline
-           .apply(Managed.read(Managed.ICEBERG).withConfig(configMap))
+           .apply(Managed.read(Managed.ICEBERG).withConfig(configMap).skipConfigValidation())
            .getSinglePCollection();

    PAssert.that(output)
@@ -308,11 +308,6 @@ public void testBuildTransformWithManaged() {
            + "bootstrap_servers: some bootstrap\n"
            + "schema: '{\"type\":\"record\",\"name\":\"my_record\",\"fields\":[{\"name\":\"bool\",\"type\":\"boolean\"}]}'",
-       "topic: topic_3\n"
-           + "bootstrap_servers: some bootstrap\n"
-           + "schema_registry_url: some-url\n"
-           + "schema_registry_subject: some-subject\n"
-           + "data_format: RAW",
        "topic: topic_4\n"
            + "bootstrap_servers: some bootstrap\n"
            + "data_format: PROTO\n"
            + "schema: '"
@@ -331,7 +326,8 @@ public void testBuildTransformWithManaged() {
  @Test
  public void testManagedMappings() {
    KafkaReadSchemaTransformProvider provider = new KafkaReadSchemaTransformProvider();
-   Map<String, String> mapping = ManagedTransformConstants.MAPPINGS.get(provider.identifier());
+   Map<String, String> mapping =
+       ManagedTransformConstants.CONFIG_NAME_OVERRIDES.get(provider.identifier());

    assertNotNull(mapping);

@@ -232,7 +232,8 @@ public void testBuildTransformWithManaged() {
  @Test
  public void testManagedMappings() {
    KafkaWriteSchemaTransformProvider provider = new KafkaWriteSchemaTransformProvider();
-   Map<String, String> mapping = ManagedTransformConstants.MAPPINGS.get(provider.identifier());
+   Map<String, String> mapping =
+       ManagedTransformConstants.CONFIG_NAME_OVERRIDES.get(provider.identifier());

    assertNotNull(mapping);

@@ -174,6 +174,8 @@ public abstract static class ManagedTransform extends PTransform<PInput, PCollectionRowTuple> {
    @VisibleForTesting
    abstract List<String> getSupportedIdentifiers();

+   abstract @Nullable Boolean getSkipConfigValidation();
+
    abstract Builder toBuilder();

    @AutoValue.Builder
@@ -187,6 +189,8 @@ abstract static class Builder {
      @VisibleForTesting
      abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);

+     abstract Builder setSkipConfigValidation(boolean skip);
+
      abstract ManagedTransform build();
    }

@@ -213,6 +217,14 @@ ManagedTransform withSupportedIdentifiers(List<String> supportedIdentifiers) {
      return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build();
    }

+   /**
+    * Skips configuration validation. Without this, the pipeline will fail at construction time
+    * when the configuration contains unknown fields or is missing required fields.
+    */
+   public ManagedTransform skipConfigValidation() {
+     return toBuilder().setSkipConfigValidation(true).build();
+   }
+
    @Override
    public PCollectionRowTuple expand(PInput input) {
      PCollectionRowTuple inputTuple = resolveInput(input);
@@ -222,6 +234,7 @@ public PCollectionRowTuple expand(PInput input) {
              .setTransformIdentifier(getIdentifier())
              .setConfig(YamlUtils.yamlStringFromMap(getConfig()))
              .setConfigUrl(getConfigUrl())
+             .setSkipConfigValidation(getSkipConfigValidation())
              .build();

      SchemaTransform underlyingTransform =
@@ -17,23 +17,27 @@
  */
 package org.apache.beam.sdk.managed;

-import static org.apache.beam.sdk.managed.ManagedTransformConstants.MAPPINGS;
+import static org.apache.beam.sdk.managed.ManagedTransformConstants.CONFIG_NAME_OVERRIDES;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.schemas.Schema;
@@ -97,6 +101,11 @@ public static Builder builder() {
    @SchemaFieldDescription("YAML string config used to build the underlying SchemaTransform.")
    public abstract @Nullable String getConfig();

+   @SchemaFieldDescription(
+       "Skips configuration validation. Without this, the pipeline will fail at construction "
+           + "time when the configuration contains unknown fields or is missing required fields.")
+   public abstract @Nullable Boolean getSkipConfigValidation();
+
    @AutoValue.Builder
    public abstract static class Builder {
      public abstract Builder setTransformIdentifier(String identifier);
@@ -105,6 +114,8 @@ public abstract static class Builder {

      public abstract Builder setConfig(@Nullable String yamlConfig);

+     public abstract Builder setSkipConfigValidation(@Nullable Boolean skip);
+
      public abstract ManagedConfig build();
    }

@@ -153,28 +164,21 @@ protected SchemaTransform from(ManagedConfig managedConfig) {

  static class ManagedSchemaTransform extends SchemaTransform {
    private final ManagedConfig managedConfig;
-   private final Row underlyingRowConfig;
    private final SchemaTransformProvider underlyingTransformProvider;

    ManagedSchemaTransform(
        ManagedConfig managedConfig, SchemaTransformProvider underlyingTransformProvider) {
-     // parse config before expansion to check if it matches underlying transform's config schema
-     Schema transformConfigSchema = underlyingTransformProvider.configurationSchema();
-     Row underlyingRowConfig;
-     try {
-       underlyingRowConfig = getRowConfig(managedConfig, transformConfigSchema);
-     } catch (Exception e) {
-       throw new IllegalArgumentException(
-           "Encountered an error when retrieving a Row configuration", e);
-     }
-
-     this.underlyingRowConfig = underlyingRowConfig;
      this.underlyingTransformProvider = underlyingTransformProvider;
      this.managedConfig = managedConfig;
    }

    @Override
    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+     Row underlyingRowConfig =
+         getRowConfig(
+             managedConfig,
+             underlyingTransformProvider.configurationSchema(),
+             input.getPipeline().getOptions());
      LOG.debug(
          "Building transform \"{}\" with configuration: {}",
          underlyingTransformProvider.identifier(),
@@ -183,6 +187,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
      return input.apply(underlyingTransformProvider.from(underlyingRowConfig));
    }

+   @VisibleForTesting
    public ManagedConfig getManagedConfig() {
      return this.managedConfig;
    }
@@ -205,25 +210,57 @@ Row getConfigurationRow() {
  // May return an empty row (perhaps the underlying transform doesn't have any required
  // parameters)
  @VisibleForTesting
- static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
+ static Row getRowConfig(
+     ManagedConfig config, Schema transformConfigSchema, PipelineOptions options) {
    Map<String, Object> configMap = config.resolveUnderlyingConfig();
    // Build a config Row that will be used to build the underlying SchemaTransform.
    // If a mapping for the SchemaTransform exists, we use it to update parameter names to align
    // with the underlying SchemaTransform config schema
-   Map<String, String> mapping = MAPPINGS.get(config.getTransformIdentifier());
-   if (mapping != null && configMap != null) {
+   Map<String, String> namingOverride =
+       CONFIG_NAME_OVERRIDES.get(config.getTransformIdentifier());
+   if (namingOverride != null && configMap != null) {
      Map<String, Object> remappedConfig = new HashMap<>();
      for (Map.Entry<String, Object> entry : configMap.entrySet()) {
        String paramName = entry.getKey();
-       if (mapping.containsKey(paramName)) {
-         paramName = mapping.get(paramName);
+       if (namingOverride.containsKey(paramName)) {
+         paramName = namingOverride.get(paramName);
        }
        remappedConfig.put(paramName, entry.getValue());
      }
      configMap = remappedConfig;
    }

-   return YamlUtils.toBeamRow(configMap, transformSchema, false);
+   @Nullable Boolean skipValidation = config.getSkipConfigValidation();
+   if (skipValidation == null || !skipValidation) {
+     validateUserConfig(
+         config.getTransformIdentifier(),
+         new HashSet<>(configMap.keySet()),
+         transformConfigSchema);
+   }
+
+   return YamlUtils.toBeamRow(configMap, transformConfigSchema, false);
  }

+ static void validateUserConfig(
+     String transformId, Set<String> userParams, Schema transformConfigSchema) {
+   List<String> missingRequiredFields = new ArrayList<>();
+   for (Schema.Field field : transformConfigSchema.getFields()) {
+     boolean inUserConfig = userParams.remove(field.getName());
+     if (!field.getType().getNullable() && !inUserConfig) {
+       missingRequiredFields.add(field.getName());
+     }
+   }
+
+   if (!missingRequiredFields.isEmpty() || !userParams.isEmpty()) {
+     String msg = "Invalid config for transform '" + transformId + "':";
+     if (!missingRequiredFields.isEmpty()) {
+       msg += " Missing required fields: " + missingRequiredFields + ".";
+     }
+     if (!userParams.isEmpty()) {
+       msg += " Contains unknown fields: " + userParams + ".";
Contributor:
    I remember hitting a similar scenario when I worked with SchemaIO, due to a Beam version
    mismatch between Python and the expansion service. That can be a legitimate situation during
    development, e.g. the Python code is in presubmit while the Java expansion service has not
    been regenerated yet.

    Could this happen with Managed transforms too? Say a user is on a newer Beam version that
    added configuration fields, while the managed backend hasn't been rolled out yet. If so,
    should we issue a warning here instead?

Contributor Author:
    After some offline conversations, we settled on a new API, .skipConfigValidation(), that
    lets users opt out if they need to.
+     }
+
+     throw new IllegalArgumentException(msg);
+   }
+ }

  // We load providers separately, after construction, to prevent the
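To make the failure mode concrete, here is a test-style sketch of validateUserConfig (the schema fields and transform URN are invented for illustration; assertThrows is JUnit's):

@Test
public void testValidateUserConfigRejectsBadKeys() {
  Schema configSchema =
      Schema.builder()
          .addStringField("table") // non-nullable, i.e. required
          .addNullableField("filter", Schema.FieldType.STRING) // optional
          .build();

  // "tble" is a typo: "table" ends up missing and "tble" is unknown.
  Set<String> userParams = new HashSet<>(Arrays.asList("tble", "filter"));

  IllegalArgumentException e =
      assertThrows(
          IllegalArgumentException.class,
          () -> validateUserConfig("some:transform:urn:v1", userParams, configSchema));

  // e.getMessage() reads: Invalid config for transform 'some:transform:urn:v1':
  //   Missing required fields: [table]. Contains unknown fields: [tble].
}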
@@ -20,7 +20,7 @@
 import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;

 import java.util.Map;
-import org.apache.beam.model.pipeline.v1.ExternalTransforms;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms.ManagedTransforms.Urns;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

 /**
@@ -62,15 +62,11 @@ public class ManagedTransformConstants {
          .put("triggering_frequency", "triggering_frequency_seconds")
          .build();

- public static final Map<String, Map<String, String>> MAPPINGS =
+ public static final Map<String, Map<String, String>> CONFIG_NAME_OVERRIDES =
      ImmutableMap.<String, Map<String, String>>builder()
-         .put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ), KAFKA_READ_MAPPINGS)
-         .put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE), KAFKA_WRITE_MAPPINGS)
-         .put(
-             getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ),
-             BIGQUERY_READ_MAPPINGS)
-         .put(
-             getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE),
-             BIGQUERY_WRITE_MAPPINGS)
+         .put(getUrn(Urns.KAFKA_READ), KAFKA_READ_MAPPINGS)
+         .put(getUrn(Urns.KAFKA_WRITE), KAFKA_WRITE_MAPPINGS)
+         .put(getUrn(Urns.BIGQUERY_READ), BIGQUERY_READ_MAPPINGS)
+         .put(getUrn(Urns.BIGQUERY_WRITE), BIGQUERY_WRITE_MAPPINGS)
          .build();
 }
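The rename from MAPPINGS to CONFIG_NAME_OVERRIDES is purely cosmetic; getRowConfig still consults the same per-URN map to translate user-facing parameter names before building the config Row. A small sketch, assuming the "triggering_frequency" entry shown above belongs to the BigQuery write mappings:

Map<String, String> override =
    ManagedTransformConstants.CONFIG_NAME_OVERRIDES.get(getUrn(Urns.BIGQUERY_WRITE));

// User-facing name -> underlying SchemaTransform parameter name:
String underlyingName = override.get("triggering_frequency");
// underlyingName == "triggering_frequency_seconds"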
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.managed.testing;

+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
+
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
@@ -31,6 +33,7 @@
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.checkerframework.checker.nullness.qual.Nullable;

 @AutoService(SchemaTransformProvider.class)
 public class TestSchemaTransformProvider
@@ -52,19 +55,27 @@ public static Builder builder() {
    @SchemaFieldDescription("Integer to add to each row element.")
    public abstract Integer getExtraInteger();

+   @SchemaFieldDescription("If true, the extra string will be upper-cased. Defaults to false.")
+   public abstract @Nullable Boolean getToggleUppercase();
+
    @AutoValue.Builder
    public abstract static class Builder {
      public abstract Builder setExtraString(String extraString);

      public abstract Builder setExtraInteger(Integer extraInteger);

+     public abstract Builder setToggleUppercase(Boolean toggleUppercase);
+
      public abstract Config build();
    }
  }

  @Override
  public SchemaTransform from(Config config) {
-   String extraString = config.getExtraString();
+   String extraString =
+       firstNonNull(config.getToggleUppercase(), false)
+           ? config.getExtraString().toUpperCase()
+           : config.getExtraString();
    Integer extraInteger = config.getExtraInteger();
    return new SchemaTransform() {
      @Override
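Finally, a sketch of the new optional test parameter in use (the Config builder methods come from the diff above; the outcome follows from the firstNonNull logic in from()):

TestSchemaTransformProvider provider = new TestSchemaTransformProvider();

TestSchemaTransformProvider.Config config =
    TestSchemaTransformProvider.Config.builder()
        .setExtraString("hello")
        .setExtraInteger(1)
        .setToggleUppercase(true) // optional; a null value is treated as false
        .build();

// With the toggle set, the transform uses "HELLO" instead of "hello".
SchemaTransform transform = provider.from(config);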