From c9ae49255d07abfb542c37566731da30c15c0e5a Mon Sep 17 00:00:00 2001 From: Kurt Convey Date: Tue, 11 Aug 2020 12:29:52 -0600 Subject: [PATCH 1/5] Add test from dev --- .../dbt/adapters/bigquery/connections.py | 31 ++++++++++++++-- test/unit/test_bigquery_adapter.py | 36 +++++++++++++++++-- 2 files changed, 61 insertions(+), 6 deletions(-) diff --git a/plugins/bigquery/dbt/adapters/bigquery/connections.py b/plugins/bigquery/dbt/adapters/bigquery/connections.py index 9ed789e62fb..7f92a2db869 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/connections.py +++ b/plugins/bigquery/dbt/adapters/bigquery/connections.py @@ -1,5 +1,6 @@ from contextlib import contextmanager from dataclasses import dataclass +from requests.exceptions import ConnectionError from typing import Optional, Any, Dict import google.auth @@ -25,6 +26,18 @@ BQ_QUERY_JOB_SPLIT = '-----Query Job SQL Follows-----' +REOPENABLE_ERRORS = { + ConnectionResetError, + ConnectionError, +} + +RETRYABLE_ERRORS = { + google.cloud.exceptions.ServerError, + google.cloud.exceptions.BadRequest, + ConnectionResetError, + ConnectionError, +} + class Priority(StrEnum): Interactive = 'interactive' @@ -390,12 +403,21 @@ def _query_and_results(self, client, sql, conn, job_params, timeout=None): def _retry_and_handle(self, msg, conn, fn): """retry a function call within the context of exception_handler.""" + def reopen_conn_on_error(error): + for type in REOPENABLE_ERRORS: + if isinstance(error, type): + logger.warning('Reopening connection after {!r}', error) + self.close(conn) + self.open(conn) + return + with self.exception_handler(msg): return retry.retry_target( target=fn, predicate=_ErrorCounter(self.get_retries(conn)).count_error, sleep_generator=self._retry_generator(), - deadline=None) + deadline=None, + on_error=reopen_conn_on_error) def _retry_generator(self): """Generates retry intervals that exponentially back off.""" @@ -425,5 +447,8 @@ def count_error(self, error): def _is_retryable(error): - """Return true for 500 level (retryable) errors.""" - return isinstance(error, google.cloud.exceptions.ServerError) + """Return true for errors that are unlikely to occur again if retried.""" + for error_type in RETRYABLE_ERRORS: + if isinstance(error, error_type): + return True + return False diff --git a/test/unit/test_bigquery_adapter.py b/test/unit/test_bigquery_adapter.py index c0e9198850a..ee89592d5f9 100644 --- a/test/unit/test_bigquery_adapter.py +++ b/test/unit/test_bigquery_adapter.py @@ -3,6 +3,7 @@ import re import unittest from contextlib import contextmanager +from requests.exceptions import ConnectionError from unittest.mock import patch, MagicMock, Mock import hologram @@ -427,9 +428,10 @@ def setUp(self): self.connections.get_thread_connection = lambda: self.mock_connection - def test_retry_and_handle(self): + @patch( + 'dbt.adapters.bigquery.connections._is_retryable', return_value=True) + def test_retry_and_handle(self, is_retryable): self.connections.DEFAULT_MAXIMUM_DELAY = 2.0 - dbt.adapters.bigquery.connections._is_retryable = lambda x: True @contextmanager def dummy_handler(msg): @@ -453,14 +455,42 @@ def raiseDummyException(): raiseDummyException) self.assertEqual(DummyException.count, 9) + @patch( + 'dbt.adapters.bigquery.connections._is_retryable', return_value=True) + def test_retry_connection_reset(self, is_retryable): + self.connections.open = MagicMock() + self.connections.close = MagicMock() + self.connections.DEFAULT_MAXIMUM_DELAY = 2.0 + + @contextmanager + def dummy_handler(msg): + yield + + self.connections.exception_handler = dummy_handler + + def raiseConnectionResetError(): + raise ConnectionResetError("Connection broke") + + mock_conn = Mock(credentials=Mock(retries=1)) + with self.assertRaises(ConnectionResetError): + self.connections._retry_and_handle( + "some sql", mock_conn, + raiseConnectionResetError) + self.connections.close.assert_called_once_with(mock_conn) + self.connections.open.assert_called_once_with(mock_conn) + def test_is_retryable(self): _is_retryable = dbt.adapters.bigquery.connections._is_retryable exceptions = dbt.adapters.bigquery.impl.google.cloud.exceptions internal_server_error = exceptions.InternalServerError('code broke') bad_request_error = exceptions.BadRequest('code broke') + connection_error = ConnectionError('code broke') + client_error = exceptions.ClientError('bad code') self.assertTrue(_is_retryable(internal_server_error)) - self.assertFalse(_is_retryable(bad_request_error)) + self.assertTrue(_is_retryable(bad_request_error)) + self.assertTrue(_is_retryable(connection_error)) + self.assertFalse(_is_retryable(client_error)) def test_drop_dataset(self): mock_table = Mock() From 203d8c34815d60e84af18df5c9447c987ef4e50f Mon Sep 17 00:00:00 2001 From: Kurt Convey Date: Tue, 11 Aug 2020 12:42:31 -0600 Subject: [PATCH 2/5] Update CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 897deb3d93f..c6c018ce10b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Features - Add support for impersonating a service account using `impersonate_service_account` in the BigQuery profile configuration ([#2677](https://github.com/fishtown-analytics/dbt/issues/2677)) ([docs](https://docs.getdbt.com/reference/warehouse-profiles/bigquery-profile#service-account-impersonation)) - Macros in the current project can override internal dbt macros that are called through `execute_macros`. ([#2301](https://github.com/fishtown-analytics/dbt/issues/2301), [#2686](https://github.com/fishtown-analytics/dbt/pull/2686)) +- Add better retry support when using the BigQuery adapter ([#2694](https://github.com/fishtown-analytics/dbt/pull/2694), follow-up to [#1963](https://github.com/fishtown-analytics/dbt/pull/1963)) ### Breaking changes From afe0f46768c706c29da077f0bd32637b7b1d50d0 Mon Sep 17 00:00:00 2001 From: Kurt Convey Date: Tue, 11 Aug 2020 15:15:04 -0600 Subject: [PATCH 3/5] use isinstance with tuple --- .../dbt/adapters/bigquery/connections.py | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/plugins/bigquery/dbt/adapters/bigquery/connections.py b/plugins/bigquery/dbt/adapters/bigquery/connections.py index 7f92a2db869..97b386f30a1 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/connections.py +++ b/plugins/bigquery/dbt/adapters/bigquery/connections.py @@ -26,17 +26,17 @@ BQ_QUERY_JOB_SPLIT = '-----Query Job SQL Follows-----' -REOPENABLE_ERRORS = { +REOPENABLE_ERRORS = ( ConnectionResetError, ConnectionError, -} +) -RETRYABLE_ERRORS = { +RETRYABLE_ERRORS = ( google.cloud.exceptions.ServerError, google.cloud.exceptions.BadRequest, ConnectionResetError, ConnectionError, -} +) class Priority(StrEnum): @@ -404,12 +404,11 @@ def _query_and_results(self, client, sql, conn, job_params, timeout=None): def _retry_and_handle(self, msg, conn, fn): """retry a function call within the context of exception_handler.""" def reopen_conn_on_error(error): - for type in REOPENABLE_ERRORS: - if isinstance(error, type): - logger.warning('Reopening connection after {!r}', error) - self.close(conn) - self.open(conn) - return + if isinstance(error, type): + logger.warning('Reopening connection after {!r}', error) + self.close(conn) + self.open(conn) + return with self.exception_handler(msg): return retry.retry_target( @@ -448,7 +447,6 @@ def count_error(self, error): def _is_retryable(error): """Return true for errors that are unlikely to occur again if retried.""" - for error_type in RETRYABLE_ERRORS: - if isinstance(error, error_type): - return True + if isinstance(error, RETRYABLE_ERRORS): + return True return False From ee9ae226516373f35580faca351548659d165a2c Mon Sep 17 00:00:00 2001 From: Kurt Convey Date: Tue, 11 Aug 2020 15:16:15 -0600 Subject: [PATCH 4/5] Fix variable name --- plugins/bigquery/dbt/adapters/bigquery/connections.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/bigquery/dbt/adapters/bigquery/connections.py b/plugins/bigquery/dbt/adapters/bigquery/connections.py index 97b386f30a1..a64435aecaf 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/connections.py +++ b/plugins/bigquery/dbt/adapters/bigquery/connections.py @@ -404,7 +404,7 @@ def _query_and_results(self, client, sql, conn, job_params, timeout=None): def _retry_and_handle(self, msg, conn, fn): """retry a function call within the context of exception_handler.""" def reopen_conn_on_error(error): - if isinstance(error, type): + if isinstance(error, REOPENABLE_ERRORS): logger.warning('Reopening connection after {!r}', error) self.close(conn) self.open(conn) From 44568726357c5a23a022f863f34d4ed21736f56b Mon Sep 17 00:00:00 2001 From: Kurt Convey Date: Tue, 11 Aug 2020 15:23:02 -0600 Subject: [PATCH 5/5] Didn't forget to add myself to the contributors --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c6c018ce10b..227514a6b9a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ Contributors: - [@bbhoss](https://github.com/bbhoss) ([#2677](https://github.com/fishtown-analytics/dbt/pull/2677)) +- [@kconvey](https://github.com/kconvey) ([#2694](https://github.com/fishtown-analytics/dbt/pull/2694)) ## dbt 0.18.0b2 (July 30, 2020)