Skip to content

Commit fe44b1c

Browse files
authored
feat: add force_batch to iceberg_merge and incremental_insert (#482)
1 parent cb7050d commit fe44b1c

File tree

5 files changed

+225
-93
lines changed

5 files changed

+225
-93
lines changed

dbt/include/athena/macros/materializations/models/incremental/helpers.sql

+45-25
Original file line numberDiff line numberDiff line change
@@ -21,40 +21,60 @@
2121
{% do return(raw_strategy) %}
2222
{% endmacro %}
2323

24-
{% macro incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation, statement_name="main") %}
24+
25+
{#
  Insert the contents of tmp_relation into target_relation one partition
  batch at a time, so a single INSERT never opens more partitions than
  Athena allows. The per-batch WHERE predicates come from
  get_partition_batches(); each batch is executed as its own statement.
#}
{% macro batch_incremental_insert(tmp_relation, target_relation, dest_cols_csv) %}
  {% set batches = get_partition_batches(tmp_relation) %}
  {% do log('BATCHES TO PROCESS: ' ~ batches | length) %}
  {%- for batch_filter in batches -%}
    {%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ batches | length) -%}
    {%- set batch_insert_sql -%}
      insert into {{ target_relation }} ({{ dest_cols_csv }})
      (
        select {{ dest_cols_csv }}
        from {{ tmp_relation }}
        where {{ batch_filter }}
      );
    {%- endset -%}
    {%- do run_query(batch_insert_sql) -%}
  {%- endfor -%}
{% endmacro %}
41+
42+
43+
{#
  Incremental insert into target_relation from tmp_relation.

  Args:
    on_schema_change: dbt on_schema_change strategy, resolved by process_schema_changes().
    tmp_relation: staging relation holding the new rows.
    target_relation: relation being incrementally loaded.
    existing_relation: current target state, used for schema-change detection.
    force_batch: when true, skip the single-statement INSERT and always insert
      partition batches one by one (avoids TOO_MANY_OPEN_PARTITIONS up front).
    statement_name: kept for interface compatibility with callers.

  Renders a trailing SELECT of the query-result marker so the materialization
  always has a statement to execute.
#}
{% macro incremental_insert(
      on_schema_change,
      tmp_relation,
      target_relation,
      existing_relation,
      force_batch,
      statement_name="main"
   )
%}
  {%- set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) -%}
  {%- if not dest_columns -%}
    {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
  {%- endif -%}
  {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

  {% if force_batch %}
    {% do batch_incremental_insert(tmp_relation, target_relation, dest_cols_csv) %}
    {# FIX: query_result was left undefined on this branch, so the trailing
       SELECT referenced an undefined variable (an error under strict Jinja
       undefined handling). Mark the batched path as successful explicitly. #}
    {%- set query_result = 'SUCCESS' -%}
  {% else %}
    {%- set insert_full -%}
      insert into {{ target_relation }} ({{ dest_cols_csv }})
      (
        select {{ dest_cols_csv }}
        from {{ tmp_relation }}
      );
    {%- endset -%}

    {%- set query_result = adapter.run_query_with_partitions_limit_catching(insert_full) -%}
    {%- do log('QUERY RESULT: ' ~ query_result) -%}
    {%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%}
      {# The single INSERT hit Athena's partition limit: fall back to batches. #}
      {% do batch_incremental_insert(tmp_relation, target_relation, dest_cols_csv) %}
    {%- endif -%}
  {%- endif -%}

  SELECT '{{query_result}}'
{%- endmacro %}
5979

6080

dbt/include/athena/macros/materializations/models/incremental/incremental.sql

+9-2
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,21 @@
3939
{% endif %}
4040
{% set query_result = safe_create_table_as(True, tmp_relation, sql, force_batch) -%}
4141
{% do delete_overlapping_partitions(target_relation, tmp_relation, partitioned_by) %}
42-
{% set build_sql = incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation) %}
42+
{% set build_sql = incremental_insert(
43+
on_schema_change, tmp_relation, target_relation, existing_relation, force_batch
44+
)
45+
%}
4346
{% do to_drop.append(tmp_relation) %}
4447
{% elif strategy == 'append' %}
4548
{% set tmp_relation = make_temp_relation(target_relation) %}
4649
{% if tmp_relation is not none %}
4750
{% do drop_relation(tmp_relation) %}
4851
{% endif %}
4952
{% set query_result = safe_create_table_as(True, tmp_relation, sql, force_batch) -%}
50-
{% set build_sql = incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation) %}
53+
{% set build_sql = incremental_insert(
54+
on_schema_change, tmp_relation, target_relation, existing_relation, force_batch
55+
)
56+
%}
5157
{% do to_drop.append(tmp_relation) %}
5258
{% elif strategy == 'merge' and table_type == 'iceberg' %}
5359
{% set unique_key = config.get('unique_key') %}
@@ -84,6 +90,7 @@
8490
delete_condition=delete_condition,
8591
update_condition=update_condition,
8692
insert_condition=insert_condition,
93+
force_batch=force_batch,
8794
)
8895
%}
8996
{% do to_drop.append(tmp_relation) %}

