- First we want to create the necessary stages and file formats:
-- CSV file format, created in the dbt project's default schema (dbt_jyeo).
create or replace file format development_jyeo.dbt_jyeo.my_csv_format type = 'CSV';
-- Stages live in their own schema, separate from the schema dbt writes models to.
create schema if not exists development_jyeo.dbt_jyeo_stages;
-- Internal stage the post-hook will unload CSV files into; defaults to the format above.
create or replace stage development_jyeo.dbt_jyeo_stages.my_stage file_format = development_jyeo.dbt_jyeo.my_csv_format;
- Check that our stage exists and is empty:
jyeo_integration#TRANSFORMING@DEVELOPMENT_JYEO.DBT_JYEO>list @development_jyeo.dbt_jyeo_stages.my_stage;
+------+------+-----+---------------+
| name | size | md5 | last_modified |
|------+------+-----+---------------|
+------+------+-----+---------------+
0 Row(s) produced. Time Elapsed: 2.438s
- dbt project setup:
# dbt_project.yml
name: my_dbt_project
profile: snowflake
config-version: 2
version: 1.0
# Project-wide default: materialize all models in this project as tables
# (individual models may override this in their config block).
models:
  my_dbt_project:
    +materialized: table
# models/sources.yml
version: 2
sources:
  - name: my_sources
    # The stage lives in a different schema from the models (dbt_jyeo),
    # so the source declares it explicitly.
    schema: dbt_jyeo_stages
    tables:
      # Declared as a source table so source('my_sources', 'my_stage')
      # resolves to the stage's fully qualified name in the macro.
      - name: my_stage
