
Override the default merge behaviour for incremental table materialisations in a model. #2737

Closed
AndrewCathcart opened this issue Sep 2, 2020 · 7 comments
Labels: duplicate, enhancement

Comments


AndrewCathcart commented Sep 2, 2020

Describe the feature

I would like to be able to use a custom merge statement, instead of the default, when using incremental table materialisations. I have late-arriving events, so when aggregating them I need to add the new counts to the existing row, instead of overwriting the row with only the newly aggregated events.

As an example:

Current -
DAY - ANIMAL - COUNT
1 ---- Sheep ---- 10

New -
DAY - ANIMAL - COUNT
1 ---- Sheep ---- 5

Expected (perform an old.count + new.count) -
DAY - ANIMAL - COUNT
1 ---- Sheep ---- 15

Actual -
DAY - ANIMAL - COUNT
1 ---- Sheep ---- 5
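
To illustrate the kind of statement being asked for, here is a rough sketch of a hand-written Snowflake merge that adds to the existing count rather than replacing it (table and column names such as animal_counts, new_animal_counts, day_id and animal_count are made up for this example):

-- Sketch only: add late-arriving counts to the existing row instead of overwriting it
merge into animal_counts as dest
using new_animal_counts as src
    on dest.day_id = src.day_id
    and dest.animal = src.animal
when matched then update set
    animal_count = dest.animal_count + src.animal_count
when not matched then insert (day_id, animal, animal_count)
    values (src.day_id, src.animal, src.animal_count);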

Describe alternatives you've considered

I'm using the following approach currently, but I feel it muddies the model and will make it difficult for others to contribute. I've redacted / simplified the code, but hopefully it illustrates the general approach. For context, we have late-arriving events that need to be dealt with.

EDIT: DO NOT USE - CORRECT EXAMPLE IN THE COMMENTS

{% if is_incremental() %}
    WITH new_aggregates AS (
        SELECT
            unique_key,
            y,
            z,
            COUNT(*) as item_count
        FROM {{ ref('A') }}
        WHERE date_loaded >= (SELECT MAX(date_aggregated) FROM {{ this }})
        GROUP BY unique_key, y, z
    )

    ,aggregates_to_update AS (
        SELECT {{this}}.*
        FROM {{this}}
        INNER JOIN new_aggregates USING(unique_key)
    )

    ,finished_aggregates AS (
        SELECT
            new_data.unique_key AS unique_key,
            new_data.y AS y,
            new_data.z AS z,
            new_data.item_count + IFNULL(current_data.item_count, 0) AS item_count, -- THIS LINE HERE
            CURRENT_TIMESTAMP() AS date_aggregated
        FROM new_aggregates AS new_data
        LEFT JOIN aggregates_to_update AS current_data USING(unique_key)
    )
{% else %}
... non-incremental code here, don't need to have any complex merge code
{% endif %}

SELECT * FROM finished_aggregates

Additional context

We're currently using Snowflake. Previously I was using Snowflake Tasks & Streams, doing the addition in the merge statement to update the aggregates as new data arrived on an append-only stream. We're trying to emulate the same thing here using dbt.

Who will this benefit?

I imagine it will benefit anyone who needs to be able to handle a more complex merge command, rather than the default.

Are you interested in contributing this feature?

Not sure I'd be best placed.

@AndrewCathcart added the enhancement and triage labels Sep 2, 2020

jtcohen6 commented Sep 3, 2020

@AndrewCathcart Thanks for the detailed explanation!

To accomplish this today, you've got a few options:

  • You can change your modeling logic, as you've demonstrated above. I actually think this is a good (and less muddy) answer! In general, we find the "upsert" mechanism to be the most intuitive. Any logic more complex than "take the new record, throw out the old" should be expressed in model SQL, where it's more accessible to colleagues, rather than obfuscated away in materialization/macro code.
  • If you want to change the merge behavior for all incremental models running merge statements in your project, you can override the get_merge_sql macro (see the sketch after this list).
  • If you want to change the behavior of a subset of incremental models, you would want to define a new materialization (copy-paste of the incremental materialization with slight modifications) that calls a different merge macro.
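
Roughly, that project-level override (the second option) might look like the sketch below. This is not code from dbt itself: the exact macro signature varies by dbt version, it assumes a single-column unique_key, and the item_count column being accumulated is just an example. get_quoted_csv is the helper the built-in merge macros already use.

-- macros/get_merge_sql.sql (sketch only; shadows the built-in macro for the whole project)
{% macro get_merge_sql(target, source, unique_key, dest_columns, predicates=none) -%}

    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

    merge into {{ target }} as DBT_INTERNAL_DEST
    using {{ source }} as DBT_INTERNAL_SOURCE
        on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}

    when matched then update set
        -- example of custom behaviour: accumulate instead of overwrite
        item_count = DBT_INTERNAL_DEST.item_count + DBT_INTERNAL_SOURCE.item_count

    when not matched then insert ({{ dest_cols_csv }})
    values ({{ dest_cols_csv }})

{%- endmacro %}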

IMO that third option requires more duplicative code than it ideally would. There's an open issue (#2366) for enabling users to define their own incremental strategies (i.e. modifications of merge/delete/insert behavior) without requiring them to mess with the incremental materialization. That issue isn't prioritized currently, but it's something I want to do in the long run.

I'm going to close this issue because you've done a great job of solving it already, and because any dbt code changes we'd make to better accommodate this use case are already addressed in an open issue.

@jtcohen6 closed this as completed Sep 3, 2020
@jtcohen6 added the duplicate label and removed the triage label Sep 3, 2020
AndrewCathcart (Author) commented

Thanks very much for your response :-) Apologies for opening a duplicate, I couldn't find anything on the topic when doing a search.


jtcohen6 commented Sep 4, 2020

Thank you for opening it, and including a great writeup! If someone has a similar question in the future, they'll be able to find this now :)


AndrewCathcart commented Sep 10, 2020

I thought I'd come back to this and add a problem we had to solve, in case anyone copies the above approach.

We have a Snowpipe loading data into raw tables. It sets a date_loaded column via a CURRENT_TIMESTAMP() function call.

In the dbt code above, I'm also using CURRENT_TIMESTAMP to set a "date_aggregated" column's value.

To make this an incremental model, we have a where clause like so:

WHERE date_loaded >= (SELECT MAX(date_aggregated) FROM {{ this }})

This is where the problem arises.

What looks relatively harmless can cause you great concurrency pain down the line. From the snowflake docs for CURRENT_TIMESTAMP; "Do not use the returned value for precise time ordering between concurrent queries (processed by the same virtual warehouse) because the queries might be serviced by different servers (in the warehouse)."

We ran into a problem where we were missing some events. Thankfully I wrote a nice data test that checked the raw/aggregate counts matched and caught this before we went to prod.

To solve this we removed the "date_aggregated" column, and created a "date_loaded_checkpoint" column which contained the MAX(date_loaded) for the new data we were merging into the table. In this way we don't lose any events to concurrency issues by trying to use two separate CURRENT_TIMESTAMP calls (Snowpipe CURRENT_TIMESTAMP which is date_loaded, and dbt merge CURRENT_TIMESTAMP which is date_aggregated).

{% if is_incremental() %}
    WITH new_event_aggregates AS (
        SELECT
            unique_key,
            a,
            b,
            date_loaded,
            COUNT(*) AS item_count
        FROM
            {{ ref('events') }}
        WHERE
            date_loaded > ( SELECT MAX(date_loaded_checkpoint) FROM {{ this }} )
        GROUP BY 1, 2, 3, 4
    ),
    current_aggregates_to_update AS (
        SELECT
            {{ this }}.*
        FROM
            {{ this }}
            INNER JOIN new_event_aggregates USING(unique_key)
    ),
    event_aggregates AS (
        SELECT
            new_data.unique_key AS unique_key,
            new_data.a AS a,
            new_data.b AS b,
            new_data.item_count + IFNULL(current_data.item_count, 0) AS item_count,
            new_data.date_loaded AS date_loaded
        FROM
            new_event_aggregates AS new_data
            LEFT JOIN current_aggregates_to_update AS current_data USING(unique_key)
    )
{% else %}
    WITH event_aggregates AS (
        SELECT
            unique_key,
            a,
            b,
            date_loaded,
            COUNT(*) AS item_count
        FROM
            {{ ref('events') }}
        GROUP BY 1, 2, 3, 4
    )
{% endif %}
SELECT
    unique_key,
    a,
    b,
    SUM(item_count) as item_count,
    (SELECT MAX(date_loaded) FROM event_aggregates) AS date_loaded_checkpoint
FROM
    event_aggregates
GROUP BY 1, 2, 3

TL;DR: Don't try to compare two separate CURRENT_TIMESTAMP calls. Call CURRENT_TIMESTAMP once, when loading the data in, then take the MAX of that column in the dbt CTEs and use it as the checkpoint for incremental tables.

jtcohen6 (Contributor) commented

Right on, @AndrewCathcart. In general, we advise against using relative date/time values (such as current_timestamp) for is_incremental() comparisons. It's always better to compare values in the underlying source data. Otherwise, idempotency is at risk, as you found: results varying based on when or how the model is run, given the same input data.

(Using current_timestamp within a Snowpipe definition is fine given that each record is loaded once in near-real-time.)
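
As a minimal sketch of that advice (column names are illustrative), the is_incremental() filter compares a value carried in the data itself against {{ this }}, never a timestamp generated at run time:

{% if is_incremental() %}
    -- compare a column that exists in the source data, not a fresh CURRENT_TIMESTAMP()
    where date_loaded > (select max(date_loaded_checkpoint) from {{ this }})
{% endif %}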


Jeremynadal33 commented Jun 8, 2022

Hi everyone,
Sorry to write here but this felt like the closest topic.
I ran into the same issue, but I didn't feel like overriding the incremental materialization since I just needed to add a simple condition to the merge strategy. Instead, I overrode the get_merge_sql macro and added an extra configuration to the models so I can keep the original merge behaviour if needed.

To be more precise, I needed to check that the version I keep is the most recent one, so I added a macro:
{% macro add_merge_date_condition() %}
    {% set date_col = config.get('update_date_column', none) %}
    {% if date_col %}
        and DBT_INTERNAL_SOURCE.{{ date_col }} >= DBT_INTERNAL_DEST.{{ date_col }}
    {% endif %}
{% endmacro %}

Then I add it to the get_merge_sql macro:

{% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
    {%- set predicates = [] if predicates is none else [] + predicates -%}
    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
    {%- set update_columns = config.get('merge_update_columns', default = dest_columns | map(attribute="quoted") | list) -%}
    {%- set sql_header = config.get('sql_header', none) -%}
    {% if unique_key %}
        {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
            {% for key in unique_key %}
                {% set this_key_match %}
                    DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }}
                {% endset %}
                {% do predicates.append(this_key_match) %}
            {% endfor %}
        {% else %}
            {% set unique_key_match %}
                DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
            {% endset %}
            {% do predicates.append(unique_key_match) %}
        {% endif %}
    {% else %}
        {% do predicates.append('FALSE') %}
    {% endif %}

    {{ sql_header if sql_header is not none }}

    merge into {{ target }} as DBT_INTERNAL_DEST
        using {{ source }} as DBT_INTERNAL_SOURCE
        on {{ predicates | join(' and ') }}

    {% if unique_key %}
    when matched {{ add_merge_date_condition() }} then update set
        {% for column_name in update_columns -%}
            {{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
            {%- if not loop.last %}, {%- endif %}
        {%- endfor %}
    {% endif %}

    when not matched then insert
        ({{ dest_cols_csv }})
    values
        ({{ dest_cols_csv }})

{% endmacro %}

Finally, I can put the following config on any model to ensure the new statement is used for the merge strategy:
{{
    config(
        materialized = 'incremental',
        unique_key = 'my_unique_key',
        incremental_strategy = 'merge',
        update_date_column = 'update_date'
    )
}}

Is there any reason for this not to be a good idea? I was wondering if there is anything against adding custom configs?
Hope this helps, cheers!

jtcohen6 (Contributor) commented

@Jeremynadal33 This sounds a lot like the request to add custom predicates (#3293 + #4546), with exactly this idea: limit the table scan of the preexisting table when searching for matches.

The mechanism proposed there would add the condition to the matching predicates (ON ... AND ...), rather than as a condition within the when matched and .... I believe that should have the same effect, so long as we're also filtering to just the past X days of "new" data in the model SQL, which you should always be doing anyway to limit the data scanned for the incremental transformation.
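
As a rough sketch of that placement (the update_date column and the seven-day window are illustrative only; the when matched / when not matched clauses stay unchanged), the on clause from the macro above would become:

    on {{ predicates | join(' and ') }}
        -- custom predicate limits the scan of the preexisting table when matching
        and DBT_INTERNAL_DEST.update_date >= dateadd('day', -7, current_timestamp())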