dbt/include/athena/macros/materializations/models/incremental/merge.sql

+35-25
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,25 @@
4747
{{ "," if not is_last }}
4848
{%- endmacro -%}
4949

50+
51+
{#
  Run an Iceberg MERGE from tmp_relation into target_relation in
  per-partition batches to stay under Athena's open-partitions limit.
  merge_part carries the shared ON/WHEN clauses; only the USING source
  is re-scoped with each batch's WHERE predicate.
#}
{% macro batch_iceberg_merge(tmp_relation, target_relation, merge_part, dest_cols_csv) %}
  {% set batches = get_partition_batches(tmp_relation) %}
  {% do log('BATCHES TO PROCESS: ' ~ batches | length) %}
  {%- for batch_filter in batches -%}
    {%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ batches | length) -%}
    {%- set batch_source_clause -%}
      merge into {{ target_relation }} as target
      using (select {{ dest_cols_csv }} from {{ tmp_relation }} where {{ batch_filter }}) as src
    {%- endset -%}
    {%- set batch_merge_sql -%}
      {{ batch_source_clause }}
      {{ merge_part }}
    {%- endset -%}
    {%- do run_query(batch_merge_sql) -%}
  {%- endfor -%}
{%- endmacro -%}
67+
68+
5069
{% macro iceberg_merge(
5170
on_schema_change,
5271
tmp_relation,
@@ -57,6 +76,7 @@
5776
delete_condition,
5877
update_condition,
5978
insert_condition,
79+
force_batch,
6080
statement_name="main"
6181
)
6282
%}
@@ -86,10 +106,6 @@
86106
{%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns_wo_keys) -%}
87107
{%- set src_cols_csv = src_columns_quoted | join(', ') -%}
88108

89-
{%- set src_part -%}
90-
merge into {{ target_relation }} as target using {{ tmp_relation }} as src
91-
{%- endset -%}
92-
93109
{%- set merge_part -%}
94110
on (
95111
{%- for key in unique_key_cols -%}
@@ -124,28 +140,22 @@
124140
values ({{ src_cols_csv }})
125141
{%- endset -%}
126142

127-
{%- set merge_full -%}
128-
{{ src_part }}
129-
{{ merge_part }}
130-
{%- endset -%}
143+
{%- if force_batch -%}
144+
{% do batch_iceberg_merge(tmp_relation, target_relation, merge_part, dest_cols_csv) %}
145+
{%- else -%}
146+
{%- set src_part -%}
147+
merge into {{ target_relation }} as target using {{ tmp_relation }} as src
148+
{%- endset -%}
149+
{%- set merge_full -%}
150+
{{ src_part }}
151+
{{ merge_part }}
152+
{%- endset -%}
131153

132-
{%- set query_result = adapter.run_query_with_partitions_limit_catching(merge_full) -%}
133-
{%- do log('QUERY RESULT: ' ~ query_result) -%}
134-
{%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%}
135-
{% set partitions_batches = get_partition_batches(tmp_relation) %}
136-
{% do log('BATCHES TO PROCESS: ' ~ partitions_batches | length) %}
137-
{%- for batch in partitions_batches -%}
138-
{%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ partitions_batches | length) -%}
139-
{%- set src_batch_part -%}
140-
merge into {{ target_relation }} as target
141-
using (select * from {{ tmp_relation }} where {{ batch }}) as src
142-
{%- endset -%}
143-
{%- set merge_batch -%}
144-
{{ src_batch_part }}
145-
{{ merge_part }}
146-
{%- endset -%}
147-
{%- do run_query(merge_batch) -%}
148-
{%- endfor -%}
154+
{%- set query_result = adapter.run_query_with_partitions_limit_catching(merge_full) -%}
155+
{%- do log('QUERY RESULT: ' ~ query_result) -%}
156+
{%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%}
157+
{% do batch_iceberg_merge(tmp_relation, target_relation, merge_part, dest_cols_csv) %}
158+
{%- endif -%}
149159
{%- endif -%}
150160