Note that the default schema (`target.schema`) for my project is `dbt_jyeo`
- which is where our models will be written to, but our stage is in a different schema - `dbt_jyeo_stages`.
-- models/foo.sql
{# Incremental model; the post-hook unloads this run's rows from the built table to the stage as CSV. #}
{{
config(
materialized = 'incremental',
post_hook = "{{ unload_this() }}"
)
}}
-- updated_at stamps each row with the run timestamp so the post-hook can select only this run's rows.
select 1 as id, '{{ run_started_at.strftime("%Y-%m-%d %H:%M:%S") }}' as updated_at
And our `unload_this()` macro:
-- macros/unload_this.sql
{#
    Post-hook macro: unloads the rows written by the current run of `this`
    model into the stage as a single gzipped CSV file named
    <model>_<run timestamp>.csv.gz.

    On incremental runs only rows stamped with this run's timestamp are
    unloaded; on a first build / --full-refresh, all rows are.
#}
{% macro unload_this() %}
    {% set run_ts = run_started_at.strftime("%Y-%m-%d %H:%M:%S") %}
    {# Replace the special characters in run_ts so that it looks better when used as csv file names #}
    {% set run_ts_tidy = run_ts.replace(" ", "_").replace(":", "").replace("-", "_") %}

    {# Rows added by this run (all rows when not running incrementally). #}
    {% set new_rows_query %}
        select count(*) as num_rows from {{ this }}
        {% if is_incremental() -%}
        where updated_at = '{{ run_ts }}'
        {%- endif %}
    {% endset %}

    {# Selecting from a stage reads the staged files' contents, so this
       counts ROWS across all files in the stage — not the number of files.
       (Renamed from "file count": the logged values are row counts.) #}
    {% set stage_row_count_query %}
        select count(*) as num_rows from @{{ source('my_sources', 'my_stage') }};
    {% endset %}

    {% set unload_query %}
        copy into @{{ source('my_sources', 'my_stage') }}/{{ this.name }}_{{ run_ts_tidy }}.csv.gz
        from (
            select * from {{ this }}
            {% if is_incremental() -%}
            where updated_at = '{{ run_ts }}'
            {%- endif %}
        )
        file_format = (format_name = development_jyeo.dbt_jyeo.my_csv_format)
        single = true;
    {% endset %}

    {# Only query the warehouse during the execution phase, never at parse time. #}
    {% if execute %}
        {% set new_rows_count = run_query(new_rows_query) %}
        {% set stage_rows_before = run_query(stage_row_count_query).columns[0].values()[0] %}
        {% do log('>>>>> New rows added to [' ~ this ~ ']: ' ~ new_rows_count.columns[0].values()[0], True) %}
        {% do log('>>>>> Running copy command: ' ~ unload_query, True) %}
        {% do run_query(unload_query) %}
        {% set stage_rows_after = run_query(stage_row_count_query).columns[0].values()[0] %}
        {% do log('>>>>> Row count in stage: Before: ' ~ stage_rows_before ~ ' / After: ' ~ stage_rows_after, True) %}
    {% endif %}
{% endmacro %}
- Let's do our first run:
$ dbt run --full-refresh
00:59:29 Running with dbt=1.3.3
00:59:30 Found 1 model, 0 tests, 0 snapshots, 0 analyses, 306 macros, 0 operations, 0 seed files, 1 source, 0 exposures, 0 metrics
00:59:30
00:59:35 Concurrency: 1 threads (target='dev')
00:59:35
00:59:35 1 of 1 START sql incremental model dbt_jyeo.foo ................................ [RUN]
00:59:41 >>>>> New rows added to [development_jyeo.dbt_jyeo.foo]: 1
00:59:41 >>>>> Running copy command:
copy into @development_jyeo.dbt_jyeo_stages.my_stage/foo_2023_03_21_005928.csv.gz
from (
select * from development_jyeo.dbt_jyeo.foo
)
file_format = (format_name = development_jyeo.dbt_jyeo.my_csv_format)
single = true;
00:59:42 >>>>> Row count in stage: Before: 0 / After: 1
00:59:43 1 of 1 OK created sql incremental model dbt_jyeo.foo ........................... [SUCCESS 1 in 7.76s]
00:59:43
00:59:43 Finished running 1 incremental model in 0 hours 0 minutes and 13.27 seconds (13.27s).
00:59:43
00:59:43 Completed successfully
00:59:43
00:59:43 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
- Let's change our `foo` model and rerun:
-- models/foo.sql
{# Incremental model; the post-hook unloads this run's rows to the stage as CSV. #}
{{
config(
materialized = 'incremental',
post_hook = "{{ unload_this() }}"
)
}}
-- updated_at stamps each row with the run timestamp so the post-hook can select only this run's rows.
select 2 as id, '{{ run_started_at.strftime("%Y-%m-%d %H:%M:%S") }}' as updated_at
-- union all: the two branches are known-distinct, so skip the implicit sort/dedup of plain union
union all
select 3 as id, '{{ run_started_at.strftime("%Y-%m-%d %H:%M:%S") }}' as updated_at
$ dbt run
01:00:16 Running with dbt=1.3.3
01:00:17 Found 1 model, 0 tests, 0 snapshots, 0 analyses, 306 macros, 0 operations, 0 seed files, 1 source, 0 exposures, 0 metrics
01:00:17
01:00:22 Concurrency: 1 threads (target='dev')
01:00:22
01:00:22 1 of 1 START sql incremental model dbt_jyeo.foo ................................ [RUN]
01:00:31 >>>>> New rows added to [development_jyeo.dbt_jyeo.foo]: 2
01:00:31 >>>>> Running copy command:
copy into @development_jyeo.dbt_jyeo_stages.my_stage/foo_2023_03_21_010015.csv.gz
from (
select * from development_jyeo.dbt_jyeo.foo
where updated_at = '2023-03-21 01:00:15'
)
file_format = (format_name = development_jyeo.dbt_jyeo.my_csv_format)
single = true;
01:00:33 >>>>> Row count in stage: Before: 1 / After: 3
01:00:33 1 of 1 OK created sql incremental model dbt_jyeo.foo ........................... [SUCCESS 2 in 11.07s]
01:00:33
01:00:33 Finished running 1 incremental model in 0 hours 0 minutes and 16.56 seconds (16.56s).
01:00:33
01:00:33 Completed successfully
01:00:33
01:00:33 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
- Finally check our stage again to see how many files:
jyeo_integration#TRANSFORMING@DEVELOPMENT_JYEO.DBT_JYEO>list @development_jyeo.dbt_jyeo_stages.my_stage;
+---------------------------------------+------+----------------------------------+-------------------------------+
| name | size | md5 | last_modified |
|---------------------------------------+------+----------------------------------+-------------------------------|
| my_stage/foo_2023_03_21_005928.csv.gz | 48 | 2e381b11110495645e7c04ab2213e0f5 | Tue, 21 Mar 2023 00:59:33 GMT |
| my_stage/foo_2023_03_21_010015.csv.gz | 48 | eb079d7e22df736c2b458b9447e58087 | Tue, 21 Mar 2023 01:00:24 GMT |
+---------------------------------------+------+----------------------------------+-------------------------------+
2 Row(s) produced. Time Elapsed: 1.806s