Skip to content

Commit

Permalink
Ensure adapter does not fetch actual records from materialization (#214)
Browse files Browse the repository at this point in the history
### Summary

During the second run of an incremental materialization, the adapter
would try and fetch records from the temporary table it is creating.
This made the adapter hang if the table was large enough.

### Description
Made the fetch configuration false by default. Also changed the
run_query macro to use call_statement, as run_query will always fetch
the data that is returned by the sql query. Thanks to @mxmarg for the
initial iteration of this fix.

### Related Issue

#211
  • Loading branch information
ravjotbrar authored Jan 29, 2024
1 parent 6bb880e commit ef025e7
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 5 deletions.
9 changes: 7 additions & 2 deletions dbt/adapters/dremio/api/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def close(self):
self._initialize()
self.closed = True

def execute(self, sql, bindings=None, fetch=True):
def execute(self, sql, bindings=None, fetch=False):
if self.closed:
raise Exception("CursorClosed")
if bindings is None:
Expand Down Expand Up @@ -147,7 +147,7 @@ def _populate_rowcount(self):

self._rowcount = rows

def _populate_job_results(self, row_limit=100):
def _populate_job_results(self, row_limit=500):
if self._job_results == None:
combined_job_results = job_results(
self._parameters,
Expand All @@ -158,6 +158,11 @@ def _populate_job_results(self, row_limit=100):
total_row_count = combined_job_results["rowCount"]
current_row_count = len(combined_job_results["rows"])

if total_row_count > 100000:
logger.warning(
"Fetching more than 100000 records. This may result in slower performance."
)

while current_row_count < total_row_count:
combined_job_results["rows"].extend(
job_results(
Expand Down
3 changes: 1 addition & 2 deletions dbt/adapters/dremio/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def add_commit_query(self):

# Auto_begin may not be relevant with the rest_api
def add_query(
self, sql, auto_begin=True, bindings=None, abridge_sql_log=False, fetch=True
self, sql, auto_begin=True, bindings=None, abridge_sql_log=False, fetch=False
):
connection = self.get_thread_connection()
if auto_begin and connection.transaction_open is False:
Expand Down Expand Up @@ -176,7 +176,6 @@ def execute(
sql = self._add_query_comment(sql)
_, cursor = self.add_query(sql, auto_begin, fetch=fetch)
response = self.get_response(cursor)
# fetch = True
if fetch:
table = cursor.table
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ limitations under the License.*/
{% set build_sql = get_create_table_as_sql(False, intermediate_relation, external_query(sql)) %}
{% set need_swap = true %}
{% else %}
{% do run_query(get_create_table_as_sql(True, temp_relation, external_query(sql))) %}
{% call statement('temp') -%}
{{ create_table_as(True, temp_relation, external_query(sql)) }}
{%- endcall %}
{% do to_drop.append(temp_relation) %}
{% do adapter.expand_target_column_types(
from_relation=temp_relation,
Expand Down

0 comments on commit ef025e7

Please sign in to comment.