187: Adding Apache Hudi support to dbt (#210)
* Initial working version

* Rebased and resolved all the merge conflicts.

* Rebased and resolved merge conflicts.

* Removed the Hudi dependency jar and used the released version via the packages option

* Added insert overwrite unit tests for Hudi

* Used unique_key as the default value for the Hudi primaryKey option

* Updated CHANGELOG.md with this change.

* Final round of testing and a few minor fixes

* Fixed lint issues

* Fixed the integration tests

* Fixed the CircleCI env to add Hudi packages

* Updated the Hudi Spark bundle to use Scala 2.11

* Fixed the Hudi incremental strategy integration tests and other integration tests

* Fixed the Hudi Hive sync HMS integration test issues

* Added SQL HMS config to fix the integration tests.

* Added the Hudi Hive sync mode conf to CI

* Set the HMS schema verification to false

* Removed the merge update columns since they're not supported.

* Passed the correct hiveconf to the CircleCI build script

* Disabled a few incremental tests for Spark 2 and reverted to the Spark 2 config

* Added Hudi configs to the CircleCI build script

* Commented out the Hudi integration test until the Hudi 0.10.0 release

* Fixed the macro which checks the table type.

* Disabled this model since Hudi is not supported in the Databricks runtime; it will be added later
Vinoth Govindarajan authored Nov 19, 2021
1 parent 05678b3 commit 68a3b5a
Showing 19 changed files with 261 additions and 8 deletions.
12 changes: 12 additions & 0 deletions .circleci/config.yml
@@ -25,6 +25,18 @@ jobs:
--conf spark.hadoop.javax.jdo.option.ConnectionUserName=dbt
--conf spark.hadoop.javax.jdo.option.ConnectionPassword=dbt
--conf spark.hadoop.javax.jdo.option.ConnectionDriverName=org.postgresql.Driver
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.jars.packages=org.apache.hudi:hudi-spark-bundle_2.11:0.9.0
--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
--conf spark.driver.userClassPathFirst=true
--conf spark.hadoop.datanucleus.autoCreateTables=true
--conf spark.hadoop.datanucleus.schema.autoCreateTables=true
--conf spark.hadoop.datanucleus.fixedDatastore=false
--conf spark.sql.hive.convertMetastoreParquet=false
--hiveconf hoodie.datasource.hive_sync.use_jdbc=false
--hiveconf hoodie.datasource.hive_sync.mode=hms
--hiveconf datanucleus.schema.autoCreateAll=true
--hiveconf hive.metastore.schema.verification=false
- image: postgres:9.6.17-alpine
environment:
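The CI job above wires Hudi into the Spark Thrift server purely through --conf and --hiveconf flags. For local experimentation, a roughly equivalent PySpark session can be built as follows (a minimal sketch, not part of this diff; the app name and local master are illustrative assumptions):

from pyspark.sql import SparkSession

# Mirror the Hudi-related confs the CI job passes to the Thrift server.
spark = (
    SparkSession.builder
    .appName("dbt-spark-hudi-local")  # hypothetical app name
    .master("local[2]")               # assumption: small local test cluster
    .config("spark.serializer",
            "org.apache.spark.serializer.KryoSerializer")
    .config("spark.jars.packages",
            "org.apache.hudi:hudi-spark-bundle_2.11:0.9.0")
    .config("spark.sql.extensions",
            "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .getOrCreate()
)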
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -29,13 +29,15 @@
### Fixes
- Enhanced the get_columns_in_relation method to handle a bug in open source Delta Lake, which doesn't return schema details in the `show table extended in databasename like '*'` query output. This impacts dbt snapshots if the file format is open source Delta Lake ([#207](https://github.com/dbt-labs/dbt-spark/pull/207))
- Properly parse columns when there are struct fields, to avoid considering inner fields: Issue ([#202](https://github.com/dbt-labs/dbt-spark/issues/202))
- Add support for Apache Hudi (hudi file format) which supports incremental merge strategies: Issue ([#187](https://github.com/dbt-labs/dbt-spark/issues/187))

### Under the hood
- Add `unique_field` to better understand adapter adoption in anonymous usage tracking ([#211](https://github.com/dbt-labs/dbt-spark/pull/211))

### Contributors
- [@harryharanb](https://github.com/harryharanb) ([#207](https://github.com/dbt-labs/dbt-spark/pull/207))
- [@SCouto](https://github.com/Scouto) ([#204](https://github.com/dbt-labs/dbt-spark/pull/204))
- [@vingov](https://github.com/vingov) ([#210](https://github.com/dbt-labs/dbt-spark/pull/210))

## dbt-spark 0.21.0b2 (August 20, 2021)

13 changes: 13 additions & 0 deletions dbt/adapters/spark/impl.py
@@ -70,6 +70,13 @@ class SparkAdapter(SQLAdapter):
INFORMATION_OWNER_REGEX = re.compile(r"^Owner: (.*)$", re.MULTILINE)
INFORMATION_STATISTICS_REGEX = re.compile(
r"^Statistics: (.*)$", re.MULTILINE)
HUDI_METADATA_COLUMNS = [
'_hoodie_commit_time',
'_hoodie_commit_seqno',
'_hoodie_record_key',
'_hoodie_partition_path',
'_hoodie_file_name'
]

Relation = SparkRelation
Column = SparkColumn
@@ -145,12 +152,14 @@ def list_relations_without_caching(
rel_type = RelationType.View \
if 'Type: VIEW' in information else RelationType.Table
is_delta = 'Provider: delta' in information
is_hudi = 'Provider: hudi' in information
relation = self.Relation.create(
schema=_schema,
identifier=name,
type=rel_type,
information=information,
is_delta=is_delta,
is_hudi=is_hudi,
)
relations.append(relation)

@@ -224,6 +233,10 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]:
# which would execute 'describe extended tablename' query
rows: List[agate.Row] = super().get_columns_in_relation(relation)
columns = self.parse_describe_extended(relation, rows)

# Strip Hudi metadata columns.
columns = [x for x in columns
if x.name not in self.HUDI_METADATA_COLUMNS]
return columns

def parse_columns_from_information(
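get_columns_in_relation now filters Hudi's five bookkeeping columns out of its result so that models and snapshots only see user-defined columns. A standalone sketch of that filtering, with a namedtuple standing in for SparkColumn:

from collections import namedtuple

# Stand-in for SparkColumn; only the name attribute matters here.
Column = namedtuple("Column", ["name", "dtype"])

HUDI_METADATA_COLUMNS = {
    "_hoodie_commit_time",
    "_hoodie_commit_seqno",
    "_hoodie_record_key",
    "_hoodie_partition_path",
    "_hoodie_file_name",
}

def strip_hudi_metadata(columns):
    # Drop Hudi bookkeeping fields, keeping user-defined columns only.
    return [c for c in columns if c.name not in HUDI_METADATA_COLUMNS]

cols = [Column("_hoodie_commit_time", "string"), Column("id", "bigint")]
assert [c.name for c in strip_hudi_metadata(cols)] == ["id"]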
1 change: 1 addition & 0 deletions dbt/adapters/spark/relation.py
@@ -26,6 +26,7 @@ class SparkRelation(BaseRelation):
include_policy: SparkIncludePolicy = SparkIncludePolicy()
quote_character: str = '`'
is_delta: Optional[bool] = None
is_hudi: Optional[bool] = None
information: str = None

def __post_init__(self):
13 changes: 12 additions & 1 deletion dbt/include/spark/macros/adapters.sql
@@ -15,6 +15,17 @@

{% macro options_clause() -%}
{%- set options = config.get('options') -%}
{%- if config.get('file_format') == 'hudi' -%}
{%- set unique_key = config.get('unique_key') -%}
{%- if unique_key is not none and options is none -%}
{%- set options = {'primaryKey': config.get('unique_key')} -%}
{%- elif unique_key is not none and options is not none and 'primaryKey' not in options -%}
{%- set _ = options.update({'primaryKey': config.get('unique_key')}) -%}
{%- elif options is not none and 'primaryKey' in options and options['primaryKey'] != unique_key -%}
{{ exceptions.raise_compiler_error("unique_key and options('primaryKey') should be the same column(s).") }}
{%- endif %}
{%- endif %}

{%- if options is not none %}
options (
{%- for option in options -%}
@@ -181,7 +192,7 @@
{% endmacro %}

{% macro spark__alter_column_comment(relation, column_dict) %}
{% if config.get('file_format', validator=validation.any[basestring]) == 'delta' %}
{% if config.get('file_format', validator=validation.any[basestring]) in ['delta', 'hudi'] %}
{% for column_name in column_dict %}
{% set comment = column_dict[column_name]['description'] %}
{% set escaped_comment = comment | replace('\'', '\\\'') %}
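For Hudi models, the options_clause change reconciles unique_key with Hudi's primaryKey option: unique_key seeds primaryKey when no options are given, fills it in when options lack one, and any mismatch raises a compiler error. A Python sketch of the same precedence logic, mirroring the Jinja branches above:

def resolve_hudi_options(unique_key, options):
    # unique_key alone: it becomes the Hudi primaryKey.
    if unique_key is not None and options is None:
        return {"primaryKey": unique_key}
    # Options without a primaryKey: fill it in from unique_key.
    if unique_key is not None and options is not None and "primaryKey" not in options:
        merged = dict(options)
        merged["primaryKey"] = unique_key
        return merged
    # Conflicting primaryKey and unique_key: fail loudly.
    if options is not None and "primaryKey" in options and options["primaryKey"] != unique_key:
        raise ValueError(
            "unique_key and options('primaryKey') should be the same column(s).")
    return options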
@@ -1,7 +1,7 @@
{% macro dbt_spark_validate_get_file_format(raw_file_format) %}
{#-- Validate the file format #}

{% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm'] %}
{% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm', 'hudi'] %}

{% set invalid_file_format_msg -%}
Invalid file format provided: {{ raw_file_format }}
@@ -26,7 +26,7 @@

{% set invalid_merge_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
You can only choose this strategy when file_format is set to 'delta'
You can only choose this strategy when file_format is set to 'delta' or 'hudi'
{%- endset %}

{% set invalid_insert_overwrite_delta_msg -%}
@@ -44,7 +44,7 @@
{% if raw_strategy not in ['append', 'merge', 'insert_overwrite'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{%-else %}
{% if raw_strategy == 'merge' and file_format != 'delta' %}
{% if raw_strategy == 'merge' and file_format not in ['delta', 'hudi'] %}
{% do exceptions.raise_compiler_error(invalid_merge_msg) %}
{% endif %}
{% if raw_strategy == 'insert_overwrite' and file_format == 'delta' %}
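The validation now accepts 'hudi' wherever 'delta' was previously required for the merge strategy, while insert_overwrite remains disallowed for Delta. A Python sketch of the checks this macro performs (the Delta insert_overwrite branch is truncated in the diff, so its message below is an assumption):

def validate_incremental_strategy(raw_strategy, file_format):
    if raw_strategy not in ("append", "merge", "insert_overwrite"):
        raise ValueError(
            "Invalid incremental strategy provided: {}".format(raw_strategy))
    if raw_strategy == "merge" and file_format not in ("delta", "hudi"):
        raise ValueError(
            "Invalid incremental strategy provided: {}\n"
            "You can only choose this strategy when file_format is set to "
            "'delta' or 'hudi'".format(raw_strategy))
    if raw_strategy == "insert_overwrite" and file_format == "delta":
        # Assumption: mirrors the elided invalid_insert_overwrite_delta_msg.
        raise ValueError(
            "insert_overwrite is not supported when file_format is 'delta'")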
8 changes: 4 additions & 4 deletions dbt/include/spark/macros/materializations/snapshot.sql
@@ -82,18 +82,18 @@
identifier=target_table,
type='table') -%}

{%- if file_format != 'delta' -%}
{%- if file_format not in ['delta', 'hudi'] -%}
{% set invalid_format_msg -%}
Invalid file format: {{ file_format }}
Snapshot functionality requires file_format be set to 'delta'
Snapshot functionality requires file_format be set to 'delta' or 'hudi'
{%- endset %}
{% do exceptions.raise_compiler_error(invalid_format_msg) %}
{% endif %}

{%- if target_relation_exists -%}
{%- if not target_relation.is_delta -%}
{%- if not target_relation.is_delta and not target_relation.is_hudi -%}
{% set invalid_format_msg -%}
The existing table {{ model.schema }}.{{ target_table }} is in another format than 'delta'
The existing table {{ model.schema }}.{{ target_table }} is in another format than 'delta' or 'hudi'
{%- endset %}
{% do exceptions.raise_compiler_error(invalid_format_msg) %}
{% endif %}
1 change: 1 addition & 0 deletions docker-compose.yml
@@ -14,6 +14,7 @@ services:
volumes:
- ./.spark-warehouse/:/spark-warehouse/
- ./docker/hive-site.xml:/usr/spark/conf/hive-site.xml
- ./docker/spark-defaults.conf:/usr/spark/conf/spark-defaults.conf
environment:
- WAIT_FOR=dbt-hive-metastore:5432

4 changes: 4 additions & 0 deletions docker/hive-site.xml
@@ -39,4 +39,8 @@
<value>dbt</value>
</property>

<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
</configuration>
7 changes: 7 additions & 0 deletions docker/spark-defaults.conf
@@ -0,0 +1,7 @@
spark.hadoop.datanucleus.autoCreateTables true
spark.hadoop.datanucleus.schema.autoCreateTables true
spark.hadoop.datanucleus.fixedDatastore false
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.jars.packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.driver.userClassPathFirst true
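With these defaults baked into the dockerized Spark 3 image (note the Spark 3 / Scala 2.12 Hudi bundle here, versus the Scala 2.11 bundle in the CircleCI Spark 2 setup), a Hudi table can be exercised directly through Spark SQL. A hedged sketch: the table name is illustrative, and spark is assumed to be a SparkSession configured with the Hudi extension, as in the earlier sketch.

# Create, populate, and read back a small Hudi table; syntax per the
# Hudi 0.9.x Spark SQL support enabled by HoodieSparkSessionExtension.
spark.sql("""
    create table if not exists demo_hudi (id bigint, msg string)
    using hudi
    options (primaryKey = 'id')
""")
spark.sql("insert into demo_hudi values (1, 'hello')")
spark.sql("select * from demo_hudi").show()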
19 changes: 19 additions & 0 deletions tests/integration/incremental_strategies/models_hudi/append.sql
@@ -0,0 +1,19 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'append',
file_format = 'hudi',
) }}

{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg
union all
select cast(2 as bigint) as id, 'goodbye' as msg

{% else %}

select cast(2 as bigint) as id, 'yo' as msg
union all
select cast(3 as bigint) as id, 'anyway' as msg

{% endif %}
@@ -0,0 +1,19 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'insert_overwrite',
file_format = 'hudi',
) }}

{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg
union all
select cast(2 as bigint) as id, 'goodbye' as msg

{% else %}

select cast(2 as bigint) as id, 'yo' as msg
union all
select cast(3 as bigint) as id, 'anyway' as msg

{% endif %}
@@ -0,0 +1,20 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'insert_overwrite',
partition_by = 'id',
file_format = 'hudi',
) }}

{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg
union all
select cast(2 as bigint) as id, 'goodbye' as msg

{% else %}

select cast(2 as bigint) as id, 'yo' as msg
union all
select cast(3 as bigint) as id, 'anyway' as msg

{% endif %}
@@ -0,0 +1,19 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
file_format = 'hudi',
) }}

{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg
union all
select cast(2 as bigint) as id, 'goodbye' as msg

{% else %}

select cast(2 as bigint) as id, 'yo' as msg
union all
select cast(3 as bigint) as id, 'anyway' as msg

{% endif %}
@@ -0,0 +1,20 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
file_format = 'hudi',
unique_key = 'id',
) }}

{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg
union all
select cast(2 as bigint) as id, 'goodbye' as msg

{% else %}

select cast(2 as bigint) as id, 'yo' as msg
union all
select cast(3 as bigint) as id, 'anyway' as msg

{% endif %}
@@ -0,0 +1,22 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
file_format = 'hudi',
unique_key = 'id',
merge_update_columns = ['msg'],
) }}

{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color
union all
select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color

{% else %}

-- msg will be updated, color will be ignored
select cast(2 as bigint) as id, 'yo' as msg, 'green' as color
union all
select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color

{% endif %}
@@ -16,6 +16,10 @@ def project_config(self):
},
}

def seed_and_run_once(self):
self.run_dbt(["seed"])
self.run_dbt(["run"])

def seed_and_run_twice(self):
self.run_dbt(["seed"])
self.run_dbt(["run"])
@@ -77,6 +81,26 @@ def run_and_test(self):
def test_delta_strategies_databricks_cluster(self):
self.run_and_test()

# Uncomment this Hudi integration test after the Hudi 0.10.0 release.
# class TestHudiStrategies(TestIncrementalStrategies):
# @property
# def models(self):
# return "models_hudi"
#
# def run_and_test(self):
# self.seed_and_run_once()
# self.assertTablesEqual("append", "expected_append")
# self.assertTablesEqual("merge_no_key", "expected_append")
# self.assertTablesEqual("merge_unique_key", "expected_upsert")
# self.assertTablesEqual(
# "insert_overwrite_no_partitions", "expected_overwrite")
# self.assertTablesEqual(
# "insert_overwrite_partitions", "expected_upsert")
#
# @use_profile("apache_spark")
# def test_hudi_strategies_apache_spark(self):
# self.run_and_test()


class TestBadStrategies(TestIncrementalStrategies):
@property
24 changes: 24 additions & 0 deletions tests/integration/persist_docs/models/schema.yml
@@ -49,6 +49,30 @@ models:
description: |
Some stuff here and then a call to
{{ doc('my_fun_doc')}}
- name: table_hudi_model
description: |
Table model description "with double quotes"
and with 'single quotes' as well as other;
'''abc123'''
reserved -- characters
--
/* comment */
Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting
columns:
- name: id
description: |
id Column description "with double quotes"
and with 'single quotes' as well as other;
'''abc123'''
reserved -- characters
--
/* comment */
Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting
- name: name
description: |
Some stuff here and then a call to
{{ doc('my_fun_doc')}}
- name: view_model
description: |