151161
SELECT '{{query_result}}'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
import pytest
2+
3+
from dbt.contracts.results import RunStatus
4+
from dbt.tests.util import run_dbt
5+
6+
# Table model partitioned by date with force_batch enabled: 212 daily
# partitions (2023-01-01 .. 2023-07-31) exceed Athena's open-partitions
# limit, so the load must be split into batches.
models__force_batch_sql = """
{{ config(
        materialized='table',
        partitioned_by=['date_column'],
        force_batch=true
    )
}}

select
    random() as rnd,
    cast(date_column as date) as date_column
from (
    values (
        sequence(from_iso8601_date('2023-01-01'), from_iso8601_date('2023-07-31'), interval '1' day)
    )
) as t1(date_array)
cross join unnest(date_array) as t2(date_column)
"""
24+
25+
# Incremental/append model with force_batch enabled: same 212-partition
# date spine as the table model, loaded through incremental_insert.
models_append_force_batch_sql = """
{{ config(
        materialized='incremental',
        incremental_strategy='append',
        partitioned_by=['date_column'],
        force_batch=true
    )
}}

select
    random() as rnd,
    cast(date_column as date) as date_column
from (
    values (
        sequence(from_iso8601_date('2023-01-01'), from_iso8601_date('2023-07-31'), interval '1' day)
    )
) as t1(date_array)
cross join unnest(date_array) as t2(date_column)
"""
44+
45+
# Iceberg merge model with force_batch enabled. First run writes rnd=2;
# the incremental (merge) run rewrites every row with rnd=1, keyed on
# date_column, so a successful merge leaves a single distinct rnd value.
models_merge_force_batch_sql = """
{{ config(
        table_type='iceberg',
        materialized='incremental',
        incremental_strategy='merge',
        unique_key=['date_column'],
        partitioned_by=['date_column'],
        force_batch=true
    )
}}
{% if is_incremental() %}
select
    1 as rnd,
    cast(date_column as date) as date_column
from (
    values (
        sequence(from_iso8601_date('2023-01-01'), from_iso8601_date('2023-07-31'), interval '1' day)
    )
) as t1(date_array)
cross join unnest(date_array) as t2(date_column)
{% else %}
select
    2 as rnd,
    cast(date_column as date) as date_column
from (
    values (
        sequence(from_iso8601_date('2023-01-01'), from_iso8601_date('2023-07-31'), interval '1' day)
    )
) as t1(date_array)
cross join unnest(date_array) as t2(date_column)
{% endif %}
"""
77+
78+
79+
class TestForceBatchInsertParam:
    """Table materialization with force_batch=true loads all 212 partitions."""

    @pytest.fixture(scope="class")
    def models(self):
        return {"force_batch.sql": models__force_batch_sql}

    def test__force_batch_param(self, project):
        relation_name = "force_batch"
        count_query = f"select count(*) as records from {project.test_schema}.{relation_name}"

        run_results = run_dbt(["run", "--select", relation_name])
        first_result = run_results.results[0]
        assert first_result.status == RunStatus.Success

        # One row per day from 2023-01-01 through 2023-07-31.
        record_count = project.run_sql(count_query, fetch="all")[0][0]
        assert record_count == 212
95+
96+
97+
class TestAppendForceBatch:
    """Incremental append with force_batch=true loads all 212 partitions."""

    @pytest.fixture(scope="class")
    def models(self):
        return {"models_append_force_batch.sql": models_append_force_batch_sql}

    def test__append_force_batch_param(self, project):
        relation_name = "models_append_force_batch"
        count_query = f"select count(*) as records from {project.test_schema}.{relation_name}"

        run_results = run_dbt(["run", "--select", relation_name])
        first_result = run_results.results[0]
        assert first_result.status == RunStatus.Success

        # One row per day from 2023-01-01 through 2023-07-31.
        record_count = project.run_sql(count_query, fetch="all")[0][0]
        assert record_count == 212
112+
113+
114+
class TestMergeForceBatch:
    """Iceberg merge with force_batch=true across two runs.

    The second (incremental) run must merge-overwrite every row keyed on
    date_column, leaving 212 rows and a single distinct value of ``rnd``.
    """

    @pytest.fixture(scope="class")
    def models(self):
        return {"models_merge_force_batch.sql": models_merge_force_batch_sql}

    def test__merge_force_batch_param(self, project):
        relation_name = "models_merge_force_batch"
        count_query = f"select count(*) as records from {project.test_schema}.{relation_name}"
        # FIX: count the distinct values instead of selecting them. Asserting
        # `[0][0] == 1` on `select distinct rnd` could pass spuriously after a
        # failed merge (both 1 and 2 present) if row 1 happened to come first.
        distinct_count_query = f"select count(distinct rnd) as distinct_rnd from {project.test_schema}.{relation_name}"

        model_run = run_dbt(["run", "--select", relation_name])
        assert model_run.results[0].status == RunStatus.Success

        # Second run takes the is_incremental() branch and merges rnd=1 rows.
        model_update = run_dbt(["run", "--select", relation_name])
        assert model_update.results[0].status == RunStatus.Success

        records_count = project.run_sql(count_query, fetch="all")[0][0]
        assert records_count == 212

        distinct_rnd_count = project.run_sql(distinct_count_query, fetch="all")[0][0]
        assert distinct_rnd_count == 1

tests/functional/adapter/test_force_batch_insert.py

-41
This file was deleted.

0 commit comments

Comments
 (0)