feat: add force_batch to iceberg_merge and incremental_insert #482

Merged
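
This PR adds a force_batch flag to the adapter's incremental paths. Until now, partition-batched inserts and merges only kicked in reactively, after Athena rejected the full-table statement with TOO_MANY_OPEN_PARTITIONS; with force_batch a model skips the initial full-table attempt and goes straight to batched execution. A minimal usage sketch, mirroring the configs exercised by the new tests below (the select body and upstream model name are illustrative only):

{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key=['date_column'],
    partitioned_by=['date_column'],
    force_batch=true
) }}

select rnd, date_column
from {{ ref('my_upstream_model') }}  -- hypothetical upstream model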
@@ -21,40 +21,60 @@
     {% do return(raw_strategy) %}
 {% endmacro %}
 
-{% macro incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation, statement_name="main") %}
+
+{% macro batch_incremental_insert(tmp_relation, target_relation, dest_cols_csv) %}
+    {% set partitions_batches = get_partition_batches(tmp_relation) %}
+    {% do log('BATCHES TO PROCESS: ' ~ partitions_batches | length) %}
+    {%- for batch in partitions_batches -%}
+        {%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ partitions_batches|length) -%}
+        {%- set insert_batch_partitions -%}
+            insert into {{ target_relation }} ({{ dest_cols_csv }})
+            (
+                select {{ dest_cols_csv }}
+                from {{ tmp_relation }}
+                where {{ batch }}
+            );
+        {%- endset -%}
+        {%- do run_query(insert_batch_partitions) -%}
+    {%- endfor -%}
+{% endmacro %}
+
+
+{% macro incremental_insert(
+        on_schema_change,
+        tmp_relation,
+        target_relation,
+        existing_relation,
+        force_batch,
+        statement_name="main"
+    )
+%}
     {%- set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) -%}
     {%- if not dest_columns -%}
         {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
     {%- endif -%}
     {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
 
-    {%- set insert_full -%}
-        insert into {{ target_relation }} ({{ dest_cols_csv }})
-        (
-            select {{ dest_cols_csv }}
-            from {{ tmp_relation }}
-        );
-    {%- endset -%}
+    {% if force_batch %}
+        {% do batch_incremental_insert(tmp_relation, target_relation, dest_cols_csv) %}
+    {% else %}
+        {%- set insert_full -%}
+            insert into {{ target_relation }} ({{ dest_cols_csv }})
+            (
+                select {{ dest_cols_csv }}
+                from {{ tmp_relation }}
+            );
+        {%- endset -%}
 
-    {%- set query_result = adapter.run_query_with_partitions_limit_catching(insert_full) -%}
-    {%- do log('QUERY RESULT: ' ~ query_result) -%}
-    {%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%}
-        {% set partitions_batches = get_partition_batches(tmp_relation) %}
-        {% do log('BATCHES TO PROCESS: ' ~ partitions_batches | length) %}
-        {%- for batch in partitions_batches -%}
-            {%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ partitions_batches|length) -%}
-            {%- set insert_batch_partitions -%}
-                insert into {{ target_relation }} ({{ dest_cols_csv }})
-                (
-                    select {{ dest_cols_csv }}
-                    from {{ tmp_relation }}
-                    where {{ batch }}
-                );
-            {%- endset -%}
-            {%- do run_query(insert_batch_partitions) -%}
-        {%- endfor -%}
+        {%- set query_result = adapter.run_query_with_partitions_limit_catching(insert_full) -%}
+        {%- do log('QUERY RESULT: ' ~ query_result) -%}
+        {%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%}
+            {% do batch_incremental_insert(tmp_relation, target_relation, dest_cols_csv) %}
+        {%- endif -%}
     {%- endif -%}
 
     SELECT '{{query_result}}'
 
 {%- endmacro %}
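
For intuition: get_partition_batches(tmp_relation) returns a list of SQL boolean predicates, one per batch of partitions, and batch_incremental_insert issues one insert per predicate so each statement stays under Athena's open-partitions limit. A sketch of a single rendered statement, assuming a model partitioned by date_column (the relation names and the exact predicate shape are assumptions, not taken from this diff):

-- hypothetical rendering of one iteration of the batch loop
insert into "analytics"."my_model" ("rnd", "date_column")
(
    select "rnd", "date_column"
    from "analytics"."my_model__dbt_tmp"
    where date_column in (date '2023-01-01', date '2023-01-02', date '2023-01-03')
);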


@@ -39,15 +39,21 @@
     {% endif %}
     {% set query_result = safe_create_table_as(True, tmp_relation, sql, force_batch) -%}
     {% do delete_overlapping_partitions(target_relation, tmp_relation, partitioned_by) %}
-    {% set build_sql = incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation) %}
+    {% set build_sql = incremental_insert(
+        on_schema_change, tmp_relation, target_relation, existing_relation, force_batch
+      )
+    %}
     {% do to_drop.append(tmp_relation) %}
   {% elif strategy == 'append' %}
     {% set tmp_relation = make_temp_relation(target_relation) %}
     {% if tmp_relation is not none %}
       {% do drop_relation(tmp_relation) %}
     {% endif %}
     {% set query_result = safe_create_table_as(True, tmp_relation, sql, force_batch) -%}
-    {% set build_sql = incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation) %}
+    {% set build_sql = incremental_insert(
+        on_schema_change, tmp_relation, target_relation, existing_relation, force_batch
+      )
+    %}
     {% do to_drop.append(tmp_relation) %}
   {% elif strategy == 'merge' and table_type == 'iceberg' %}
     {% set unique_key = config.get('unique_key') %}
@@ -84,6 +90,7 @@
         delete_condition=delete_condition,
         update_condition=update_condition,
         insert_condition=insert_condition,
+        force_batch=force_batch,
       )
     %}
     {% do to_drop.append(tmp_relation) %}
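
Note that the materialization now threads force_batch into both safe_create_table_as and the insert/merge macros. The place where the flag is read from model config sits above these hunks and is not shown; presumably (an assumption, not visible in this view) it is resolved along these lines:

{# assumption: not part of this diff; sketch of how the materialization would read the flag #}
{% set force_batch = config.get('force_batch', False) %}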

@@ -47,6 +57,25 @@
     {{ "," if not is_last }}
 {%- endmacro -%}
 
+
+{% macro batch_iceberg_merge(tmp_relation, target_relation, merge_part, dest_cols_csv) %}
+    {% set partitions_batches = get_partition_batches(tmp_relation) %}
+    {% do log('BATCHES TO PROCESS: ' ~ partitions_batches | length) %}
+    {%- for batch in partitions_batches -%}
+        {%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ partitions_batches | length) -%}
+        {%- set src_batch_part -%}
+            merge into {{ target_relation }} as target
+            using (select {{ dest_cols_csv }} from {{ tmp_relation }} where {{ batch }}) as src
+        {%- endset -%}
+        {%- set merge_batch -%}
+            {{ src_batch_part }}
+            {{ merge_part }}
+        {%- endset -%}
+        {%- do run_query(merge_batch) -%}
+    {%- endfor -%}
+{%- endmacro -%}
+
+
 {% macro iceberg_merge(
     on_schema_change,
     tmp_relation,
@@ -57,6 +76,7 @@
     delete_condition,
     update_condition,
     insert_condition,
+    force_batch,
     statement_name="main"
 )
 %}
@@ -86,10 +106,6 @@
     {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns_wo_keys) -%}
     {%- set src_cols_csv = src_columns_quoted | join(', ') -%}
 
-    {%- set src_part -%}
-        merge into {{ target_relation }} as target using {{ tmp_relation }} as src
-    {%- endset -%}
-
     {%- set merge_part -%}
         on (
             {%- for key in unique_key_cols -%}
@@ -124,28 +140,22 @@
             values ({{ src_cols_csv }})
     {%- endset -%}
 
-    {%- set merge_full -%}
-        {{ src_part }}
-        {{ merge_part }}
-    {%- endset -%}
+    {%- if force_batch -%}
+        {% do batch_iceberg_merge(tmp_relation, target_relation, merge_part, dest_cols_csv) %}
+    {%- else -%}
+        {%- set src_part -%}
+            merge into {{ target_relation }} as target using {{ tmp_relation }} as src
+        {%- endset -%}
+        {%- set merge_full -%}
+            {{ src_part }}
+            {{ merge_part }}
+        {%- endset -%}
 
-    {%- set query_result = adapter.run_query_with_partitions_limit_catching(merge_full) -%}
-    {%- do log('QUERY RESULT: ' ~ query_result) -%}
-    {%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%}
-        {% set partitions_batches = get_partition_batches(tmp_relation) %}
-        {% do log('BATCHES TO PROCESS: ' ~ partitions_batches | length) %}
-        {%- for batch in partitions_batches -%}
-            {%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ partitions_batches | length) -%}
-            {%- set src_batch_part -%}
-                merge into {{ target_relation }} as target
-                using (select * from {{ tmp_relation }} where {{ batch }}) as src
-            {%- endset -%}
-            {%- set merge_batch -%}
-                {{ src_batch_part }}
-                {{ merge_part }}
-            {%- endset -%}
-            {%- do run_query(merge_batch) -%}
-        {%- endfor -%}
+        {%- set query_result = adapter.run_query_with_partitions_limit_catching(merge_full) -%}
+        {%- do log('QUERY RESULT: ' ~ query_result) -%}
+        {%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%}
+            {% do batch_iceberg_merge(tmp_relation, target_relation, merge_part, dest_cols_csv) %}
+        {%- endif -%}
     {%- endif -%}
 
     SELECT '{{query_result}}'
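
Each batch reuses the shared merge_part clause (the on/when conditions) against a filtered source subquery, so only the src part changes per iteration. A sketch of one rendered batch statement, again with hypothetical relations, a single unique key, and simplified when-clauses:

-- hypothetical rendering of one iteration of batch_iceberg_merge
merge into "analytics"."my_model" as target
using (
    select "rnd", "date_column"
    from "analytics"."my_model__dbt_tmp"
    where date_column in (date '2023-01-01', date '2023-01-02')
) as src
on (target."date_column" = src."date_column")
when matched then update set "rnd" = src."rnd"
when not matched then insert ("rnd", "date_column") values (src."rnd", src."date_column")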

tests/functional/adapter/test_force_batch.py (new file, 136 additions, 0 deletions)
@@ -0,0 +1,136 @@
import pytest

from dbt.contracts.results import RunStatus
from dbt.tests.util import run_dbt

models__force_batch_sql = """
{{ config(
        materialized='table',
        partitioned_by=['date_column'],
        force_batch=true
    )
}}

select
    random() as rnd,
    cast(date_column as date) as date_column
from (
    values (
        sequence(from_iso8601_date('2023-01-01'), from_iso8601_date('2023-07-31'), interval '1' day)
    )
) as t1(date_array)
cross join unnest(date_array) as t2(date_column)
"""

models_append_force_batch_sql = """
{{ config(
        materialized='incremental',
        incremental_strategy='append',
        partitioned_by=['date_column'],
        force_batch=true
    )
}}

select
    random() as rnd,
    cast(date_column as date) as date_column
from (
    values (
        sequence(from_iso8601_date('2023-01-01'), from_iso8601_date('2023-07-31'), interval '1' day)
    )
) as t1(date_array)
cross join unnest(date_array) as t2(date_column)
"""

models_merge_force_batch_sql = """
{{ config(
        table_type='iceberg',
        materialized='incremental',
        incremental_strategy='merge',
        unique_key=['date_column'],
        partitioned_by=['date_column'],
        force_batch=true
    )
}}
{% if is_incremental() %}
select
    1 as rnd,
    cast(date_column as date) as date_column
from (
    values (
        sequence(from_iso8601_date('2023-01-01'), from_iso8601_date('2023-07-31'), interval '1' day)
    )
) as t1(date_array)
cross join unnest(date_array) as t2(date_column)
{% else %}
select
    2 as rnd,
    cast(date_column as date) as date_column
from (
    values (
        sequence(from_iso8601_date('2023-01-01'), from_iso8601_date('2023-07-31'), interval '1' day)
    )
) as t1(date_array)
cross join unnest(date_array) as t2(date_column)
{% endif %}
"""


class TestForceBatchInsertParam:
    @pytest.fixture(scope="class")
    def models(self):
        return {"force_batch.sql": models__force_batch_sql}

    def test__force_batch_param(self, project):
        relation_name = "force_batch"
        model_run_result_row_count_query = f"select count(*) as records from {project.test_schema}.{relation_name}"

        model_run = run_dbt(["run", "--select", relation_name])
        model_run_result = model_run.results[0]
        assert model_run_result.status == RunStatus.Success

        models_records_count = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]

        # 2023-01-01 through 2023-07-31 inclusive is 212 days, one row per day
        assert models_records_count == 212


class TestAppendForceBatch:
    @pytest.fixture(scope="class")
    def models(self):
        return {"models_append_force_batch.sql": models_append_force_batch_sql}

    def test__append_force_batch_param(self, project):
        relation_name = "models_append_force_batch"
        model_run_result_row_count_query = f"select count(*) as records from {project.test_schema}.{relation_name}"

        model_run = run_dbt(["run", "--select", relation_name])
        model_run_result = model_run.results[0]
        assert model_run_result.status == RunStatus.Success

        models_records_count = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]
        assert models_records_count == 212


class TestMergeForceBatch:
    @pytest.fixture(scope="class")
    def models(self):
        return {"models_merge_force_batch.sql": models_merge_force_batch_sql}

    def test__merge_force_batch_param(self, project):
        relation_name = "models_merge_force_batch"
        model_run_result_row_count_query = f"select count(*) as records from {project.test_schema}.{relation_name}"
        model_run_result_distinct_query = f"select distinct rnd from {project.test_schema}.{relation_name}"

        model_run = run_dbt(["run", "--select", relation_name])
        model_run_result = model_run.results[0]
        assert model_run_result.status == RunStatus.Success

        # second run exercises the incremental (merge) path
        model_update = run_dbt(["run", "--select", relation_name])
        model_update_result = model_update.results[0]
        assert model_update_result.status == RunStatus.Success

        models_records_count = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]
        assert models_records_count == 212

        # the merge on date_column overwrote every row, so only rnd = 1 remains
        models_distinct_records = project.run_sql(model_run_result_distinct_query, fetch="all")[0][0]
        assert models_distinct_records == 1

tests/functional/adapter/test_force_batch_insert.py (41 deletions)

This file was deleted.