Add Dremio as a destination (#1026)
* Add docker-compose.yml for Dremio

* bootstrap dremio in docker-compose.yml

* refactor dremio bootstrap

* Add dremio client dependency

* test adbc from separate container

* Add pydremio db api implementation

* Further development

* Initial dremio test

* Add description and rowcount to pydremio

* Initial INSERT working

* Passing test

* Clean up test

* Fixup some more issues

* Inject data source configuration

* Inject data source configuration

* Add flatten logic

* pyproject.toml

* Fix pyproject.toml

* Fix Dockerfile

* Fix a couple of problems

* Tidy up

* Add dremio.md

* Fix supported file formats in capabilities

* Add code to handle partition and localsort

* Add some tests around PARTITION and LOCALSORT

* Add some docs for partitions

* Update poetry.lock and fix lint errors

* Use DOUBLE instead of FLOAT

* Fix a few more tests

* Override CREATE TEMP TABLE queries as Dremio does not support TEMP tables

* Credit the original code in pydremio and reproduce Apache2 license.

* poetry.lock

* Refactor sqlalchemy URL import

* Fix stage loading test

* Fix stage loading test

* Fix lint issues

* Ensure all standard tests are run and start fixing failures

* Fix COPY INTO command

* Escape "value"

* More fixes

* More fixes

* Only two failing tests left

* 1 Test failing

* Remove the flatten functionality

* Fix lint

* remove data_source config option

* Add some verbiage around the lack of CREATE SCHEMA

* Some fixes and add Dremio to staging destination configs

* Remove staging_credentials from DremioLoadJob

* Remove staging_credentials from DremioLoadJob

* update lockfile post merge

* add dremio test workflow

* fixing dremio tests

* fix docs code section types

* fix post devel merge linting errors

* ignore callarg for dremio config test

* Fix test_dremio_client.py

* make minio setup sleep a bit

* fix remaining test

* small refactor of sql job
small cleanups

* remove unneeded statement

* mark dremio as experimental

* reset active destinations

* revert client change and update test

* fix default order by

* merge fixes, dremio factory test

* configures dremio pipeline tests properly

* upgrades dremio ci workflow

* fixes local destinations ci workflow

---------

Co-authored-by: Firman, Max <max.firman@troweprice.com>
Co-authored-by: Dave <shrps@posteo.net>
Co-authored-by: Marcin Rudolf <rudolfix@rudolfix.org>
4 people authored Apr 8, 2024
1 parent df39971 commit cf3e8fc
Showing 34 changed files with 1,455 additions and 221 deletions.
89 changes: 89 additions & 0 deletions .github/workflows/test_destination_dremio.yml
@@ -0,0 +1,89 @@

name: test | dremio

on:
  pull_request:
    branches:
      - master
      - devel
  workflow_dispatch:
  schedule:
    - cron: '0 2 * * *'

concurrency:
  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
  cancel-in-progress: true

env:
  RUNTIME__SENTRY_DSN: https://6f6f7b6f8e0f458a89be4187603b55fe@o1061158.ingest.sentry.io/4504819859914752
  RUNTIME__LOG_LEVEL: ERROR

  ACTIVE_DESTINATIONS: "[\"dremio\"]"
  ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"

jobs:
  get_docs_changes:
    name: docs changes
    uses: ./.github/workflows/get_docs_changes.yml
    if: ${{ !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'ci from fork')}}

  run_loader:
    name: test | dremio tests
    needs: get_docs_changes
    if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
    defaults:
      run:
        shell: bash
    runs-on: "ubuntu-latest"

    steps:

      - name: Check out
        uses: actions/checkout@master

      - name: Start dremio
        run: docker-compose -f "tests/load/dremio/docker-compose.yml" up -d

      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.10.x"

      - name: Install Poetry
        uses: snok/install-poetry@v1.3.2
        with:
          virtualenvs-create: true
          virtualenvs-in-project: true
          installer-parallel: true

      - name: Load cached venv
        id: cached-poetry-dependencies
        uses: actions/cache@v3
        with:
          path: .venv
          key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp

      - name: Install dependencies
        run: poetry install --no-interaction -E s3 -E gs -E az -E parquet --with sentry-sdk --with pipeline

      - run: |
          poetry run pytest tests/load
        if: runner.os != 'Windows'
        name: Run tests Linux/MAC
        env:
          DESTINATION__DREMIO__CREDENTIALS: grpc://dremio:dremio123@localhost:32010/nas
          DESTINATION__DREMIO__STAGING_DATA_SOURCE: minio
          DESTINATION__FILESYSTEM__BUCKET_URL: s3://dlt-ci-test-bucket
          DESTINATION__FILESYSTEM__CREDENTIALS__AWS_ACCESS_KEY_ID: minioadmin
          DESTINATION__FILESYSTEM__CREDENTIALS__AWS_SECRET_ACCESS_KEY: minioadmin
          DESTINATION__FILESYSTEM__CREDENTIALS__ENDPOINT_URL: http://127.0.0.1:9010

      - run: |
          poetry run pytest tests/load
        if: runner.os == 'Windows'
        name: Run tests Windows
        shell: cmd

      - name: Stop dremio
        if: always()
        run: docker-compose -f "tests/load/dremio/docker-compose.yml" down -v
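The `DESTINATION__DREMIO__*` and `DESTINATION__FILESYSTEM__*` variables above follow dlt's convention of addressing nested config sections via upper-cased names joined by double underscores. A minimal sketch of that naming rule (the `env_key` helper is illustrative, not part of dlt):

```python
def env_key(*sections: str) -> str:
    # dlt-style config env vars: the section path upper-cased and
    # joined with double underscores, e.g. destination.dremio.credentials
    # -> DESTINATION__DREMIO__CREDENTIALS.
    return "__".join(s.upper() for s in sections)

print(env_key("destination", "dremio", "staging_data_source"))
# DESTINATION__DREMIO__STAGING_DATA_SOURCE
```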
6 changes: 5 additions & 1 deletion .github/workflows/test_local_destinations.yml
@@ -15,14 +15,18 @@ concurrency:
  cancel-in-progress: true

env:
  DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }}
  # NOTE: this workflow can't use github secrets!
  # DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }}

  RUNTIME__SENTRY_DSN: https://6f6f7b6f8e0f458a89be4187603b55fe@o1061158.ingest.sentry.io/4504819859914752
  RUNTIME__LOG_LEVEL: ERROR
  RUNTIME__DLTHUB_TELEMETRY_SEGMENT_WRITE_KEY: TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB
  ACTIVE_DESTINATIONS: "[\"duckdb\", \"postgres\", \"filesystem\", \"weaviate\"]"
  ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\"]"

  DESTINATION__WEAVIATE__VECTORIZER: text2vec-contextionary
  DESTINATION__WEAVIATE__MODULE_CONFIG: "{\"text2vec-contextionary\": {\"vectorizeClassName\": false, \"vectorizePropertyName\": true}}"

jobs:
  get_docs_changes:
    name: docs changes
1 change: 1 addition & 0 deletions dlt/common/data_writers/escape.py
@@ -119,6 +119,7 @@ def escape_redshift_identifier(v: str) -> str:

escape_postgres_identifier = escape_redshift_identifier
escape_athena_identifier = escape_postgres_identifier
escape_dremio_identifier = escape_postgres_identifier


def escape_bigquery_identifier(v: str) -> str:
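`escape_dremio_identifier` simply aliases the Postgres rule (which in turn aliases the Redshift one). As an illustration of what this family of escapers does, here is a sketch of the standard SQL quoted-identifier convention — dlt's actual implementation may handle additional characters such as backslashes:

```python
def escape_identifier(v: str) -> str:
    # Standard SQL quoted identifiers: wrap the name in double quotes
    # and double any embedded double quote.
    return '"' + v.replace('"', '""') + '"'

print(escape_identifier("value"))   # "value"
print(escape_identifier('va"lue'))  # "va""lue"
```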
2 changes: 2 additions & 0 deletions dlt/destinations/__init__.py
@@ -13,6 +13,7 @@
from dlt.destinations.impl.destination.factory import destination
from dlt.destinations.impl.synapse.factory import synapse
from dlt.destinations.impl.databricks.factory import databricks
from dlt.destinations.impl.dremio.factory import dremio


__all__ = [
@@ -30,5 +31,6 @@
    "weaviate",
    "synapse",
    "databricks",
    "dremio",
    "destination",
]
27 changes: 27 additions & 0 deletions dlt/destinations/impl/dremio/__init__.py
@@ -0,0 +1,27 @@
from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
from dlt.common.data_writers.escape import escape_dremio_identifier
from dlt.common.destination import DestinationCapabilitiesContext


def capabilities() -> DestinationCapabilitiesContext:
    caps = DestinationCapabilitiesContext()
    caps.preferred_loader_file_format = None
    caps.supported_loader_file_formats = []
    caps.preferred_staging_file_format = "parquet"
    caps.supported_staging_file_formats = ["jsonl", "parquet"]
    caps.escape_identifier = escape_dremio_identifier
    caps.decimal_precision = (DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE)
    caps.wei_precision = (DEFAULT_NUMERIC_PRECISION, 0)
    caps.max_identifier_length = 255
    caps.max_column_identifier_length = 255
    caps.max_query_length = 2 * 1024 * 1024
    caps.is_max_query_length_in_bytes = True
    caps.max_text_data_type_length = 16 * 1024 * 1024
    caps.is_max_text_data_type_length_in_bytes = True
    caps.supports_transactions = False
    caps.supports_ddl_transactions = False
    caps.alter_add_multi_column = True
    caps.supports_clone_table = False
    caps.supports_multiple_statements = False
    caps.timestamp_precision = 3
    return caps
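With `supported_loader_file_formats` empty, every Dremio load must go through a staging destination in `jsonl` or `parquet`. A hypothetical sketch of how a loader might resolve a file format from capabilities like these (the function name and signature are illustrative, not dlt's API):

```python
from typing import List, Optional


def resolve_staging_format(
    requested: Optional[str],
    preferred: Optional[str],
    supported: List[str],
) -> Optional[str]:
    # Honor an explicitly requested format if the destination supports
    # it, otherwise fall back to the destination's preferred format.
    if requested is not None:
        if requested not in supported:
            raise ValueError(f"{requested!r} not in supported formats {supported}")
        return requested
    return preferred


# Mirroring the Dremio staging capabilities above:
print(resolve_staging_format(None, "parquet", ["jsonl", "parquet"]))     # parquet
print(resolve_staging_format("jsonl", "parquet", ["jsonl", "parquet"]))  # jsonl
```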
43 changes: 43 additions & 0 deletions dlt/destinations/impl/dremio/configuration.py
@@ -0,0 +1,43 @@
import dataclasses
from typing import Final, Optional, Any, Dict, ClassVar, List

from dlt.common.configuration import configspec
from dlt.common.configuration.specs import ConnectionStringCredentials
from dlt.common.destination.reference import DestinationClientDwhWithStagingConfiguration
from dlt.common.libs.sql_alchemy import URL
from dlt.common.typing import TSecretStrValue
from dlt.common.utils import digest128


@configspec(init=False)
class DremioCredentials(ConnectionStringCredentials):
    drivername: str = "grpc"
    username: str = None
    password: TSecretStrValue = None
    host: str = None
    port: Optional[int] = 32010
    database: str = None

    __config_gen_annotations__: ClassVar[List[str]] = ["port"]

    def to_native_credentials(self) -> str:
        return URL.create(
            drivername=self.drivername, host=self.host, port=self.port
        ).render_as_string(hide_password=False)

    def db_kwargs(self) -> Dict[str, Any]:
        return dict(username=self.username, password=self.password)


@configspec
class DremioClientConfiguration(DestinationClientDwhWithStagingConfiguration):
    destination_type: Final[str] = dataclasses.field(default="dremio", init=False, repr=False, compare=False)  # type: ignore[misc]
    credentials: DremioCredentials = None
    staging_data_source: str = None
    """The name of the staging data source"""

    def fingerprint(self) -> str:
        """Returns a fingerprint of host part of a connection string"""
        if self.credentials and self.credentials.host:
            return digest128(self.credentials.host)
        return ""
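The CI workflow passes these credentials as a single connection string (`grpc://dremio:dremio123@localhost:32010/nas`). A stdlib-only sketch of how such a string decomposes into the fields above, and of the `to_native_credentials()` output, which renders only driver, host, and port — the `parse_credentials` helper is illustrative; dlt's `ConnectionStringCredentials` does the real parsing:

```python
from urllib.parse import urlsplit


def parse_credentials(conn_str: str) -> dict:
    # Split a dlt-style connection string into credential fields.
    u = urlsplit(conn_str)
    return {
        "drivername": u.scheme,
        "username": u.username,
        "password": u.password,
        "host": u.hostname,
        "port": u.port or 32010,  # default Dremio Arrow Flight port
        "database": u.path.lstrip("/"),
    }


creds = parse_credentials("grpc://dremio:dremio123@localhost:32010/nas")
# to_native_credentials() keeps only drivername, host, and port;
# username/password travel separately via db_kwargs():
print(f"{creds['drivername']}://{creds['host']}:{creds['port']}")  # grpc://localhost:32010
```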
