fix: Invalid merge when unique_key == merge_update_columns #406

Merged: 4 commits, Sep 14, 2023
Changes from all commits
@@ -96,15 +96,17 @@
     when matched and ({{ delete_condition }})
       then delete
     {%- endif %}
-    when matched
-      then update set
-        {%- for col in update_columns %}
-            {%- if merge_update_columns_rules and col.name in merge_update_columns_rules %}
-                {{ get_update_statement(col, merge_update_columns_rules[col.name], loop.last) }}
-            {%- else -%}
-                {{ get_update_statement(col, merge_update_columns_default_rule, loop.last) }}
-            {%- endif -%}
-        {%- endfor %}
+    {% if update_columns -%}
+    when matched
+      then update set
+        {%- for col in update_columns %}
+            {%- if merge_update_columns_rules and col.name in merge_update_columns_rules %}
+                {{ get_update_statement(col, merge_update_columns_rules[col.name], loop.last) }}
+            {%- else -%}
+                {{ get_update_statement(col, merge_update_columns_default_rule, loop.last) }}
+            {%- endif -%}
+        {%- endfor %}
+    {%- endif %}
     when not matched
         then insert ({{ dest_cols_csv }})
         values ({{ src_cols_csv }})
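Why this change works: when unique_key and merge_update_columns point at the same column (the scenario in the issue title), or merge_exclude_columns filters out every other column, the template is left with no columns to update. The old code still rendered a "when matched then update set" clause with an empty assignment list, which is invalid SQL; the new {% if update_columns %} guard omits that branch entirely. Below is a minimal sketch of the rendered merge before and after the change, using illustrative relation, alias, and column names rather than the adapter's actual generated identifiers:

-- before the fix: update_columns is empty, so nothing follows "update set"
merge into target_table as target
using tmp_source as src
on (target.id = src.id)
when matched
    then update set                      -- empty assignment list: rejected by the engine
when not matched
    then insert (id, msg, color)
    values (src.id, src.msg, src.color)

-- after the fix: the "when matched ... update set" branch is skipped, inserts still happen
merge into target_table as target
using tmp_source as src
on (target.id = src.id)
when not matched
    then insert (id, msg, color)
    values (src.id, src.msg, src.color)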
69 changes: 69 additions & 0 deletions tests/functional/adapter/test_incremental_iceberg.py
@@ -6,6 +6,9 @@

import pytest

from dbt.tests.adapter.incremental.test_incremental_merge_exclude_columns import (
    BaseMergeExcludeColumns,
)
from dbt.tests.adapter.incremental.test_incremental_predicates import (
    BaseIncrementalPredicates,
)
@@ -46,6 +49,42 @@
3,anyway,purple
"""

models__merge_exclude_all_columns_sql = """
{{ config(
materialized = 'incremental',
unique_key = 'id',
incremental_strategy='merge',
merge_exclude_columns=['msg', 'color']
) }}

{% if not is_incremental() %}

-- data for first invocation of model

select 1 as id, 'hello' as msg, 'blue' as color
union all
select 2 as id, 'goodbye' as msg, 'red' as color

{% else %}

-- data for subsequent incremental update

select 1 as id, 'hey' as msg, 'blue' as color
union all
select 2 as id, 'yo' as msg, 'green' as color
union all
select 3 as id, 'anyway' as msg, 'purple' as color

{% endif %}
"""


seeds__expected_merge_exclude_all_columns_csv = """id,msg,color
1,hello,blue
2,goodbye,red
3,anyway,purple
"""


class TestIcebergIncrementalUniqueKey(BaseIncrementalUniqueKey):
    @pytest.fixture(scope="class")
@@ -185,6 +224,36 @@ def test__incremental_predicates(self, project):
        self.check_scenario_correctness(expected_fields, test_case_fields, project)


class TestIcebergMergeExcludeColumns(BaseMergeExcludeColumns):
    @pytest.fixture(scope="class")
    def project_config_update(self):
        return {
            "models": {
                "+incremental_strategy": "merge",
                "+table_type": "iceberg",
            }
        }


class TestIcebergMergeExcludeAllColumns(BaseMergeExcludeColumns):
    @pytest.fixture(scope="class")
    def project_config_update(self):
        return {
            "models": {
                "+incremental_strategy": "merge",
                "+table_type": "iceberg",
            }
        }

    @pytest.fixture(scope="class")
    def models(self):
        return {"merge_exclude_columns.sql": models__merge_exclude_all_columns_sql}

    @pytest.fixture(scope="class")
    def seeds(self):
        return {"expected_merge_exclude_columns.csv": seeds__expected_merge_exclude_all_columns_csv}


def replace_cast_date(model: str) -> str:
"""Wrap all date strings with a cast date function"""

@@ -0,0 +1,115 @@
from collections import namedtuple

import pytest

from dbt.tests.util import check_relations_equal, run_dbt

models__merge_no_updates_sql = """
{{ config(
materialized = 'incremental',
unique_key = 'id',
incremental_strategy = 'merge',
merge_update_columns = ['id'],
table_type = 'iceberg',
) }}

{% if not is_incremental() %}

-- data for first invocation of model

select 1 as id, 'hello' as msg, 'blue' as color
union all
select 2 as id, 'goodbye' as msg, 'red' as color

{% else %}

-- data for subsequent incremental update

select 1 as id, 'hey' as msg, 'blue' as color
union all
select 2 as id, 'yo' as msg, 'green' as color
union all
select 3 as id, 'anyway' as msg, 'purple' as color

{% endif %}
"""

seeds__expected_merge_no_updates_csv = """id,msg,color
1,hello,blue
2,goodbye,red
3,anyway,purple
"""

ResultHolder = namedtuple(
    "ResultHolder",
    [
        "seed_count",
        "model_count",
        "seed_rows",
        "inc_test_model_count",
        "relation",
    ],
)


class TestIncrementalIcebergMergeNoUpdates:
    @pytest.fixture(scope="class")
    def models(self):
        return {"merge_no_updates.sql": models__merge_no_updates_sql}

    @pytest.fixture(scope="class")
    def seeds(self):
        return {"expected_merge_no_updates.csv": seeds__expected_merge_no_updates_csv}

    def update_incremental_model(self, incremental_model):
        """re-run the incremental model after the initial build"""
        model_result_set = run_dbt(["run", "--select", incremental_model])
        return len(model_result_set)

    def get_test_fields(self, project, seed, incremental_model, update_sql_file):
        seed_count = len(run_dbt(["seed", "--select", seed, "--full-refresh"]))

        model_count = len(run_dbt(["run", "--select", incremental_model, "--full-refresh"]))

        relation = incremental_model
        # capture the seed row count ahead of the incremental model update
        row_count_query = f"select * from {project.test_schema}.{seed}"

        seed_rows = len(project.run_sql(row_count_query, fetch="all"))

        # propagate seed state to incremental model according to unique keys
        inc_test_model_count = self.update_incremental_model(incremental_model=incremental_model)

        return ResultHolder(seed_count, model_count, seed_rows, inc_test_model_count, relation)

    def check_scenario_correctness(self, expected_fields, test_case_fields, project):
        """Invoke assertions to verify correct build functionality"""
        # 1. test seed(s) should build afresh
        assert expected_fields.seed_count == test_case_fields.seed_count
        # 2. test model(s) should build afresh
        assert expected_fields.model_count == test_case_fields.model_count
        # 3. seeds should have intended row counts post update
        assert expected_fields.seed_rows == test_case_fields.seed_rows
        # 4. incremental test model(s) should be updated
        assert expected_fields.inc_test_model_count == test_case_fields.inc_test_model_count
        # 5. result table should match intended result set (itself a relation)
        check_relations_equal(project.adapter, [expected_fields.relation, test_case_fields.relation])

    def test__merge_no_updates(self, project):
        """seed should match model after incremental run"""

        expected_fields = ResultHolder(
            seed_count=1,
            model_count=1,
            inc_test_model_count=1,
            seed_rows=3,
            relation="expected_merge_no_updates",
        )

        test_case_fields = self.get_test_fields(
            project,
            seed="expected_merge_no_updates",
            incremental_model="merge_no_updates",
            update_sql_file=None,
        )
        self.check_scenario_correctness(expected_fields, test_case_fields, project)