diff --git a/.bumpversion-dbt.cfg b/.bumpversion-dbt.cfg deleted file mode 100644 index f8ad313fe..000000000 --- a/.bumpversion-dbt.cfg +++ /dev/null @@ -1,26 +0,0 @@ -[bumpversion] -current_version = 0.21.0 -parse = (?P\d+) - \.(?P\d+) - \.(?P\d+) - ((?P[a-z]+)(?P\d+))? -serialize = - {major}.{minor}.{patch}{prerelease}{num} - {major}.{minor}.{patch} -commit = False -tag = False - -[bumpversion:part:prerelease] -first_value = a -values = - a - b - rc - -[bumpversion:part:num] -first_value = 1 - -[bumpversion:file:setup.py] - -[bumpversion:file:requirements.txt] - diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 6a532e056..dfa9e0260 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.21.0 +current_version = 1.0.0b1 parse = (?P\d+) \.(?P\d+) \.(?P\d+) @@ -27,4 +27,3 @@ first_value = 1 first_value = 1 [bumpversion:file:dbt/adapters/spark/__version__.py] - diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md index 974c36afe..43f19a154 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.md +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -19,6 +19,7 @@ A clear and concise description of what you expected to happen. ### Screenshots and log output If applicable, add screenshots or log output to help explain your problem. +### System information **The output of `dbt --version`:** ``` diff --git a/.github/ISSUE_TEMPLATE/release.md b/.github/ISSUE_TEMPLATE/release.md new file mode 100644 index 000000000..a69349f54 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/release.md @@ -0,0 +1,10 @@ +--- +name: Release +about: Release a new version of dbt-spark +title: '' +labels: release +assignees: '' + +--- + +### TBD diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 000000000..2a6f34492 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,8 @@ +version: 2 +updates: + # python dependencies + - package-ecosystem: "pip" + directory: "/" + schedule: + interval: "daily" + rebase-strategy: "disabled" diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 45a73f84b..60e12779b 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -9,15 +9,13 @@ resolves # resolves #1234 --> - ### Description - ### Checklist - - [ ] I have signed the [CLA](https://docs.getdbt.com/docs/contributor-license-agreements) - - [ ] I have run this code in development and it appears to resolve the stated issue - - [ ] This PR includes tests, or tests are not required/relevant for this PR - - [ ] I have updated the `CHANGELOG.md` and added information about my change to the "dbt next" section. - \ No newline at end of file + +- [ ] I have signed the [CLA](https://docs.getdbt.com/docs/contributor-license-agreements) +- [ ] I have run this code in development and it appears to resolve the stated issue +- [ ] This PR includes tests, or tests are not required/relevant for this PR +- [ ] I have updated the `CHANGELOG.md` and added information about my change to the "dbt-spark next" section. 
\ No newline at end of file diff --git a/.github/scripts/integration-test-matrix.js b/.github/scripts/integration-test-matrix.js new file mode 100644 index 000000000..d6eb56442 --- /dev/null +++ b/.github/scripts/integration-test-matrix.js @@ -0,0 +1,95 @@ +module.exports = ({ context }) => { + const defaultPythonVersion = "3.8"; + const supportedPythonVersions = ["3.6", "3.7", "3.8", "3.9"]; + const supportedAdapters = ["apache_spark", "databricks_http", "databricks_cluster", "databricks_endpoint"]; + + // if PR, generate matrix based on files changed and PR labels + if (context.eventName.includes("pull_request")) { + // `changes` is a list of adapter names that have related + // file changes in the PR + // ex: ['postgres', 'snowflake'] + const changes = JSON.parse(process.env.CHANGES); + const labels = context.payload.pull_request.labels.map(({ name }) => name); + console.log("labels", labels); + console.log("changes", changes); + const testAllLabel = labels.includes("test all"); + const include = []; + + for (const adapter of supportedAdapters) { + if ( + changes.includes(adapter) || + testAllLabel || + labels.includes(`test ${adapter}`) + ) { + for (const pythonVersion of supportedPythonVersions) { + if ( + pythonVersion === defaultPythonVersion || + labels.includes(`test python${pythonVersion}`) || + testAllLabel + ) { + // always run tests on ubuntu by default + include.push({ + os: "ubuntu-latest", + adapter, + "python-version": pythonVersion, + }); + + if (labels.includes("test windows") || testAllLabel) { + include.push({ + os: "windows-latest", + adapter, + "python-version": pythonVersion, + }); + } + + if (labels.includes("test macos") || testAllLabel) { + include.push({ + os: "macos-latest", + adapter, + "python-version": pythonVersion, + }); + } + } + } + } + } + + console.log("matrix", { include }); + + return { + include, + }; + } + // if not PR, generate matrix of python version, adapter, and operating + // system to run integration tests on + + const include = []; + // run for all adapters and python versions on ubuntu + for (const adapter of supportedAdapters) { + for (const pythonVersion of supportedPythonVersions) { + include.push({ + os: 'ubuntu-latest', + adapter: adapter, + "python-version": pythonVersion, + }); + } + } + + // additionally include runs for all adapters, on macos and windows, + // but only for the default python version + for (const adapter of supportedAdapters) { + for (const operatingSystem of ["windows-latest", "macos-latest"]) { + include.push({ + os: operatingSystem, + adapter: adapter, + "python-version": defaultPythonVersion, + }); + } + } + + console.log("matrix", { include }); + + return { + include, + }; +}; diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml new file mode 100644 index 000000000..1ee43a615 --- /dev/null +++ b/.github/workflows/integration.yml @@ -0,0 +1,253 @@ +# **what?** +# This workflow runs all integration tests for supported OS +# and python versions and core adapters. If triggered by PR, +# the workflow will only run tests for adapters related +# to code changes. Use the `test all` and `test ${adapter}` +# label to run all or additional tests. Use `ok to test` +# label to mark PRs from forked repositories that are safe +# to run integration tests for. Requires secrets to run +# against different warehouses. + +# **why?** +# This checks the functionality of dbt from a user's perspective +# and attempts to catch functional regressions. 
+ +# **when?** +# This workflow will run on every push to a protected branch +# and when manually triggered. It will also run for all PRs, including +# PRs from forks. The workflow will be skipped until there is a label +# to mark the PR as safe to run. + +name: Adapter Integration Tests + +on: + # pushes to release branches + push: + branches: + - "main" + - "develop" + - "*.latest" + - "releases/*" + # all PRs, important to note that `pull_request_target` workflows + # will run in the context of the target branch of a PR + pull_request: # TODO change back to pull_request_target + # manual trigger + workflow_dispatch: + # run this once per night to ensure no regressions from latest dbt-core changes + schedule: + - cron: '0 5 * * *' # 5 UTC + +# explicitly turn off permissions for `GITHUB_TOKEN` +permissions: read-all + +# will cancel previous workflows triggered by the same event and for the same ref for PRs or same SHA otherwise +concurrency: + group: ${{ github.workflow }}-${{ github.event_name }}-${{ contains(github.event_name, 'pull_request') && github.event.pull_request.head.ref || github.sha }} + cancel-in-progress: true + +# sets default shell to bash, for all operating systems +defaults: + run: + shell: bash + +jobs: + # generate test metadata about what files changed and the testing matrix to use + test-metadata: + # run if not a PR from a forked repository or has a label to mark as safe to test + if: >- + github.event_name != 'pull_request_target' || + github.event.pull_request.head.repo.full_name == github.repository || + contains(github.event.pull_request.labels.*.name, 'ok to test') + runs-on: ubuntu-latest + + outputs: + matrix: ${{ steps.generate-matrix.outputs.result }} + + steps: + - name: Check out the repository (non-PR) + if: github.event_name != 'pull_request_target' + uses: actions/checkout@v2 + with: + persist-credentials: false + + - name: Check out the repository (PR) + if: github.event_name == 'pull_request_target' + uses: actions/checkout@v2 + with: + persist-credentials: false + ref: ${{ github.event.pull_request.head.sha }} + + - name: Check if relevant files changed + # https://github.com/marketplace/actions/paths-changes-filter + # For each filter, it sets output variable named by the filter to the text: + # 'true' - if any of changed files matches any of filter rules + # 'false' - if none of changed files matches any of filter rules + # also, returns: + # `changes` - JSON array with names of all filters matching any of the changed files + uses: dorny/paths-filter@v2 + id: get-changes + with: + token: ${{ secrets.GITHUB_TOKEN }} + filters: | + apache_spark: + - 'dbt/**' + - 'tests/**' + databricks_http: + - 'dbt/**' + - 'tests/**' + databricks_cluster: + - 'dbt/**' + - 'tests/**' + databricks_endpoint: + - 'dbt/**' + - 'tests/**' + - name: Generate integration test matrix + id: generate-matrix + uses: actions/github-script@v4 + env: + CHANGES: ${{ steps.get-changes.outputs.changes }} + with: + script: | + const script = require('./.github/scripts/integration-test-matrix.js') + const matrix = script({ context }) + console.log(matrix) + return matrix + test: + name: ${{ matrix.adapter }} / python ${{ matrix.python-version }} / ${{ matrix.os }} + + # run if not a PR from a forked repository or has a label to mark as safe to test + # also checks that the matrix generated is not empty + if: >- + needs.test-metadata.outputs.matrix && + fromJSON( needs.test-metadata.outputs.matrix ).include[0] && + ( + github.event_name != 'pull_request_target' || + 
github.event.pull_request.head.repo.full_name == github.repository || + contains(github.event.pull_request.labels.*.name, 'ok to test') + ) + #runs-on: ${{ matrix.os }} + runs-on: ubuntu-latest + container: + image: fishtownanalytics/test-container:10 + # image based on `fishtownanalytics/test-container` w/ Simba ODBC Spark driver installed + # image: 828731156495.dkr.ecr.us-east-1.amazonaws.com/dbt-spark-odbc-test-container:latest + # credentials: + # aws_access_key_id: $AWS_ACCESS_KEY_ID_STAGING + # aws_secret_access_key: $AWS_SECRET_ACCESS_KEY_STAGING + + needs: test-metadata + + strategy: + fail-fast: false + matrix: ${{ fromJSON(needs.test-metadata.outputs.matrix) }} + + env: + TOXENV: integration-${{ matrix.adapter }} + PYTEST_ADDOPTS: "-v --color=yes -n4 --csv integration_results.csv" + DBT_INVOCATION_ENV: github-actions + # AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID_STAGING }} + # AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_ACCESS_KEY_ID_STAGING }} + + steps: + - name: Check out the repository + if: github.event_name != 'pull_request_target' + uses: actions/checkout@v2 + with: + persist-credentials: false + + # explicity checkout the branch for the PR, + # this is necessary for the `pull_request_target` event + - name: Check out the repository (PR) + if: github.event_name == 'pull_request_target' + uses: actions/checkout@v2 + with: + persist-credentials: false + ref: ${{ github.event.pull_request.head.sha }} + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + + - name: Install python dependencies + run: | + pip install --user --upgrade pip + pip install tox + pip --version + tox --version + + docker: + - image: fishtownanalytics/test-container:10 + - image: godatadriven/spark:2 + environment: + WAIT_FOR: localhost:5432 + command: > + --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 + --name Thrift JDBC/ODBC Server + --conf spark.hadoop.javax.jdo.option.ConnectionURL=jdbc:postgresql://localhost/metastore + --conf spark.hadoop.javax.jdo.option.ConnectionUserName=dbt + --conf spark.hadoop.javax.jdo.option.ConnectionPassword=dbt + --conf spark.hadoop.javax.jdo.option.ConnectionDriverName=org.postgresql.Driver + + - name: Run tox (Apache Spark) + if: matrix.adapter == 'apache_spark' + run: tox + + - name: Run tox (Databricks HTTP) + if: matrix.adapter == 'databricks_http' + env: &databricks-creds + DBT_DATABRICKS_HOST_NAME: ${{ secrets.DBT_DATABRICKS_HOST_NAME }} + DBT_DATABRICKS_CLUSTER_NAME: ${{ secrets.DBT_DATABRICKS_CLUSTER_NAME }} + DBT_DATABRICKS_TOKEN: ${{ secrets.DBT_DATABRICKS_TOKEN }} + ODBC_DRIVER: Simba + run: tox + + - name: Run tox (Databricks HTTP) + if: matrix.adapter == 'databricks_cluster' + env: *databricks-creds + run: tox + + - name: Run tox (Databricks HTTP) + if: matrix.adapter == 'databricks_endpoint' + env: *databricks-creds + run: tox + + - uses: actions/upload-artifact@v2 + if: always() + with: + name: logs + path: ./logs + + - name: Get current date + if: always() + id: date + run: echo "::set-output name=date::$(date +'%Y-%m-%dT%H_%M_%S')" #no colons allowed for artifacts + + - uses: actions/upload-artifact@v2 + if: always() + with: + name: integration_results_${{ matrix.python-version }}_${{ matrix.os }}_${{ matrix.adapter }}-${{ steps.date.outputs.date }}.csv + path: integration_results.csv + + require-label-comment: + runs-on: ubuntu-latest + + needs: test + + permissions: + pull-requests: write + + steps: + - name: Needs permission PR comment + 
if: >- + needs.test.result == 'skipped' && + github.event_name == 'pull_request_target' && + github.event.pull_request.head.repo.full_name != github.repository + uses: unsplash/comment-on-pr@master + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + msg: | + "You do not have permissions to run integration tests, @dbt-labs/core "\ + "needs to label this PR with `ok to test` in order to run integration tests!" + check_for_duplicate_msg: true diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml new file mode 100644 index 000000000..23f620393 --- /dev/null +++ b/.github/workflows/main.yml @@ -0,0 +1,210 @@ +# **what?** +# Runs code quality checks, unit tests, and verifies python build on +# all code commited to the repository. This workflow should not +# require any secrets since it runs for PRs from forked repos. +# By default, secrets are not passed to workflows running from +# a forked repo. + +# **why?** +# Ensure code for dbt meets a certain quality standard. + +# **when?** +# This will run for all PRs, when code is pushed to a release +# branch, and when manually triggered. + +name: Tests and Code Checks + +on: + push: + branches: + - "main" + - "develop" + - "*.latest" + - "releases/*" + pull_request: + workflow_dispatch: + +permissions: read-all + +# will cancel previous workflows triggered by the same event and for the same ref for PRs or same SHA otherwise +concurrency: + group: ${{ github.workflow }}-${{ github.event_name }}-${{ contains(github.event_name, 'pull_request') && github.event.pull_request.head.ref || github.sha }} + cancel-in-progress: true + +defaults: + run: + shell: bash + +jobs: + code-quality: + name: ${{ matrix.toxenv }} + + runs-on: ubuntu-latest + container: + image: fishtownanalytics/test-container:10 + + strategy: + fail-fast: false + matrix: + toxenv: [flake8] + + env: + TOXENV: ${{ matrix.toxenv }} + PYTEST_ADDOPTS: "-v --color=yes" + + steps: + - name: Check out the repository + uses: actions/checkout@v2 + with: + persist-credentials: false + + - name: Set up Python + uses: actions/setup-python@v2 + + - name: Install python dependencies + run: | + pip install --user --upgrade pip + pip install tox + pip --version + tox --version + + - name: Run tox + run: tox + + unit: + name: unit test / python ${{ matrix.python-version }} + + runs-on: ubuntu-latest + container: + image: fishtownanalytics/test-container:10 + + strategy: + fail-fast: false + matrix: + python-version: [3.6, 3.7, 3.8] # TODO: support unit testing for python 3.9 (https://github.com/dbt-labs/dbt/issues/3689) + + env: + TOXENV: "unit" + PYTEST_ADDOPTS: "-v --color=yes --csv unit_results.csv" + + steps: + - name: Check out the repository + uses: actions/checkout@v2 + with: + persist-credentials: false + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + + - name: Install python dependencies + run: | + pip install --user --upgrade pip + pip install tox + pip --version + tox --version + + - name: Run tox + run: tox + + - name: Get current date + if: always() + id: date + run: echo "::set-output name=date::$(date +'%Y-%m-%dT%H_%M_%S')" #no colons allowed for artifacts + + - uses: actions/upload-artifact@v2 + if: always() + with: + name: unit_results_${{ matrix.python-version }}-${{ steps.date.outputs.date }}.csv + path: unit_results.csv + + build: + name: build packages + + runs-on: ubuntu-latest + + steps: + - name: Check out the repository + uses: actions/checkout@v2 + with: + 
persist-credentials: false + + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: 3.8 + + - name: Install python dependencies + run: | + pip install --user --upgrade pip + pip install --upgrade setuptools wheel twine check-wheel-contents + pip --version + + - name: Build distributions + run: ./scripts/build-dist.sh + + - name: Show distributions + run: ls -lh dist/ + + - name: Check distribution descriptions + run: | + twine check dist/* + + - name: Check wheel contents + run: | + check-wheel-contents dist/*.whl --ignore W007,W008 + + - uses: actions/upload-artifact@v2 + with: + name: dist + path: dist/ + + test-build: + name: verify packages / python ${{ matrix.python-version }} / ${{ matrix.os }} + + needs: build + + runs-on: ${{ matrix.os }} + + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + python-version: [3.6, 3.7, 3.8, 3.9] + + steps: + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + + - name: Install python dependencies + run: | + pip install --user --upgrade pip + pip install --upgrade wheel + pip --version + + - uses: actions/download-artifact@v2 + with: + name: dist + path: dist/ + + - name: Show distributions + run: ls -lh dist/ + + - name: Install wheel distributions + run: | + find ./dist/*.whl -maxdepth 1 -type f | xargs pip install --force-reinstall --find-links=dist/ + + - name: Check wheel distributions + run: | + dbt --version + + - name: Install source distributions + run: | + find ./dist/*.gz -maxdepth 1 -type f | xargs pip install --force-reinstall --find-links=dist/ + + - name: Check source distributions + run: | + dbt --version diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml deleted file mode 100644 index b25ea884e..000000000 --- a/.github/workflows/release.yml +++ /dev/null @@ -1,119 +0,0 @@ -# Builds the spark plugin and releases it to GitHub and Pypi -name: Build and Release - -on: - workflow_dispatch: - -# Release version number that must be updated for each release -env: - version_number: '0.20.0rc2' - -jobs: - Test: - runs-on: ubuntu-latest - steps: - - name: Setup Python - uses: actions/setup-python@v2.2.2 - with: - python-version: '3.8' - - - uses: actions/checkout@v2 - - - name: Test release - run: | - python3 -m venv env - source env/bin/activate - sudo apt-get install libsasl2-dev - pip install -r dev_requirements.txt - pip install twine wheel setuptools - python setup.py sdist bdist_wheel - pip install dist/dbt-spark-*.tar.gz - pip install dist/dbt_spark-*-py3-none-any.whl - twine check dist/dbt_spark-*-py3-none-any.whl dist/dbt-spark-*.tar.gz - - GitHubRelease: - name: GitHub release - runs-on: ubuntu-latest - needs: Test - steps: - - name: Setup Python - uses: actions/setup-python@v2.2.2 - with: - python-version: '3.8' - - - uses: actions/checkout@v2 - - - name: Bumping version - run: | - python3 -m venv env - source env/bin/activate - sudo apt-get install libsasl2-dev - pip install -r dev_requirements.txt - bumpversion --config-file .bumpversion-dbt.cfg patch --new-version ${{env.version_number}} - bumpversion --config-file .bumpversion.cfg patch --new-version ${{env.version_number}} --allow-dirty - git status - - - name: Commit version bump and tag - uses: EndBug/add-and-commit@v7 - with: - author_name: 'Leah Antkiewicz' - author_email: 'leah.antkiewicz@dbtlabs.com' - message: 'Bumping version to ${{env.version_number}}' - tag: v${{env.version_number}} - - # 
Need to set an output variable because env variables can't be taken as input - # This is needed for the next step with releasing to GitHub - - name: Find release type - id: release_type - env: - IS_PRERELEASE: ${{ contains(env.version_number, 'rc') || contains(env.version_number, 'b') }} - run: | - echo ::set-output name=isPrerelease::$IS_PRERELEASE - - - name: Create GitHub release - uses: actions/create-release@v1 - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # This token is provided by Actions, you do not need to create your own token - with: - tag_name: v${{env.version_number}} - release_name: dbt-spark v${{env.version_number}} - prerelease: ${{ steps.release_type.outputs.isPrerelease }} - body: | - Tracking [dbt-core v${{env.version_number}}](https://github.com/dbt-labs/dbt/releases/tag/v${{env.version_number}}). - - ```sh - $ pip install dbt-spark==${{env.version_number}} - # or - $ pip install "dbt-spark[ODBC]==${{env.version_number}}" - # or - $ pip install "dbt-spark[PyHive]==${{env.version_number}}" - ``` - - PypiRelease: - name: Pypi release - runs-on: ubuntu-latest - needs: GitHubRelease - environment: PypiProd - steps: - - name: Setup Python - uses: actions/setup-python@v2.2.2 - with: - python-version: '3.8' - - - uses: actions/checkout@v2 - with: - ref: v${{env.version_number}} - - - name: Release to pypi - env: - TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }} - TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }} - run: | - python3 -m venv env - source env/bin/activate - sudo apt-get install libsasl2-dev - pip install -r dev_requirements.txt - pip install twine wheel setuptools - python setup.py sdist bdist_wheel - twine upload --non-interactive dist/dbt_spark-${{env.version_number}}-py3-none-any.whl dist/dbt-spark-${{env.version_number}}.tar.gz - diff --git a/.github/workflows/version-bump.yml b/.github/workflows/version-bump.yml new file mode 100644 index 000000000..4913a6e84 --- /dev/null +++ b/.github/workflows/version-bump.yml @@ -0,0 +1,102 @@ +# **what?** +# This workflow will take a version number and a dry run flag. With that +# it will run versionbump to update the version number everywhere in the +# code base and then generate an update Docker requirements file. If this +# is a dry run, a draft PR will open with the changes. If this isn't a dry +# run, the changes will be committed to the branch this is run on. + +# **why?** +# This is to aid in releasing dbt and making sure we have updated +# the versions and Docker requirements in all places. 
+ +# **when?** +# This is triggered either manually OR +# from the repository_dispatch event "version-bump" which is sent from +# the dbt-release repo Action + +name: Version Bump + +on: + workflow_dispatch: + inputs: + version_number: + description: 'The version number to bump to' + required: true + is_dry_run: + description: 'Creates a draft PR to allow testing instead of committing to a branch' + required: true + default: 'true' + repository_dispatch: + types: [version-bump] + +jobs: + bump: + runs-on: ubuntu-latest + steps: + - name: Check out the repository + uses: actions/checkout@v2 + + - name: Set version and dry run values + id: variables + env: + VERSION_NUMBER: "${{ github.event.client_payload.version_number == '' && github.event.inputs.version_number || github.event.client_payload.version_number }}" + IS_DRY_RUN: "${{ github.event.client_payload.is_dry_run == '' && github.event.inputs.is_dry_run || github.event.client_payload.is_dry_run }}" + run: | + echo Repository dispatch event version: ${{ github.event.client_payload.version_number }} + echo Repository dispatch event dry run: ${{ github.event.client_payload.is_dry_run }} + echo Workflow dispatch event version: ${{ github.event.inputs.version_number }} + echo Workflow dispatch event dry run: ${{ github.event.inputs.is_dry_run }} + echo ::set-output name=VERSION_NUMBER::$VERSION_NUMBER + echo ::set-output name=IS_DRY_RUN::$IS_DRY_RUN + + - uses: actions/setup-python@v2 + with: + python-version: "3.8" + + - name: Install python dependencies + run: | + python3 -m venv env + source env/bin/activate + pip install --upgrade pip + + - name: Create PR branch + if: ${{ steps.variables.outputs.IS_DRY_RUN == 'true' }} + run: | + git checkout -b bumping-version/${{steps.variables.outputs.VERSION_NUMBER}}_$GITHUB_RUN_ID + git push origin bumping-version/${{steps.variables.outputs.VERSION_NUMBER}}_$GITHUB_RUN_ID + git branch --set-upstream-to=origin/bumping-version/${{steps.variables.outputs.VERSION_NUMBER}}_$GITHUB_RUN_ID bumping-version/${{steps.variables.outputs.VERSION_NUMBER}}_$GITHUB_RUN_ID + + - name: Bumping version + run: | + source env/bin/activate + pip install -r dev_requirements.txt + env/bin/bumpversion --allow-dirty --new-version ${{steps.variables.outputs.VERSION_NUMBER}} major + git status + + - name: Commit version bump directly + uses: EndBug/add-and-commit@v7 + if: ${{ steps.variables.outputs.IS_DRY_RUN == 'false' }} + with: + author_name: 'Github Build Bot' + author_email: 'buildbot@fishtownanalytics.com' + message: 'Bumping version to ${{steps.variables.outputs.VERSION_NUMBER}}' + + - name: Commit version bump to branch + uses: EndBug/add-and-commit@v7 + if: ${{ steps.variables.outputs.IS_DRY_RUN == 'true' }} + with: + author_name: 'Github Build Bot' + author_email: 'buildbot@fishtownanalytics.com' + message: 'Bumping version to ${{steps.variables.outputs.VERSION_NUMBER}}' + branch: 'bumping-version/${{steps.variables.outputs.VERSION_NUMBER}}_${{GITHUB.RUN_ID}}' + push: 'origin origin/bumping-version/${{steps.variables.outputs.VERSION_NUMBER}}_${{GITHUB.RUN_ID}}' + + - name: Create Pull Request + uses: peter-evans/create-pull-request@v3 + if: ${{ steps.variables.outputs.IS_DRY_RUN == 'true' }} + with: + author: 'Github Build Bot ' + draft: true + base: ${{github.ref}} + title: 'Bumping version to ${{steps.variables.outputs.VERSION_NUMBER}}' + branch: 'bumping-version/${{steps.variables.outputs.VERSION_NUMBER}}_${{GITHUB.RUN_ID}}' diff --git a/README.md b/README.md index c330afde5..58516a1e3 100644 --- a/README.md 
+++ b/README.md @@ -1,279 +1,39 @@

- dbt logo + dbt logo

- - CircleCI + + Unit Tests Badge - - Slack + + Integration Tests Badge

-# dbt-spark
+**[dbt](https://www.getdbt.com/)** enables data analysts and engineers to transform their data using the same practices that software engineers use to build applications.
-This plugin ports [dbt](https://getdbt.com) functionality to Spark. It supports running dbt against Spark clusters that are hosted via Databricks (AWS + Azure), Amazon EMR, or Docker.
+dbt is the T in ELT. Organize, cleanse, denormalize, filter, rename, and pre-aggregate the raw data in your warehouse so that it's ready for analysis.
-We have not tested extensively against older versions of Apache Spark. The plugin uses syntax that requires version 2.2.0 or newer. Some features require Spark 3.0 and/or Delta Lake.
+## dbt-spark
-### Documentation
-For more information on using Spark with dbt, consult the dbt documentation:
-- [Spark profile](https://docs.getdbt.com/reference/warehouse-profiles/spark-profile/)
-- [Spark specific configs](https://docs.getdbt.com/reference/resource-configs/spark-configs/)
+The `dbt-spark` package contains all of the code enabling dbt to work with Apache Spark and Databricks. For
+more information, consult [the docs](https://docs.getdbt.com/docs/profile-spark).
-### Installation
-This plugin can be installed via pip. Depending on your connection method, you need to specify an extra requirement.
+## Getting started
-If connecting to Databricks via ODBC driver, it requires [`pyodbc`](https://github.com/mkleehammer/pyodbc). Depending on your system1, you can install it seperately or via pip:
+- [Install dbt](https://docs.getdbt.com/docs/installation)
+- Read the [introduction](https://docs.getdbt.com/docs/introduction/) and [viewpoint](https://docs.getdbt.com/docs/about/viewpoint/)
-```bash
-# Install dbt-spark from PyPi for odbc connections:
-$ pip install "dbt-spark[ODBC]"
-```
+## Join the dbt Community
-If connecting to a Spark cluster via the generic `thrift` or `http` methods, it requires [`PyHive`](https://github.com/dropbox/PyHive):
+- Be part of the conversation in the [dbt Community Slack](http://community.getdbt.com/)
+- Read more on the [dbt Community Discourse](https://discourse.getdbt.com)
-```bash
-# Install dbt-spark from PyPi for thrift or http connections:
-$ pip install "dbt-spark[PyHive]"
-```
+## Reporting bugs and contributing code
-1See the [`pyodbc` wiki](https://github.com/mkleehammer/pyodbc/wiki/Install) for OS-specific installation details.
-
-
-### Configuring your profile
-
-**Connection Method**
-
-Connections can be made to Spark in three different modes:
-- `odbc` is the preferred mode when connecting to Databricks. It supports connecting to a SQL Endpoint or an all-purpose interactive cluster.
-- `http` is a more generic mode for connecting to a managed service that provides an HTTP endpoint. Currently, this includes connections to a Databricks interactive cluster.
-- `thrift` connects directly to the lead node of a cluster, either locally hosted / on premise or in the cloud (e.g. Amazon EMR).
- -A dbt profile for Spark connections support the following configurations: - -**Key**: -- ✅ Required -- ❌ Not used -- ❔ Optional (followed by `default value` in parentheses) - -| Option | Description | ODBC | Thrift | HTTP | Example | -|-|-|-|-|-|-| -| method | Specify the connection method (`odbc` or `thrift` or `http`) | ✅ | ✅ | ✅ | `odbc` | -| schema | Specify the schema (database) to build models into | ✅ | ✅ | ✅ | `analytics` | -| host | The hostname to connect to | ✅ | ✅ | ✅ | `yourorg.sparkhost.com` | -| port | The port to connect to the host on | ❔ (`443`) | ❔ (`443`) | ❔ (`10001`) | `443` | -| token | The token to use for authenticating to the cluster | ✅ | ❌ | ✅ | `abc123` | -| auth | The value of `hive.server2.authentication` | ❌ | ❔ | ❌ | `KERBEROS` | -| kerberos_service_name | Use with `auth='KERBEROS'` | ❌ | ❔ | ❌ | `hive` | -| organization | Azure Databricks workspace ID (see note) | ❔ | ❌ | ❔ | `1234567891234567` | -| cluster | The name of the cluster to connect to | ✅ (unless `endpoint`) | ❌ | ✅ | `01234-23423-coffeetime` | -| endpoint | The ID of the SQL endpoint to connect to | ✅ (unless `cluster`) | ❌ | ❌ | `1234567891234a` | -| driver | Path of ODBC driver installed or name of the ODBC driver configured | ✅ | ❌ | ❌ | `/opt/simba/spark/lib/64/libsparkodbc_sb64.so` | -| user | The username to use to connect to the cluster | ❔ | ❔ | ❔ | `hadoop` | -| connect_timeout | The number of seconds to wait before retrying to connect to a Pending Spark cluster | ❌ | ❔ (`10`) | ❔ (`10`) | `60` | -| connect_retries | The number of times to try connecting to a Pending Spark cluster before giving up | ❌ | ❔ (`0`) | ❔ (`0`) | `5` | -| use_ssl | The value of `hive.server2.use.SSL` (`True` or `False`). Default ssl store (ssl.get_default_verify_paths()) is the valid location for SSL certificate | ❌ | ❔ (`False`) | ❌ | `True` | -| retry_all | Whether to retry all failed connections, and not just 'retryable' ones | ❌ | ❔ (`false`) | ❔ (`false`) | `false` | - -**Databricks** connections differ based on the cloud provider: - -- **Organization:** To connect to an Azure Databricks cluster, you will need to obtain your organization ID, which is a unique ID Azure Databricks generates for each customer workspace. To find the organization ID, see https://docs.microsoft.com/en-us/azure/databricks/dev-tools/databricks-connect#step-2-configure-connection-properties. This is a string field; if there is a leading zero, be sure to include it. - -- **Host:** The host field for Databricks can be found at the start of your workspace or cluster url: `region.azuredatabricks.net` for Azure, or `account.cloud.databricks.com` for AWS. Do not include `https://`. - -**Amazon EMR**: To connect to Spark running on an Amazon EMR cluster, you will need to run `sudo /usr/lib/spark/sbin/start-thriftserver.sh` on the master node of the cluster to start the Thrift server (see https://aws.amazon.com/premiumsupport/knowledge-center/jdbc-connection-emr/ for further context). You will also need to connect to port `10001`, which will connect to the Spark backend Thrift server; port `10000` will instead connect to a Hive backend, which will not work correctly with dbt. 
- - -**Example profiles.yml entries:** - -**ODBC** -``` -your_profile_name: - target: dev - outputs: - dev: - type: spark - method: odbc - driver: path/to/driver - host: yourorg.databricks.com - organization: 1234567891234567 # Azure Databricks only - port: 443 # default - token: abc123 - schema: analytics - - # one of: - cluster: 01234-23423-coffeetime - endpoint: coffee01234time -``` - -**Thrift** -``` -your_profile_name: - target: dev - outputs: - dev: - type: spark - method: thrift - host: 127.0.0.1 - port: 10001 # default - schema: analytics - - # optional - user: hadoop - auth: KERBEROS - kerberos_service_name: hive - connect_retries: 5 - connect_timeout: 60 - retry_all: true -``` - - -**HTTP** -``` -your_profile_name: - target: dev - outputs: - dev: - type: spark - method: http - host: yourorg.sparkhost.com - organization: 1234567891234567 # Azure Databricks only - port: 443 # default - token: abc123 - schema: analytics - cluster: 01234-23423-coffeetime - - # optional - connect_retries: 5 - connect_timeout: 60 - retry_all: true -``` - - -### Usage Notes - -**Model Configuration** - -The following configurations can be supplied to models run with the dbt-spark plugin: - - -| Option | Description | Required? | Example | -| -------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------------------- | -------------------- | -| file_format | The file format to use when creating tables (`parquet`, `delta`, `csv`, `json`, `text`, `jdbc`, `orc`, `hive` or `libsvm`). | Optional | `parquet` | -| location_root | The created table uses the specified directory to store its data. The table alias is appended to it. | Optional | `/mnt/root` | -| partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | `partition_1` | -| clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | `cluster_1` | -| buckets | The number of buckets to create while clustering | Required if `clustered_by` is specified | `8` | -| incremental_strategy | The strategy to use for incremental models (`append`, `insert_overwrite`, or `merge`). | Optional (default: `append`) | `merge` | -| persist_docs | Whether dbt should include the model description as a table or column `comment` | Optional | `{'relation': true, 'columns': true}` | - - -**Incremental Models** - -dbt has a number of ways to build models incrementally, called "incremental strategies." Some strategies depend on certain file formats, connection types, and other model configurations: -- `append` (default): Insert new records without updating or overwriting any existing data. -- `insert_overwrite`: If `partition_by` is specified, overwrite partitions in the table with new data. (Be sure to re-select _all_ of the relevant data for a partition.) If no `partition_by` is specified, overwrite the entire table with new data. [Cannot be used with `file_format: delta` or when connectinng via Databricks SQL Endpoints. For dynamic partition replacement with `method: odbc` + Databricks `cluster`, you must you **must** include `set spark.sql.sources.partitionOverwriteMode DYNAMIC` in the [cluster SparkConfig](https://docs.databricks.com/clusters/configure.html#spark-config). For atomic replacement of Delta tables, use the `table` materialization instead.] 
-- `merge`: Match records based on a `unique_key`; update old records, insert new ones. (If no `unique_key` is specified, all new data is inserted, similar to `append`.) [Requires `file_format: delta`. Available only on Databricks Runtime.] - -Examples: - -```sql -{{ config( - materialized='incremental', - incremental_strategy='append', -) }} - - --- All rows returned by this query will be appended to the existing table - -select * from {{ ref('events') }} -{% if is_incremental() %} - where event_ts > (select max(event_ts) from {{ this }}) -{% endif %} -``` - -```sql -{{ config( - materialized='incremental', - incremental_strategy='merge', - partition_by=['date_day'], - file_format='parquet' -) }} - --- Every partition returned by this query will overwrite existing partitions - -select - date_day, - count(*) as users - -from {{ ref('events') }} -{% if is_incremental() %} - where date_day > (select max(date_day) from {{ this }}) -{% endif %} -group by 1 -``` - -```sql -{{ config( - materialized='incremental', - incremental_strategy='merge', - unique_key='event_id', - file_format='delta' -) }} - --- Existing events, matched on `event_id`, will be updated --- New events will be appended - -select * from {{ ref('events') }} -{% if is_incremental() %} - where date_day > (select max(date_day) from {{ this }}) -{% endif %} -``` - -### Running locally - -A `docker-compose` environment starts a Spark Thrift server and a Postgres database as a Hive Metastore backend. - -``` -docker-compose up -``` - -Create a profile like this one: - -``` -spark-testing: - target: local - outputs: - local: - type: spark - method: thrift - host: 127.0.0.1 - port: 10000 - user: dbt - schema: analytics - connect_retries: 5 - connect_timeout: 60 - retry_all: true -``` - -Connecting to the local spark instance: - -* The Spark UI should be available at [http://localhost:4040/sqlserver/](http://localhost:4040/sqlserver/) -* The endpoint for SQL-based testing is at `http://localhost:10000` and can be referenced with the Hive or Spark JDBC drivers using connection string `jdbc:hive2://localhost:10000` and default credentials `dbt`:`dbt` - -Note that the Hive metastore data is persisted under `./.hive-metastore/`, and the Spark-produced data under `./.spark-warehouse/`. To completely reset you environment run the following: - -``` -docker-compose down -rm -rf ./.hive-metastore/ -rm -rf ./.spark-warehouse/ -``` - -### Reporting bugs and contributing code - -- Want to report a bug or request a feature? Let us know on [Slack](http://slack.getdbt.com/), or open [an issue](https://github.com/fishtown-analytics/dbt-spark/issues/new). +- Want to report a bug or request a feature? Let us know on [Slack](http://community.getdbt.com/), or open [an issue](https://github.com/dbt-labs/dbt-snowflake/issues/new) +- Want to help us build dbt? Check out the [Contributing Guide](https://github.com/dbt-labs/dbt/blob/HEAD/CONTRIBUTING.md) ## Code of Conduct -Everyone interacting in the dbt project's codebases, issue trackers, chat rooms, and mailing lists is expected to follow the [PyPA Code of Conduct](https://www.pypa.io/en/latest/code-of-conduct/). +Everyone interacting in the dbt project's codebases, issue trackers, chat rooms, and mailing lists is expected to follow the [dbt Code of Conduct](https://community.getdbt.com/code-of-conduct). 
diff --git a/dbt/adapters/spark/__version__.py b/dbt/adapters/spark/__version__.py
index 025ca2353..affc65fe4 100644
--- a/dbt/adapters/spark/__version__.py
+++ b/dbt/adapters/spark/__version__.py
@@ -1 +1 @@
-version = "0.21.0"
+version = "1.0.0b1"
diff --git a/dev_requirements.txt b/dev_requirements.txt
index 95e4df5ed..2bffe09cf 100644
--- a/dev_requirements.txt
+++ b/dev_requirements.txt
@@ -1,3 +1,7 @@
+# install latest changes in dbt-core
+# TODO: how to automate switching from develop to version branches?
+git+https://github.com/dbt-labs/dbt.git#egg=dbt-core&subdirectory=core
+
 freezegun==0.3.9
 pytest==6.0.2
 mock>=1.3.0
@@ -10,6 +14,7 @@ pytest-xdist>=2.1.0,<3
 flaky>=3.5.3,<4
 # Test requirements
-pytest-dbt-adapter==0.5.1
+#pytest-dbt-adapter==0.5.1
+git+https://github.com/dbt-labs/dbt-adapter-tests.git#egg=pytest-dbt-adapter
 sasl==0.2.1
 thrift_sasl==0.4.1
diff --git a/etc/dbt-logo-full.svg b/etc/dbt-logo-full.svg
deleted file mode 100644
index 88f84b700..000000000
--- a/etc/dbt-logo-full.svg
+++ /dev/null
@@ -1 +0,0 @@
-
\ No newline at end of file
diff --git a/scripts/build-dist.sh b/scripts/build-dist.sh
new file mode 100644
index 000000000..65e6dbc97
--- /dev/null
+++ b/scripts/build-dist.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+
+set -eo pipefail
+
+DBT_PATH="$( cd "$(dirname "$0")/.." ; pwd -P )"
+
+PYTHON_BIN=${PYTHON_BIN:-python}
+
+echo "$PYTHON_BIN"
+
+set -x
+
+rm -rf "$DBT_PATH"/dist
+rm -rf "$DBT_PATH"/build
+mkdir -p "$DBT_PATH"/dist
+
+cd "$DBT_PATH"
+$PYTHON_BIN setup.py sdist bdist_wheel
+
+set +x
diff --git a/setup.py b/setup.py
index f0f098fda..41ba852b4 100644
--- a/setup.py
+++ b/setup.py
@@ -1,41 +1,65 @@
 #!/usr/bin/env python
-from setuptools import find_namespace_packages, setup
 import os
+import sys
 import re
+
+# require python 3.6 or newer
+if sys.version_info < (3, 6):
+    print('Error: dbt does not support this version of Python.')
+    print('Please upgrade to Python 3.6 or higher.')
+    sys.exit(1)
+
+# require version of setuptools that supports find_namespace_packages
+from setuptools import setup
+try:
+    from setuptools import find_namespace_packages
+except ImportError:
+    # the user has a downlevel version of setuptools.
+    print('Error: dbt requires setuptools v40.1.0 or higher.')
+    print('Please upgrade setuptools with "pip install --upgrade setuptools" '
+          'and try again')
+    sys.exit(1)
+
+
+# pull long description from README
 this_directory = os.path.abspath(os.path.dirname(__file__))
 with open(os.path.join(this_directory, 'README.md')) as f:
     long_description = f.read()
-package_name = "dbt-spark"
-
-
-# get this from a separate file
-def _dbt_spark_version():
+# get this package's version from dbt/adapters/spark/__version__.py
+def _get_plugin_version_dict():
     _version_path = os.path.join(
         this_directory, 'dbt', 'adapters', 'spark', '__version__.py'
    )
-    _version_pattern = r'''version\s*=\s*["'](.+)["']'''
+    _semver = r'''(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)'''
+    _pre = r'''((?P<prekind>a|b|rc)(?P<pre>\d+))?'''
+    _version_pattern = fr'''version\s*=\s*["']{_semver}{_pre}["']'''
     with open(_version_path) as f:
         match = re.search(_version_pattern, f.read().strip())
         if match is None:
             raise ValueError(f'invalid version at {_version_path}')
-        return match.group(1)
+        return match.groupdict()
 
 
-package_version = _dbt_spark_version()
-description = """The SparkSQL plugin for dbt (data build tool)"""
+def _get_plugin_version():
+    parts = _get_plugin_version_dict()
+    return "{major}.{minor}.{patch}{prekind}{pre}".format(**parts)
 
-dbt_version = '0.21.0'
-# the package version should be the dbt version, with maybe some things on the
-# ends of it. (0.21.0 vs 0.21.0a1, 0.21.0.1, ...)
-if not package_version.startswith(dbt_version):
-    raise ValueError(
-        f'Invalid setup.py: package_version={package_version} must start with '
-        f'dbt_version={dbt_version}'
-    )
+
+# require a compatible minor version (~=), prerelease if this is a prerelease
+def _get_dbt_core_version():
+    parts = _get_plugin_version_dict()
+    minor = "{major}.{minor}.0".format(**parts)
+    pre = (parts["prekind"]+"1" if parts["prekind"] else "")
+    return f"{minor}{pre}"
+
+
+package_name = "dbt-spark"
+package_version = _get_plugin_version()
+dbt_core_version = _get_dbt_core_version()
+description = """The Apache Spark adapter plugin for dbt"""
 
 odbc_extras = ['pyodbc>=4.0.30']
 pyhive_extras = [
@@ -52,14 +76,14 @@ def _dbt_spark_version():
     long_description=long_description,
     long_description_content_type='text/markdown',
 
-    author='Fishtown Analytics',
-    author_email='info@fishtownanalytics.com',
-    url='https://github.com/fishtown-analytics/dbt-spark',
+    author='dbt Labs',
+    author_email='info@dbtlabs.com',
+    url='https://github.com/dbt-labs/dbt-spark',
 
     packages=find_namespace_packages(include=['dbt', 'dbt.*']),
     include_package_data=True,
     install_requires=[
-        f'dbt-core=={dbt_version}',
+        'dbt-core~={}'.format(dbt_core_version),
         'sqlparams>=3.0.0',
     ],
     extras_require={
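
Editor's note (not part of the patch): the version plumbing added to `setup.py` above is easier to follow outside the diff. The sketch below is illustrative only; it reproduces the behavior of the new helpers for the `version = "1.0.0b1"` string committed in `dbt/adapters/spark/__version__.py`, with variable names mirroring the diff and everything else assumed for the example.

```python
import re

# same patterns as the new _get_plugin_version_dict() in setup.py
_semver = r'''(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)'''
_pre = r'''((?P<prekind>a|b|rc)(?P<pre>\d+))?'''
_version_pattern = fr'''version\s*=\s*["']{_semver}{_pre}["']'''

# contents of dbt/adapters/spark/__version__.py after this bump
contents = 'version = "1.0.0b1"'
parts = re.search(_version_pattern, contents).groupdict()
# -> {'major': '1', 'minor': '0', 'patch': '0', 'prekind': 'b', 'pre': '1'}

# _get_plugin_version(): the exact plugin version string
plugin_version = "{major}.{minor}.{patch}{prekind}{pre}".format(**parts)

# _get_dbt_core_version(): same minor series, first prerelease if this is one
dbt_core_version = "{major}.{minor}.0".format(**parts) + (
    parts["prekind"] + "1" if parts["prekind"] else ""
)

print(plugin_version)    # 1.0.0b1
print(dbt_core_version)  # 1.0.0b1 -> install_requires: 'dbt-core~=1.0.0b1'
```

Because `install_requires` now uses `dbt-core~={}`, the plugin accepts any compatible `1.0.x` release of dbt-core at or above `1.0.0b1`, rather than the exact `==` pin used previously.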
diff --git a/test/custom/base.py b/test/custom/base.py
deleted file mode 100644
index 28fcad3cc..000000000
--- a/test/custom/base.py
+++ /dev/null
@@ -1,163 +0,0 @@
-import pytest
-from functools import wraps
-import os
-from dbt_adapter_tests import DBTIntegrationTestBase
-import pyodbc
-
-
-class DBTSparkIntegrationTest(DBTIntegrationTestBase):
-
-    def get_profile(self, adapter_type):
-        if adapter_type == 'apache_spark':
-            return self.apache_spark_profile()
-        elif adapter_type == 'databricks_cluster':
-            return self.databricks_cluster_profile()
-        elif adapter_type == 'databricks_sql_endpoint':
-            return self.databricks_sql_endpoint_profile()
-        else:
-            raise ValueError('invalid adapter type {}'.format(adapter_type))
-
-    @staticmethod
-    def _profile_from_test_name(test_name):
-        adapter_names = ('apache_spark', 'databricks_cluster',
-                         'databricks_sql_endpoint')
-        adapters_in_name = sum(x in test_name for x in adapter_names)
-        if adapters_in_name != 1:
-            raise ValueError(
-                'test names must have exactly 1 profile choice embedded, {} has {}'
-                .format(test_name, adapters_in_name)
-            )
-
-        for adapter_name in adapter_names:
-            if adapter_name in test_name:
-                return adapter_name
-
-        raise ValueError(
-            'could not find adapter name in test name {}'.format(test_name)
-        )
-
-    def run_sql(self, query, fetch='None', kwargs=None, connection_name=None):
-        if connection_name is None:
-            connection_name = '__test'
-
-        if query.strip() == "":
-            return
-
-        sql = self.transform_sql(query, kwargs=kwargs)
-
-        with self.get_connection(connection_name) as conn:
-            cursor = conn.handle.cursor()
-            try:
-                cursor.execute(sql)
-                if fetch == 'one':
-                    return cursor.fetchall()[0]
-                elif fetch == 'all':
-                    return cursor.fetchall()
-                else:
-                    # we have to fetch.
-                    cursor.fetchall()
-            except pyodbc.ProgrammingError as e:
-                # hacks for dropping schema
-                if "No results.  Previous SQL was not a query." not in str(e):
-                    raise e
-            except Exception as e:
-                conn.handle.rollback()
-                conn.transaction_open = False
-                print(sql)
-                print(e)
-                raise
-            else:
-                conn.transaction_open = False
-
-    def apache_spark_profile(self):
-        return {
-            'config': {
-                'send_anonymous_usage_stats': False
-            },
-            'test': {
-                'outputs': {
-                    'thrift': {
-                        'type': 'spark',
-                        'host': 'localhost',
-                        'user': 'dbt',
-                        'method': 'thrift',
-                        'port': 10000,
-                        'connect_retries': 5,
-                        'connect_timeout': 60,
-                        'schema': self.unique_schema()
-                    },
-                },
-                'target': 'thrift'
-            }
-        }
-
-    def databricks_cluster_profile(self):
-        return {
-            'config': {
-                'send_anonymous_usage_stats': False
-            },
-            'test': {
-                'outputs': {
-                    'cluster': {
-                        'type': 'spark',
-                        'method': 'odbc',
-                        'host': os.getenv('DBT_DATABRICKS_HOST_NAME'),
-                        'cluster': os.getenv('DBT_DATABRICKS_CLUSTER_NAME'),
-                        'token': os.getenv('DBT_DATABRICKS_TOKEN'),
-                        'driver': os.getenv('ODBC_DRIVER'),
-                        'port': 443,
-                        'schema': self.unique_schema()
-                    },
-                },
-                'target': 'cluster'
-            }
-        }
-
-    def databricks_sql_endpoint_profile(self):
-        return {
-            'config': {
-                'send_anonymous_usage_stats': False
-            },
-            'test': {
-                'outputs': {
-                    'endpoint': {
-                        'type': 'spark',
-                        'method': 'odbc',
-                        'host': os.getenv('DBT_DATABRICKS_HOST_NAME'),
-                        'endpoint': os.getenv('DBT_DATABRICKS_ENDPOINT'),
-                        'token': os.getenv('DBT_DATABRICKS_TOKEN'),
-                        'driver': os.getenv('ODBC_DRIVER'),
-                        'port': 443,
-                        'schema': self.unique_schema()
-                    },
-                },
-                'target': 'endpoint'
-            }
-        }
-
-
-def use_profile(profile_name):
-    """A decorator to declare a test method as using a particular profile.
-    Handles both setting the nose attr and calling self.use_profile.
-
-    Use like this:
-
-    class TestSomething(DBIntegrationTest):
-        @use_profile('postgres')
-        def test_postgres_thing(self):
-            self.assertEqual(self.adapter_type, 'postgres')
-
-        @use_profile('snowflake')
-        def test_snowflake_thing(self):
-            self.assertEqual(self.adapter_type, 'snowflake')
-    """
-    def outer(wrapped):
-        @getattr(pytest.mark, 'profile_'+profile_name)
-        @wraps(wrapped)
-        def func(self, *args, **kwargs):
-            return wrapped(self, *args, **kwargs)
-        # sanity check at import time
-        assert DBTSparkIntegrationTest._profile_from_test_name(
-            wrapped.__name__) == profile_name
-        return func
-    return outer
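
Editor's note (not part of the patch): both the deleted `test/custom/base.py` above and the new `tests/integration/base.py` added below select a warehouse profile from the test method's name via `_profile_from_test_name`. A minimal sketch, condensed from that helper as it appears in this diff; the test names used here are hypothetical.

```python
def _profile_from_test_name(test_name):
    # a test name must embed exactly one supported adapter name
    adapter_names = ('apache_spark', 'databricks_cluster',
                     'databricks_sql_endpoint')
    adapters_in_name = sum(x in test_name for x in adapter_names)
    if adapters_in_name != 1:
        raise ValueError(
            'test names must have exactly 1 profile choice embedded, {} has {}'
            .format(test_name, adapters_in_name)
        )
    for adapter_name in adapter_names:
        if adapter_name in test_name:
            return adapter_name


# a test method named for its target profile resolves cleanly...
assert _profile_from_test_name('test_basic_apache_spark') == 'apache_spark'
assert _profile_from_test_name('test_basic_databricks_cluster') == 'databricks_cluster'

# ...while a name embedding zero (or more than one) adapter names raises
try:
    _profile_from_test_name('test_basic')
except ValueError as exc:
    print(exc)  # test names must have exactly 1 profile choice embedded, test_basic has 0
```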
diff --git a/test/__init__.py b/tests/__init__.py
similarity index 100%
rename from test/__init__.py
rename to tests/__init__.py
diff --git a/test/custom/__init__.py b/tests/integration/__init__.py
similarity index 100%
rename from test/custom/__init__.py
rename to tests/integration/__init__.py
diff --git a/tests/integration/base.py b/tests/integration/base.py
new file mode 100644
index 000000000..277e85a7b
--- /dev/null
+++ b/tests/integration/base.py
@@ -0,0 +1,953 @@
+import json
+import os
+import io
+import random
+import shutil
+import sys
+import tempfile
+import traceback
+import unittest
+from contextlib import contextmanager
+from datetime import datetime
+from functools import wraps
+
+import pytest
+import yaml
+from unittest.mock import patch
+
+import dbt.main as dbt
+from dbt import flags
+from dbt.deprecations import reset_deprecations
+from dbt.adapters.factory import get_adapter, reset_adapters, register_adapter
+from dbt.clients.jinja import template_cache
+from dbt.config import RuntimeConfig
+from dbt.context import providers
+from dbt.logger import GLOBAL_LOGGER as logger, log_manager
+from dbt.contracts.graph.manifest import Manifest
+
+
+INITIAL_ROOT = os.getcwd()
+
+
+def normalize(path):
+    """On windows, neither is enough on its own:
+
+    >>> normcase('C:\\documents/ALL CAPS/subdir\\..')
+    'c:\\documents\\all caps\\subdir\\..'
+    >>> normpath('C:\\documents/ALL CAPS/subdir\\..')
+    'C:\\documents\\ALL CAPS'
+    >>> normpath(normcase('C:\\documents/ALL CAPS/subdir\\..'))
+    'c:\\documents\\all caps'
+    """
+    return os.path.normcase(os.path.normpath(path))
+
+
+class Normalized:
+    def __init__(self, value):
+        self.value = value
+
+    def __repr__(self):
+        return f'Normalized({self.value!r})'
+
+    def __str__(self):
+        return f'Normalized({self.value!s})'
+
+    def __eq__(self, other):
+        return normalize(self.value) == normalize(other)
+
+
+class FakeArgs:
+    def __init__(self):
+        self.threads = 1
+        self.defer = False
+        self.full_refresh = False
+        self.models = None
+        self.select = None
+        self.exclude = None
+        self.single_threaded = False
+        self.selector_name = None
+        self.state = None
+        self.defer = None
+
+
+class TestArgs:
+    def __init__(self, kwargs):
+        self.which = 'run'
+        self.single_threaded = False
+        self.profiles_dir = None
+        self.project_dir = None
+        self.__dict__.update(kwargs)
+
+
+def _profile_from_test_name(test_name):
+    adapter_names = ('apache_spark', 'databricks_cluster',
+                     'databricks_sql_endpoint')
+    adapters_in_name = sum(x in test_name for x in adapter_names)
+    if adapters_in_name != 1:
+        raise ValueError(
+            'test names must have exactly 1 profile choice embedded, {} has {}'
+            .format(test_name, adapters_in_name)
+        )
+
+    for adapter_name in adapter_names:
+        if adapter_name in test_name:
+            return adapter_name
+
+    raise ValueError(
+        'could not find adapter name in test name {}'.format(test_name)
+    )
+
+
+def _pytest_test_name():
+    return os.environ['PYTEST_CURRENT_TEST'].split()[0]
+
+
+def _pytest_get_test_root():
+    test_path = _pytest_test_name().split('::')[0]
+    relative_to = INITIAL_ROOT
+    head = os.path.relpath(test_path, relative_to)
+
+    path_parts = []
+    while head:
+        head, tail = os.path.split(head)
+        path_parts.append(tail)
+    path_parts.reverse()
+    # dbt tests are all of the form 'tests/integration/suite_name'
+    target = os.path.join(*path_parts[:3])  # TODO: try to not hard code this
+    return os.path.join(relative_to, target)
+
+
+def _really_makedirs(path):
+    while not os.path.exists(path):
+        try:
+            os.makedirs(path)
+        except EnvironmentError:
+            raise
+
+
+class DBTIntegrationTest(unittest.TestCase):
+    CREATE_SCHEMA_STATEMENT = 'CREATE SCHEMA {}'
+    DROP_SCHEMA_STATEMENT = 'DROP SCHEMA IF EXISTS {} CASCADE'
+
+    _randint = random.randint(0, 9999)
+    _runtime_timedelta = (datetime.utcnow() - datetime(1970, 1, 1, 0, 0, 0))
+    _runtime = (
+        (int(_runtime_timedelta.total_seconds() * 1e6)) +
+        _runtime_timedelta.microseconds
+    )
+
+    prefix = f'test{_runtime}{_randint:04}'
+    setup_alternate_db = False
+
+    def apache_spark_profile(self):
+        return {
+            'config': {
+                'send_anonymous_usage_stats': False
+            },
+            'test': {
+                'outputs': {
+                    'thrift': {
+                        'type': 'spark',
+                        'host': 'localhost',
+                        'user': 'dbt',
+                        'method': 'thrift',
+                        'port': 10000,
+                        'connect_retries': 5,
+                        'connect_timeout': 60,
+                        'schema': self.unique_schema()
+                    },
+                },
+                'target': 'thrift'
+            }
+        }
+
+    def databricks_cluster_profile(self):
+        return {
+            'config': {
+                'send_anonymous_usage_stats': False
+            },
+            'test': {
+                'outputs': {
+                    'cluster': {
+                        'type': 'spark',
+                        'method': 'odbc',
+                        'host': os.getenv('DBT_DATABRICKS_HOST_NAME'),
+                        'cluster': os.getenv('DBT_DATABRICKS_CLUSTER_NAME'),
+                        'token': os.getenv('DBT_DATABRICKS_TOKEN'),
+                        'driver': os.getenv('ODBC_DRIVER'),
+                        'port': 443,
+                        'schema': self.unique_schema()
+                    },
+                },
+                'target': 'cluster'
+            }
+        }
+
+    def databricks_sql_endpoint_profile(self):
+        return {
+            'config': {
+                'send_anonymous_usage_stats': False
+            },
+            'test': {
+                'outputs': {
+                    'endpoint': {
+                        'type': 'spark',
+                        'method': 'odbc',
+                        'host': os.getenv('DBT_DATABRICKS_HOST_NAME'),
+                        'endpoint': os.getenv('DBT_DATABRICKS_ENDPOINT'),
+                        'token': os.getenv('DBT_DATABRICKS_TOKEN'),
+                        'driver': os.getenv('ODBC_DRIVER'),
+                        'port': 443,
+                        'schema': self.unique_schema()
+                    },
+                },
+                'target': 'endpoint'
+            }
+        }
+
+    @property
+    def packages_config(self):
+        return None
+
+    @property
+    def selectors_config(self):
+        return None
+
+    def unique_schema(self):
+        schema = self.schema
+
+        to_return = "{}_{}".format(self.prefix, schema)
+
+        return to_return.lower()
+
+    @property
+    def default_database(self):
+        database = self.config.credentials.database
+        return database
+
+    @property
+    def alternative_database(self):
+        return None
+
+    def get_profile(self, adapter_type):
+        if adapter_type == 'apache_spark':
+            return self.apache_spark_profile()
+        elif adapter_type == 'databricks_cluster':
+            return self.databricks_cluster_profile()
+        elif adapter_type == 'databricks_sql_endpoint':
+            return self.databricks_sql_endpoint_profile()
+        else:
+            raise ValueError('invalid adapter type {}'.format(adapter_type))
+
+    def _pick_profile(self):
+        test_name = self.id().split('.')[-1]
+        return _profile_from_test_name(test_name)
+
+    def _symlink_test_folders(self):
+        for entry in os.listdir(self.test_original_source_path):
+            src = os.path.join(self.test_original_source_path, entry)
+            tst = os.path.join(self.test_root_dir, entry)
+            if os.path.isdir(src) or src.endswith('.sql'):
+                # symlink all sql files and all directories.
+                os.symlink(src, tst)
+        os.symlink(self._logs_dir, os.path.join(self.test_root_dir, 'logs'))
+
+    @property
+    def test_root_realpath(self):
+        if sys.platform == 'darwin':
+            return os.path.realpath(self.test_root_dir)
+        else:
+            return self.test_root_dir
+
+    def _generate_test_root_dir(self):
+        return normalize(tempfile.mkdtemp(prefix='dbt-int-test-'))
+
+    def setUp(self):
+        self.dbt_core_install_root = os.path.dirname(dbt.__file__)
+        log_manager.reset_handlers()
+        self.initial_dir = INITIAL_ROOT
+        os.chdir(self.initial_dir)
+        # before we go anywhere, collect the initial path info
+        self._logs_dir = os.path.join(self.initial_dir, 'logs', self.prefix)
+        _really_makedirs(self._logs_dir)
+        self.test_original_source_path = _pytest_get_test_root()
+        self.test_root_dir = self._generate_test_root_dir()
+
+        os.chdir(self.test_root_dir)
+        try:
+            self._symlink_test_folders()
+        except Exception as exc:
+            msg = '\n\t'.join((
+                'Failed to symlink test folders!',
+                'initial_dir={0.initial_dir}',
+                'test_original_source_path={0.test_original_source_path}',
+                'test_root_dir={0.test_root_dir}'
+            )).format(self)
+            logger.exception(msg)
+
+            # if logging isn't set up, I still really want this message.
+            print(msg)
+            traceback.print_exc()
+
+            raise
+
+        self._created_schemas = set()
+        reset_deprecations()
+        template_cache.clear()
+
+        self.use_profile(self._pick_profile())
+        self.use_default_project()
+        self.set_packages()
+        self.set_selectors()
+        self.load_config()
+
+    def use_default_project(self, overrides=None):
+        # create a dbt_project.yml
+        base_project_config = {
+            'name': 'test',
+            'version': '1.0',
+            'config-version': 2,
+            'test-paths': [],
+            'source-paths': [self.models],
+            'profile': 'test',
+        }
+
+        project_config = {}
+        project_config.update(base_project_config)
+        project_config.update(self.project_config)
+        project_config.update(overrides or {})
+
+        with open("dbt_project.yml", 'w') as f:
+            yaml.safe_dump(project_config, f, default_flow_style=True)
+
+    def use_profile(self, adapter_type):
+        self.adapter_type = adapter_type
+
+        profile_config = {}
+        default_profile_config = self.get_profile(adapter_type)
+
+        profile_config.update(default_profile_config)
+        profile_config.update(self.profile_config)
+
+        if not os.path.exists(self.test_root_dir):
+            os.makedirs(self.test_root_dir)
+
+        flags.PROFILES_DIR = self.test_root_dir
+        profiles_path = os.path.join(self.test_root_dir, 'profiles.yml')
+        with open(profiles_path, 'w') as f:
+            yaml.safe_dump(profile_config, f, default_flow_style=True)
+        self._profile_config = profile_config
+
+    def set_packages(self):
+        if self.packages_config is not None:
+            with open('packages.yml', 'w') as f:
+                yaml.safe_dump(self.packages_config, f, default_flow_style=True)
+
+    def set_selectors(self):
+        if self.selectors_config is not None:
+            with open('selectors.yml', 'w') as f:
+                yaml.safe_dump(self.selectors_config, f, default_flow_style=True)
+
+    def load_config(self):
+        # we've written our profile and project. Now we want to instantiate a
+        # fresh adapter for the tests.
+        # it's important to use a different connection handle here so
+        # we don't look into an incomplete transaction
+        kwargs = {
+            'profile': None,
+            'profiles_dir': self.test_root_dir,
+            'target': None,
+        }
+
+        config = RuntimeConfig.from_args(TestArgs(kwargs))
+
+        register_adapter(config)
+        adapter = get_adapter(config)
+        adapter.cleanup_connections()
+        self.adapter_type = adapter.type()
+        self.adapter = adapter
+        self.config = config
+
+        self._drop_schemas()
+        self._create_schemas()
+
+    def quote_as_configured(self, value, quote_key):
+        return self.adapter.quote_as_configured(value, quote_key)
+
+    def tearDown(self):
+        # get any current run adapter and clean up its connections before we
+        # reset them. It'll probably be different from ours because
+        # handle_and_check() calls reset_adapters().
+        register_adapter(self.config)
+        adapter = get_adapter(self.config)
+        if adapter is not self.adapter:
+            adapter.cleanup_connections()
+        if not hasattr(self, 'adapter'):
+            self.adapter = adapter
+
+        self._drop_schemas()
+
+        self.adapter.cleanup_connections()
+        reset_adapters()
+        os.chdir(INITIAL_ROOT)
+        try:
+            shutil.rmtree(self.test_root_dir)
+        except EnvironmentError:
+            logger.exception('Could not clean up after test - {} not removable'
+                             .format(self.test_root_dir))
+
+    def _get_schema_fqn(self, database, schema):
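+        # the database argument is unused; only the schema name is quoted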
+        schema_fqn = self.quote_as_configured(schema, 'schema')
+        return schema_fqn
+
+    def _create_schema_named(self, database, schema):
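+        # the '{schema}' placeholder here (and in _drop_schema_named below) is
+        # filled in by transform_sql() inside run_sql()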
+        self.run_sql('CREATE SCHEMA {schema}')
+
+    def _drop_schema_named(self, database, schema):
+        self.run_sql('DROP SCHEMA IF EXISTS {schema} CASCADE')
+
+    def _create_schemas(self):
+        schema = self.unique_schema()
+        with self.adapter.connection_named('__test'):
+            self._create_schema_named(self.default_database, schema)
+
+    def _drop_schemas(self):
+        with self.adapter.connection_named('__test'):
+            schema = self.unique_schema()
+            self._drop_schema_named(self.default_database, schema)
+            if self.setup_alternate_db and self.alternative_database:
+                self._drop_schema_named(self.alternative_database, schema)
+
+    @property
+    def project_config(self):
+        return {
+            'config-version': 2,
+        }
+
+    @property
+    def profile_config(self):
+        return {}
+
+    def run_dbt(self, args=None, expect_pass=True, profiles_dir=True):
+        res, success = self.run_dbt_and_check(args=args, profiles_dir=profiles_dir)
+        self.assertEqual(
+            success, expect_pass,
+            "dbt exit state did not match expected")
+
+        return res
+
+    def run_dbt_and_capture(self, *args, **kwargs):
+        try:
+            initial_stdout = log_manager.stdout
+            initial_stderr = log_manager.stderr
+            stringbuf = io.StringIO()
+            log_manager.set_output_stream(stringbuf)
+
+            res = self.run_dbt(*args, **kwargs)
+            stdout = stringbuf.getvalue()
+
+        finally:
+            log_manager.set_output_stream(initial_stdout, initial_stderr)
+
+        return res, stdout
+
+    def run_dbt_and_check(self, args=None, profiles_dir=True):
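+        # returns the (results, success) tuple from dbt.handle_and_check()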
+        log_manager.reset_handlers()
+        if args is None:
+            args = ["run"]
+
+        final_args = []
+
+        if os.getenv('DBT_TEST_SINGLE_THREADED') in ('y', 'Y', '1'):
+            final_args.append('--single-threaded')
+
+        final_args.extend(args)
+
+        if profiles_dir:
+            final_args.extend(['--profiles-dir', self.test_root_dir])
+        final_args.append('--log-cache-events')
+
+        logger.info("Invoking dbt with {}".format(final_args))
+        return dbt.handle_and_check(final_args)
+
+    def run_sql_file(self, path, kwargs=None):
+        with open(path, 'r') as f:
+            statements = f.read().split(";")
+            for statement in statements:
+                self.run_sql(statement, kwargs=kwargs)
+
+    def transform_sql(self, query, kwargs=None):
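+        # substitute the {schema} and {database} placeholders, plus any extra kwargs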
+        to_return = query
+
+        base_kwargs = {
+            'schema': self.unique_schema(),
+            'database': self.adapter.quote(self.default_database),
+        }
+        if kwargs is None:
+            kwargs = {}
+        base_kwargs.update(kwargs)
+
+        to_return = to_return.format(**base_kwargs)
+
+        return to_return
+
+    def run_sql(self, query, fetch='None', kwargs=None, connection_name=None):
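+        # fetch='one' returns the first row, fetch='all' returns every row;
+        # any other value just drains the cursor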
+        if connection_name is None:
+            connection_name = '__test'
+
+        if query.strip() == "":
+            return
+
+        sql = self.transform_sql(query, kwargs=kwargs)
+
+        with self.get_connection(connection_name) as conn:
+            cursor = conn.handle.cursor()
+            try:
+                cursor.execute(sql)
+                if fetch == 'one':
+                    return cursor.fetchall()[0]
+                elif fetch == 'all':
+                    return cursor.fetchall()
+                else:
+                    # we have to fetch.
+                    cursor.fetchall()
+            except pyodbc.ProgrammingError as e:
+                # hacks for dropping schema
+                if "No results.  Previous SQL was not a query." not in str(e):
+                    raise e
+            except Exception as e:
+                conn.handle.rollback()
+                conn.transaction_open = False
+                print(sql)
+                print(e)
+                raise
+            else:
+                conn.transaction_open = False
+
+    def _ilike(self, target, value):
+        return "{} ilike '{}'".format(target, value)
+
+    def get_many_table_columns_bigquery(self, tables, schema, database=None):
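+        # the name appears to be a holdover from dbt-core's test base; this
+        # simply collects get_columns_in_relation() output for each table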
+        result = []
+        for table in tables:
+            relation = self._make_relation(table, schema, database)
+            columns = self.adapter.get_columns_in_relation(relation)
+            for col in columns:
+                result.append((table, col.column, col.dtype, col.char_size))
+        return result
+
+    def get_many_table_columns(self, tables, schema, database=None):
+        result = self.get_many_table_columns_bigquery(tables, schema, database)
+        result.sort(key=lambda x: '{}.{}'.format(x[0], x[1]))
+        return result
+
+    def filter_many_columns(self, column):
+        if len(column) == 3:
+            table_name, column_name, data_type = column
+            char_size = None
+        else:
+            table_name, column_name, data_type, char_size = column
+        return (table_name, column_name, data_type, char_size)
+
+    @contextmanager
+    def get_connection(self, name=None):
+        """Create a test connection context where all executed macros, etc will
+        get self.adapter as the adapter.
+
+        This allows tests to run normal adapter macros as if reset_adapters()
+        were not called by handle_and_check (for asserts, etc)
+        """
+        if name is None:
+            name = '__test'
+        with patch.object(providers, 'get_adapter', return_value=self.adapter):
+            with self.adapter.connection_named(name):
+                conn = self.adapter.connections.get_thread_connection()
+                yield conn
+
+    def get_relation_columns(self, relation):
+        with self.get_connection():
+            columns = self.adapter.get_columns_in_relation(relation)
+
+        return sorted(((c.name, c.dtype, c.char_size) for c in columns),
+                      key=lambda x: x[0])
+
+    def get_table_columns(self, table, schema=None, database=None):
+        schema = self.unique_schema() if schema is None else schema
+        database = self.default_database if database is None else database
+        relation = self.adapter.Relation.create(
+            database=database,
+            schema=schema,
+            identifier=table,
+            type='table',
+            quote_policy=self.config.quoting
+        )
+        return self.get_relation_columns(relation)
+
+    def get_table_columns_as_dict(self, tables, schema=None):
+        col_matrix = self.get_many_table_columns(tables, schema)
+        res = {}
+        for row in col_matrix:
+            table_name = row[0]
+            col_def = row[1:]
+            if table_name not in res:
+                res[table_name] = []
+            res[table_name].append(col_def)
+        return res
+
+    def get_models_in_schema(self, schema=None):
+        schema = self.unique_schema() if schema is None else schema
+        sql = """
+                select table_name,
+                        case when table_type = 'BASE TABLE' then 'table'
+                             when table_type = 'VIEW' then 'view'
+                             else table_type
+                        end as materialization
+                from information_schema.tables
+                where {}
+                order by table_name
+                """
+
+        sql = sql.format(self._ilike('table_schema', schema))
+        result = self.run_sql(sql, fetch='all')
+
+        return {model_name: materialization for (model_name, materialization) in result}
+
+    def _assertTablesEqualSql(self, relation_a, relation_b, columns=None):
+        if columns is None:
+            columns = self.get_relation_columns(relation_a)
+        column_names = [c[0] for c in columns]
+
+        sql = self.adapter.get_rows_different_sql(
+            relation_a, relation_b, column_names
+        )
+
+        return sql
+
+    def assertTablesEqual(self, table_a, table_b,
+                          table_a_schema=None, table_b_schema=None,
+                          table_a_db=None, table_b_db=None):
+        if table_a_schema is None:
+            table_a_schema = self.unique_schema()
+
+        if table_b_schema is None:
+            table_b_schema = self.unique_schema()
+
+        if table_a_db is None:
+            table_a_db = self.default_database
+
+        if table_b_db is None:
+            table_b_db = self.default_database
+
+        relation_a = self._make_relation(table_a, table_a_schema, table_a_db)
+        relation_b = self._make_relation(table_b, table_b_schema, table_b_db)
+
+        self._assertTableColumnsEqual(relation_a, relation_b)
+
+        sql = self._assertTablesEqualSql(relation_a, relation_b)
+        result = self.run_sql(sql, fetch='one')
+
+        self.assertEqual(
+            result[0],
+            0,
+            'row_count_difference nonzero: ' + sql
+        )
+        self.assertEqual(
+            result[1],
+            0,
+            'num_mismatched nonzero: ' + sql
+        )
+
+    def _make_relation(self, identifier, schema=None, database=None):
+        if schema is None:
+            schema = self.unique_schema()
+        if database is None:
+            database = self.default_database
+        return self.adapter.Relation.create(
+            database=database,
+            schema=schema,
+            identifier=identifier,
+            quote_policy=self.config.quoting
+        )
+
+    def get_many_relation_columns(self, relations):
+        """Returns a dict of (datbase, schema) -> (dict of (table_name -> list of columns))
+        """
+        schema_fqns = {}
+        for rel in relations:
+            this_schema = schema_fqns.setdefault((rel.database, rel.schema), [])
+            this_schema.append(rel.identifier)
+
+        column_specs = {}
+        for key, tables in schema_fqns.items():
+            database, schema = key
+            columns = self.get_many_table_columns(tables, schema, database=database)
+            table_columns = {}
+            for col in columns:
+                table_columns.setdefault(col[0], []).append(col[1:])
+            for rel_name, columns in table_columns.items():
+                key = (database, schema, rel_name)
+                column_specs[key] = columns
+
+        return column_specs
+
+    def assertManyRelationsEqual(self, relations, default_schema=None, default_database=None):
+        if default_schema is None:
+            default_schema = self.unique_schema()
+        if default_database is None:
+            default_database = self.default_database
+
+        specs = []
+        for relation in relations:
+            if not isinstance(relation, (tuple, list)):
+                relation = [relation]
+
+            assert len(relation) <= 3
+
+            if len(relation) == 3:
+                relation = self._make_relation(*relation)
+            elif len(relation) == 2:
+                relation = self._make_relation(relation[0], relation[1], default_database)
+            elif len(relation) == 1:
+                relation = self._make_relation(relation[0], default_schema, default_database)
+            else:
+                raise ValueError('relation must be a sequence of 1, 2, or 3 values')
+
+            specs.append(relation)
+
+        with self.get_connection():
+            column_specs = self.get_many_relation_columns(specs)
+
+        # make sure everyone has equal column definitions
+        first_columns = None
+        for relation in specs:
+            key = (relation.database, relation.schema, relation.identifier)
+            # get a good error here instead of a hard-to-diagnose KeyError
+            self.assertIn(key, column_specs, f'No columns found for {key}')
+            columns = column_specs[key]
+            if first_columns is None:
+                first_columns = columns
+            else:
+                self.assertEqual(
+                    first_columns, columns,
+                    '{} did not match {}'.format(str(specs[0]), str(relation))
+                )
+
+        # make sure everyone has the same data. if we got here, everyone had
+        # the same column specs!
+        first_relation = None
+        for relation in specs:
+            if first_relation is None:
+                first_relation = relation
+            else:
+                sql = self._assertTablesEqualSql(first_relation, relation,
+                                                 columns=first_columns)
+                result = self.run_sql(sql, fetch='one')
+
+                self.assertEqual(
+                    result[0],
+                    0,
+                    'row_count_difference nonzero: ' + sql
+                )
+                self.assertEqual(
+                    result[1],
+                    0,
+                    'num_mismatched nonzero: ' + sql
+                )
+
+    def assertManyTablesEqual(self, *args):
+        schema = self.unique_schema()
+
+        all_tables = []
+        for table_equivalencies in args:
+            all_tables += list(table_equivalencies)
+
+        all_cols = self.get_table_columns_as_dict(all_tables, schema)
+
+        for table_equivalencies in args:
+            first_table = table_equivalencies[0]
+            first_relation = self._make_relation(first_table)
+
+            # assert that all tables have the same columns
+            base_result = all_cols[first_table]
+            self.assertTrue(len(base_result) > 0)
+
+            for other_table in table_equivalencies[1:]:
+                other_result = all_cols[other_table]
+                self.assertTrue(len(other_result) > 0)
+                self.assertEqual(base_result, other_result)
+
+                other_relation = self._make_relation(other_table)
+                sql = self._assertTablesEqualSql(first_relation,
+                                                 other_relation,
+                                                 columns=base_result)
+                result = self.run_sql(sql, fetch='one')
+
+                self.assertEqual(
+                    result[0],
+                    0,
+                    'row_count_difference nonzero: ' + sql
+                )
+                self.assertEqual(
+                    result[1],
+                    0,
+                    'num_mismatched nonzero: ' + sql
+                )
+
+    def _assertTableRowCountsEqual(self, relation_a, relation_b):
+        cmp_query = """
+            with table_a as (
+
+                select count(*) as num_rows from {}
+
+            ), table_b as (
+
+                select count(*) as num_rows from {}
+
+            )
+
+            select table_a.num_rows - table_b.num_rows as difference
+            from table_a, table_b
+
+        """.format(str(relation_a), str(relation_b))
+
+        res = self.run_sql(cmp_query, fetch='one')
+
+        self.assertEqual(int(res[0]), 0, "Row count of table {} doesn't match row count of table {}. ({} rows different)".format(
+                relation_a.identifier,
+                relation_b.identifier,
+                res[0]
+            )
+        )
+
+    def assertTableDoesNotExist(self, table, schema=None, database=None):
+        columns = self.get_table_columns(table, schema, database)
+
+        self.assertEqual(
+            len(columns),
+            0
+        )
+
+    def assertTableDoesExist(self, table, schema=None, database=None):
+        columns = self.get_table_columns(table, schema, database)
+
+        self.assertGreater(
+            len(columns),
+            0
+        )
+
+    def _assertTableColumnsEqual(self, relation_a, relation_b):
+        table_a_result = self.get_relation_columns(relation_a)
+        table_b_result = self.get_relation_columns(relation_b)
+
+        text_types = {'text', 'character varying', 'character', 'varchar'}
+
+        self.assertEqual(len(table_a_result), len(table_b_result))
+        for a_column, b_column in zip(table_a_result, table_b_result):
+            a_name, a_type, a_size = a_column
+            b_name, b_type, b_size = b_column
+            self.assertEqual(a_name, b_name,
+                '{} vs {}: column "{}" != "{}"'.format(
+                    relation_a, relation_b, a_name, b_name
+                ))
+
+            self.assertEqual(a_type, b_type,
+                '{} vs {}: column "{}" has type "{}" != "{}"'.format(
+                    relation_a, relation_b, a_name, a_type, b_type
+                ))
+
+            self.assertEqual(a_size, b_size,
+                '{} vs {}: column "{}" has size "{}" != "{}"'.format(
+                    relation_a, relation_b, a_name, a_size, b_size
+                ))
+
+    def assertEquals(self, *args, **kwargs):
+        # assertEquals is deprecated. This makes the warnings less chatty
+        self.assertEqual(*args, **kwargs)
+
+    def assertBetween(self, timestr, start, end=None):
+        datefmt = '%Y-%m-%dT%H:%M:%S.%fZ'
+        if end is None:
+            end = datetime.utcnow()
+
+        parsed = datetime.strptime(timestr, datefmt)
+
+        self.assertLessEqual(start, parsed,
+            'parsed date {} happened before {}'.format(
+                parsed,
+                start.strftime(datefmt))
+        )
+        self.assertGreaterEqual(end, parsed,
+            'parsed date {} happened after {}'.format(
+                parsed,
+                end.strftime(datefmt))
+        )
+
+
+def use_profile(profile_name):
+    """A decorator to declare a test method as using a particular profile.
+    Handles both setting the nose attr and calling self.use_profile.
+
+    Use like this:
+
+    class TestSomething(DBIntegrationTest):
+        @use_profile('postgres')
+        def test_postgres_thing(self):
+            self.assertEqual(self.adapter_type, 'postgres')
+
+        @use_profile('snowflake')
+        def test_snowflake_thing(self):
+            self.assertEqual(self.adapter_type, 'snowflake')
+    """
+    def outer(wrapped):
+        @getattr(pytest.mark, 'profile_'+profile_name)
+        @wraps(wrapped)
+        def func(self, *args, **kwargs):
+            return wrapped(self, *args, **kwargs)
+        # sanity check at import time
+        assert _profile_from_test_name(wrapped.__name__) == profile_name
+        return func
+    return outer
+
+
+class AnyFloat:
+    """Any float. Use this in assertEqual() calls to assert that it is a float.
+    """
+    def __eq__(self, other):
+        return isinstance(other, float)
+
+
+class AnyString:
+    """Any string. Use this in assertEqual() calls to assert that it is a string.
+    """
+    def __eq__(self, other):
+        return isinstance(other, str)
+
+
+class AnyStringWith:
+    def __init__(self, contains=None):
+        self.contains = contains
+
+    def __eq__(self, other):
+        if not isinstance(other, str):
+            return False
+
+        if self.contains is None:
+            return True
+
+        return self.contains in other
+
+    def __repr__(self):
+        return 'AnyStringWith<{!r}>'.format(self.contains)
+
+
+def get_manifest():
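+    """Load a Manifest from the partial-parse msgpack file, if one exists."""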
+    path = './target/partial_parse.msgpack'
+    if os.path.exists(path):
+        with open(path, 'rb') as fp:
+            manifest_mp = fp.read()
+        manifest: Manifest = Manifest.from_msgpack(manifest_mp)
+        return manifest
+    else:
+        return None
diff --git a/test/custom/conftest.py b/tests/integration/conftest.py
similarity index 100%
rename from test/custom/conftest.py
rename to tests/integration/conftest.py
diff --git a/test/custom/get_columns_in_relation/models/child.sql b/tests/integration/get_columns_in_relation/models/child.sql
similarity index 100%
rename from test/custom/get_columns_in_relation/models/child.sql
rename to tests/integration/get_columns_in_relation/models/child.sql
diff --git a/test/custom/get_columns_in_relation/models/get_columns_from_child.sql b/tests/integration/get_columns_in_relation/models/get_columns_from_child.sql
similarity index 100%
rename from test/custom/get_columns_in_relation/models/get_columns_from_child.sql
rename to tests/integration/get_columns_in_relation/models/get_columns_from_child.sql
diff --git a/test/custom/get_columns_in_relation/test_get_columns_in_relation.py b/tests/integration/get_columns_in_relation/test_get_columns_in_relation.py
similarity index 84%
rename from test/custom/get_columns_in_relation/test_get_columns_in_relation.py
rename to tests/integration/get_columns_in_relation/test_get_columns_in_relation.py
index e2c1d7a48..418cbd99c 100644
--- a/test/custom/get_columns_in_relation/test_get_columns_in_relation.py
+++ b/tests/integration/get_columns_in_relation/test_get_columns_in_relation.py
@@ -1,7 +1,7 @@
-from test.custom.base import DBTSparkIntegrationTest, use_profile
+from tests.integration.base import DBTIntegrationTest, use_profile
 
 
-class TestGetColumnInRelationInSameRun(DBTSparkIntegrationTest):
+class TestGetColumnInRelationInSameRun(DBTIntegrationTest):
     @property
     def schema(self):
         return "get_columns_in_relation"
diff --git a/test/custom/incremental_on_schema_change/models/incremental_append_new_columns.sql b/tests/integration/incremental_on_schema_change/models/incremental_append_new_columns.sql
similarity index 100%
rename from test/custom/incremental_on_schema_change/models/incremental_append_new_columns.sql
rename to tests/integration/incremental_on_schema_change/models/incremental_append_new_columns.sql
diff --git a/test/custom/incremental_on_schema_change/models/incremental_append_new_columns_target.sql b/tests/integration/incremental_on_schema_change/models/incremental_append_new_columns_target.sql
similarity index 100%
rename from test/custom/incremental_on_schema_change/models/incremental_append_new_columns_target.sql
rename to tests/integration/incremental_on_schema_change/models/incremental_append_new_columns_target.sql
diff --git a/test/custom/incremental_on_schema_change/models/incremental_fail.sql b/tests/integration/incremental_on_schema_change/models/incremental_fail.sql
similarity index 100%
rename from test/custom/incremental_on_schema_change/models/incremental_fail.sql
rename to tests/integration/incremental_on_schema_change/models/incremental_fail.sql
diff --git a/test/custom/incremental_on_schema_change/models/incremental_ignore.sql b/tests/integration/incremental_on_schema_change/models/incremental_ignore.sql
similarity index 100%
rename from test/custom/incremental_on_schema_change/models/incremental_ignore.sql
rename to tests/integration/incremental_on_schema_change/models/incremental_ignore.sql
diff --git a/test/custom/incremental_on_schema_change/models/incremental_ignore_target.sql b/tests/integration/incremental_on_schema_change/models/incremental_ignore_target.sql
similarity index 100%
rename from test/custom/incremental_on_schema_change/models/incremental_ignore_target.sql
rename to tests/integration/incremental_on_schema_change/models/incremental_ignore_target.sql
diff --git a/test/custom/incremental_on_schema_change/models/incremental_sync_all_columns.sql b/tests/integration/incremental_on_schema_change/models/incremental_sync_all_columns.sql
similarity index 100%
rename from test/custom/incremental_on_schema_change/models/incremental_sync_all_columns.sql
rename to tests/integration/incremental_on_schema_change/models/incremental_sync_all_columns.sql
diff --git a/test/custom/incremental_on_schema_change/models/incremental_sync_all_columns_target.sql b/tests/integration/incremental_on_schema_change/models/incremental_sync_all_columns_target.sql
similarity index 100%
rename from test/custom/incremental_on_schema_change/models/incremental_sync_all_columns_target.sql
rename to tests/integration/incremental_on_schema_change/models/incremental_sync_all_columns_target.sql
diff --git a/test/custom/incremental_on_schema_change/models/model_a.sql b/tests/integration/incremental_on_schema_change/models/model_a.sql
similarity index 100%
rename from test/custom/incremental_on_schema_change/models/model_a.sql
rename to tests/integration/incremental_on_schema_change/models/model_a.sql
diff --git a/test/custom/incremental_on_schema_change/test_incremental_on_schema_change.py b/tests/integration/incremental_on_schema_change/test_incremental_on_schema_change.py
similarity index 100%
rename from test/custom/incremental_on_schema_change/test_incremental_on_schema_change.py
rename to tests/integration/incremental_on_schema_change/test_incremental_on_schema_change.py
diff --git a/test/custom/incremental_strategies/models/default_append.sql b/tests/integration/incremental_strategies/models/default_append.sql
similarity index 100%
rename from test/custom/incremental_strategies/models/default_append.sql
rename to tests/integration/incremental_strategies/models/default_append.sql
diff --git a/test/custom/incremental_strategies/models_bad/bad_file_format.sql b/tests/integration/incremental_strategies/models_bad/bad_file_format.sql
similarity index 100%
rename from test/custom/incremental_strategies/models_bad/bad_file_format.sql
rename to tests/integration/incremental_strategies/models_bad/bad_file_format.sql
diff --git a/test/custom/incremental_strategies/models_bad/bad_insert_overwrite_delta.sql b/tests/integration/incremental_strategies/models_bad/bad_insert_overwrite_delta.sql
similarity index 100%
rename from test/custom/incremental_strategies/models_bad/bad_insert_overwrite_delta.sql
rename to tests/integration/incremental_strategies/models_bad/bad_insert_overwrite_delta.sql
diff --git a/test/custom/incremental_strategies/models_bad/bad_merge_not_delta.sql b/tests/integration/incremental_strategies/models_bad/bad_merge_not_delta.sql
similarity index 100%
rename from test/custom/incremental_strategies/models_bad/bad_merge_not_delta.sql
rename to tests/integration/incremental_strategies/models_bad/bad_merge_not_delta.sql
diff --git a/test/custom/incremental_strategies/models_bad/bad_strategy.sql b/tests/integration/incremental_strategies/models_bad/bad_strategy.sql
similarity index 100%
rename from test/custom/incremental_strategies/models_bad/bad_strategy.sql
rename to tests/integration/incremental_strategies/models_bad/bad_strategy.sql
diff --git a/test/custom/incremental_strategies/models_delta/append_delta.sql b/tests/integration/incremental_strategies/models_delta/append_delta.sql
similarity index 100%
rename from test/custom/incremental_strategies/models_delta/append_delta.sql
rename to tests/integration/incremental_strategies/models_delta/append_delta.sql
diff --git a/test/custom/incremental_strategies/models_delta/merge_no_key.sql b/tests/integration/incremental_strategies/models_delta/merge_no_key.sql
similarity index 100%
rename from test/custom/incremental_strategies/models_delta/merge_no_key.sql
rename to tests/integration/incremental_strategies/models_delta/merge_no_key.sql
diff --git a/test/custom/incremental_strategies/models_delta/merge_unique_key.sql b/tests/integration/incremental_strategies/models_delta/merge_unique_key.sql
similarity index 100%
rename from test/custom/incremental_strategies/models_delta/merge_unique_key.sql
rename to tests/integration/incremental_strategies/models_delta/merge_unique_key.sql
diff --git a/test/custom/incremental_strategies/models_delta/merge_update_columns.sql b/tests/integration/incremental_strategies/models_delta/merge_update_columns.sql
similarity index 100%
rename from test/custom/incremental_strategies/models_delta/merge_update_columns.sql
rename to tests/integration/incremental_strategies/models_delta/merge_update_columns.sql
diff --git a/test/custom/incremental_strategies/models_insert_overwrite/insert_overwrite_no_partitions.sql b/tests/integration/incremental_strategies/models_insert_overwrite/insert_overwrite_no_partitions.sql
similarity index 100%
rename from test/custom/incremental_strategies/models_insert_overwrite/insert_overwrite_no_partitions.sql
rename to tests/integration/incremental_strategies/models_insert_overwrite/insert_overwrite_no_partitions.sql
diff --git a/test/custom/incremental_strategies/models_insert_overwrite/insert_overwrite_partitions.sql b/tests/integration/incremental_strategies/models_insert_overwrite/insert_overwrite_partitions.sql
similarity index 100%
rename from test/custom/incremental_strategies/models_insert_overwrite/insert_overwrite_partitions.sql
rename to tests/integration/incremental_strategies/models_insert_overwrite/insert_overwrite_partitions.sql
diff --git a/test/custom/incremental_strategies/data/expected_append.csv b/tests/integration/incremental_strategies/seeds/expected_append.csv
similarity index 100%
rename from test/custom/incremental_strategies/data/expected_append.csv
rename to tests/integration/incremental_strategies/seeds/expected_append.csv
diff --git a/test/custom/incremental_strategies/data/expected_overwrite.csv b/tests/integration/incremental_strategies/seeds/expected_overwrite.csv
similarity index 100%
rename from test/custom/incremental_strategies/data/expected_overwrite.csv
rename to tests/integration/incremental_strategies/seeds/expected_overwrite.csv
diff --git a/test/custom/incremental_strategies/data/expected_partial_upsert.csv b/tests/integration/incremental_strategies/seeds/expected_partial_upsert.csv
similarity index 100%
rename from test/custom/incremental_strategies/data/expected_partial_upsert.csv
rename to tests/integration/incremental_strategies/seeds/expected_partial_upsert.csv
diff --git a/test/custom/incremental_strategies/data/expected_upsert.csv b/tests/integration/incremental_strategies/seeds/expected_upsert.csv
similarity index 100%
rename from test/custom/incremental_strategies/data/expected_upsert.csv
rename to tests/integration/incremental_strategies/seeds/expected_upsert.csv
diff --git a/test/custom/incremental_strategies/test_incremental_strategies.py b/tests/integration/incremental_strategies/test_incremental_strategies.py
similarity index 95%
rename from test/custom/incremental_strategies/test_incremental_strategies.py
rename to tests/integration/incremental_strategies/test_incremental_strategies.py
index 64966ece5..ef253fc5b 100644
--- a/test/custom/incremental_strategies/test_incremental_strategies.py
+++ b/tests/integration/incremental_strategies/test_incremental_strategies.py
@@ -1,9 +1,9 @@
 from cProfile import run
-from test.custom.base import DBTSparkIntegrationTest, use_profile
+from tests.integration.base import DBTIntegrationTest, use_profile
 import dbt.exceptions
 
 
-class TestIncrementalStrategies(DBTSparkIntegrationTest):
+class TestIncrementalStrategies(DBTIntegrationTest):
     @property
     def schema(self):
         return "incremental_strategies"
diff --git a/test/custom/persist_docs/models/incremental_delta_model.sql b/tests/integration/persist_docs/models/incremental_delta_model.sql
similarity index 100%
rename from test/custom/persist_docs/models/incremental_delta_model.sql
rename to tests/integration/persist_docs/models/incremental_delta_model.sql
diff --git a/test/custom/persist_docs/models/my_fun_docs.md b/tests/integration/persist_docs/models/my_fun_docs.md
similarity index 100%
rename from test/custom/persist_docs/models/my_fun_docs.md
rename to tests/integration/persist_docs/models/my_fun_docs.md
diff --git a/test/custom/persist_docs/models/no_docs_model.sql b/tests/integration/persist_docs/models/no_docs_model.sql
similarity index 100%
rename from test/custom/persist_docs/models/no_docs_model.sql
rename to tests/integration/persist_docs/models/no_docs_model.sql
diff --git a/test/custom/persist_docs/models/schema.yml b/tests/integration/persist_docs/models/schema.yml
similarity index 100%
rename from test/custom/persist_docs/models/schema.yml
rename to tests/integration/persist_docs/models/schema.yml
diff --git a/test/custom/persist_docs/models/table_delta_model.sql b/tests/integration/persist_docs/models/table_delta_model.sql
similarity index 100%
rename from test/custom/persist_docs/models/table_delta_model.sql
rename to tests/integration/persist_docs/models/table_delta_model.sql
diff --git a/test/custom/persist_docs/models/view_model.sql b/tests/integration/persist_docs/models/view_model.sql
similarity index 100%
rename from test/custom/persist_docs/models/view_model.sql
rename to tests/integration/persist_docs/models/view_model.sql
diff --git a/test/custom/persist_docs/data/seed.csv b/tests/integration/persist_docs/seeds/seed.csv
similarity index 100%
rename from test/custom/persist_docs/data/seed.csv
rename to tests/integration/persist_docs/seeds/seed.csv
diff --git a/test/custom/persist_docs/data/seeds.yml b/tests/integration/persist_docs/seeds/seeds.yml
similarity index 100%
rename from test/custom/persist_docs/data/seeds.yml
rename to tests/integration/persist_docs/seeds/seeds.yml
diff --git a/test/custom/persist_docs/test_persist_docs.py b/tests/integration/persist_docs/test_persist_docs.py
similarity index 94%
rename from test/custom/persist_docs/test_persist_docs.py
rename to tests/integration/persist_docs/test_persist_docs.py
index 64c540854..bc93f491b 100644
--- a/test/custom/persist_docs/test_persist_docs.py
+++ b/tests/integration/persist_docs/test_persist_docs.py
@@ -1,11 +1,11 @@
 from cProfile import run
-from test.custom.base import DBTSparkIntegrationTest, use_profile
+from tests.integration.base import DBTIntegrationTest, use_profile
 import dbt.exceptions
 
 import json
 
 
-class TestPersistDocsDelta(DBTSparkIntegrationTest):
+class TestPersistDocsDelta(DBTIntegrationTest):
     @property
     def schema(self):
         return "persist_docs_columns"
diff --git a/test/custom/seed_column_types/data/payments.csv b/tests/integration/seed_column_types/seeds/payments.csv
similarity index 100%
rename from test/custom/seed_column_types/data/payments.csv
rename to tests/integration/seed_column_types/seeds/payments.csv
diff --git a/test/custom/seed_column_types/test_seed_column_types.py b/tests/integration/seed_column_types/test_seed_column_types.py
similarity index 86%
rename from test/custom/seed_column_types/test_seed_column_types.py
rename to tests/integration/seed_column_types/test_seed_column_types.py
index e1fc32788..326c9f523 100644
--- a/test/custom/seed_column_types/test_seed_column_types.py
+++ b/tests/integration/seed_column_types/test_seed_column_types.py
@@ -1,9 +1,9 @@
 from cProfile import run
-from test.custom.base import DBTSparkIntegrationTest, use_profile
+from tests.integration.base import DBTIntegrationTest, use_profile
 import dbt.exceptions
 
 
-class TestSeedColumnTypeCast(DBTSparkIntegrationTest):
+class TestSeedColumnTypeCast(DBTIntegrationTest):
     @property
     def schema(self):
         return "seed_column_types"
diff --git a/test/custom/store_failures/models/schema.yml b/tests/integration/store_failures/models/schema.yml
similarity index 100%
rename from test/custom/store_failures/models/schema.yml
rename to tests/integration/store_failures/models/schema.yml
diff --git a/test/custom/store_failures/models/view_model.sql b/tests/integration/store_failures/models/view_model.sql
similarity index 100%
rename from test/custom/store_failures/models/view_model.sql
rename to tests/integration/store_failures/models/view_model.sql
diff --git a/test/custom/store_failures/test_store_failures.py b/tests/integration/store_failures/test_store_failures.py
similarity index 100%
rename from test/custom/store_failures/test_store_failures.py
rename to tests/integration/store_failures/test_store_failures.py
diff --git a/test/integration/spark-databricks-http.dbtspec b/tests/specs/spark-databricks-http.dbtspec
similarity index 100%
rename from test/integration/spark-databricks-http.dbtspec
rename to tests/specs/spark-databricks-http.dbtspec
diff --git a/test/integration/spark-databricks-odbc-cluster.dbtspec b/tests/specs/spark-databricks-odbc-cluster.dbtspec
similarity index 100%
rename from test/integration/spark-databricks-odbc-cluster.dbtspec
rename to tests/specs/spark-databricks-odbc-cluster.dbtspec
diff --git a/test/integration/spark-databricks-odbc-sql-endpoint.dbtspec b/tests/specs/spark-databricks-odbc-sql-endpoint.dbtspec
similarity index 100%
rename from test/integration/spark-databricks-odbc-sql-endpoint.dbtspec
rename to tests/specs/spark-databricks-odbc-sql-endpoint.dbtspec
diff --git a/test/integration/spark-thrift.dbtspec b/tests/specs/spark-thrift.dbtspec
similarity index 100%
rename from test/integration/spark-thrift.dbtspec
rename to tests/specs/spark-thrift.dbtspec
diff --git a/test/unit/__init__.py b/tests/unit/__init__.py
similarity index 100%
rename from test/unit/__init__.py
rename to tests/unit/__init__.py
diff --git a/test/unit/test_adapter.py b/tests/unit/test_adapter.py
similarity index 100%
rename from test/unit/test_adapter.py
rename to tests/unit/test_adapter.py
diff --git a/test/unit/test_column.py b/tests/unit/test_column.py
similarity index 100%
rename from test/unit/test_column.py
rename to tests/unit/test_column.py
diff --git a/test/unit/test_macros.py b/tests/unit/test_macros.py
similarity index 100%
rename from test/unit/test_macros.py
rename to tests/unit/test_macros.py
diff --git a/test/unit/utils.py b/tests/unit/utils.py
similarity index 100%
rename from test/unit/utils.py
rename to tests/unit/utils.py
diff --git a/tox.ini b/tox.ini
index 76b34f6db..b21f2ac12 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,59 +1,44 @@
 [tox]
 skipsdist = True
-envlist = unit, flake8, integration-spark-thrift
-
+envlist = py36,py37,py38,py39,flake8
 
 [testenv:flake8]
-basepython = python3
-commands = /bin/bash -c '$(which flake8) --select=E,W,F --ignore=W504 dbt/'
-passenv = DBT_INVOCATION_ENV
-deps =
-     -r{toxinidir}/dev_requirements.txt
-
-[testenv:unit]
-basepython = python3
-commands = /bin/bash -c '{envpython} -m pytest -v {posargs} test/unit'
-passenv = DBT_INVOCATION_ENV
+description = flake8 code checks
+basepython = python3.8
+skip_install = true
+commands = flake8 --select=E,W,F --ignore=W504,E741 --max-line-length 99 \
+  dbt
 deps =
-    -r{toxinidir}/requirements.txt
-    -r{toxinidir}/dev_requirements.txt
+  -rdev_requirements.txt
 
-[testenv:integration-spark-databricks-http]
-basepython = python3
-commands = /bin/bash -c '{envpython} -m pytest -v test/integration/spark-databricks-http.dbtspec'
-passenv = DBT_DATABRICKS_HOST_NAME DBT_DATABRICKS_CLUSTER_NAME DBT_DATABRICKS_TOKEN DBT_INVOCATION_ENV
+[testenv:{unit,py36,py37,py38,py39,py}]
+description = unit testing
+skip_install = true
+passenv = DBT_* PYTEST_ADDOPTS
+commands = {envpython} -m pytest {posargs} tests/unit
 deps =
-    -r{toxinidir}/requirements.txt
-    -r{toxinidir}/dev_requirements.txt
-    -e.
+  -rdev_requirements.txt
+  -e.[all]
 
-[testenv:integration-spark-databricks-odbc-cluster]
-basepython = python3
-commands = /bin/bash -c '{envpython} -m pytest -v test/integration/spark-databricks-odbc-cluster.dbtspec'
-           /bin/bash -c '{envpython} -m pytest -v -m profile_databricks_cluster {posargs} -n4 test/custom/*'
+[testenv:{integration,py36,py37,py38,py39,py}-{apache_spark,databricks_http,databricks_cluster,databricks_endpoint}]
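+# run locally with e.g. `tox -e integration-apache_spark` or
+# `tox -e py38-databricks_cluster`; env names are generated from these factors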
+description = adapter plugin integration testing
+skip_install = true
 passenv = DBT_DATABRICKS_HOST_NAME DBT_DATABRICKS_CLUSTER_NAME DBT_DATABRICKS_TOKEN DBT_INVOCATION_ENV ODBC_DRIVER
+commands =
+  apache_spark: {envpython} -m pytest -v tests/specs/spark-thrift.dbtspec
+  apache_spark: {envpython} -m pytest {posargs} -m profile_apache_spark tests/integration
+  databricks_http: {envpython} -m pytest -v tests/specs/spark-databricks-http.dbtspec
+  databricks_cluster: {envpython} -m pytest -v tests/specs/spark-databricks-odbc-cluster.dbtspec
+  databricks_cluster: {envpython} -m pytest {posargs} -m profile_databricks_cluster tests/integration
+  databricks_endpoint: {envpython} -m pytest -v tests/specs/spark-databricks-odbc-sql-endpoint.dbtspec
+  databricks_endpoint: {envpython} -m pytest {posargs} -m profile_databricks_sql_endpoint tests/integration
 deps =
-    -r{toxinidir}/requirements.txt
-    -r{toxinidir}/dev_requirements.txt
-    -e.
-
-[testenv:integration-spark-databricks-odbc-sql-endpoint]
-basepython = python3
-commands = /bin/bash -c '{envpython} -m pytest -v test/integration/spark-databricks-odbc-sql-endpoint.dbtspec'
-           /bin/bash -c '{envpython} -m pytest -v -m profile_databricks_sql_endpoint {posargs} -n4 test/custom/*'
-passenv = DBT_DATABRICKS_HOST_NAME DBT_DATABRICKS_ENDPOINT DBT_DATABRICKS_TOKEN DBT_INVOCATION_ENV ODBC_DRIVER
-deps =
-    -r{toxinidir}/requirements.txt
-    -r{toxinidir}/dev_requirements.txt
-    -e.
-
-
-[testenv:integration-spark-thrift]
-basepython = python3
-commands = /bin/bash -c '{envpython} -m pytest -v test/integration/spark-thrift.dbtspec'
-           /bin/bash -c '{envpython} -m pytest -v -m profile_apache_spark {posargs} -n4 test/custom/*'
-passenv = DBT_INVOCATION_ENV
-deps =
-    -r{toxinidir}/requirements.txt
-    -r{toxinidir}/dev_requirements.txt
-    -e.
+  -rdev_requirements.txt
+  -e.[all]
+
+[pytest]
+env_files =
+    test.env
+testpaths =
+    tests/unit
+    tests/integration