Skip to content

Commit

Permalink
Adding base Dremio rest client (#257)
Browse files Browse the repository at this point in the history
### Summary

Adding base Dremio rest client

### Description

- A REST client class is a cleaner object-oriented design: all Dremio API calls are isolated inside one class
- The REST client can be used in unit tests

### Test Results


### Changelog

-   [x] Added a summary of what this PR accomplishes to CHANGELOG.md

### Related Issue

This change should become part of the Dremio reflection PR.
  • Loading branch information
howareyouman authored Jan 2, 2025
1 parent 790a6a4 commit f5392a1
Show file tree
Hide file tree
Showing 10 changed files with 188 additions and 185 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Changes

- Added [DremioRestClient](dbt/adapters/dremio/api/rest/client.py) to isolate all Dremio API calls inside one class

## Dependency

- [#222](https://github.com/dremio/dbt-dremio/issues/222) Upgrade dbt-core to 1.8.8 and dbt-tests-adapter to 1.8.0
Expand Down
23 changes: 0 additions & 23 deletions dbt/adapters/dremio/api/__init__.py

This file was deleted.

30 changes: 9 additions & 21 deletions dbt/adapters/dremio/api/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,16 @@

import agate

from dbt.adapters.dremio.api.rest.endpoints import (
sql_endpoint,
job_status,
job_results,
job_cancel_api,
)
from dbt.adapters.dremio.api.parameters import Parameters
from dbt.adapters.dremio.api.rest.client import DremioRestClient

from dbt.adapters.events.logging import AdapterLogger

logger = AdapterLogger("dremio")


class DremioCursor:
def __init__(self, api_parameters: Parameters):
self._parameters = api_parameters
def __init__(self, rest_client: DremioRestClient):
self._rest_client = rest_client

self._closed = False
self._job_id = None
Expand All @@ -41,10 +35,6 @@ def __init__(self, api_parameters: Parameters):
self._table_results: agate.Table = None
self._description = None

@property
def parameters(self):
return self._parameters

@property
def description(self):
return self._description
Expand Down Expand Up @@ -80,7 +70,7 @@ def job_results(self):
def job_cancel(self):
# cancels current job
logger.debug(f"Cancelling job {self._job_id}")
return job_cancel_api(self._parameters, self._job_id)
return self._rest_client.job_cancel_api(self._job_id)

def close(self):
if self.closed:
Expand All @@ -94,7 +84,7 @@ def execute(self, sql, bindings=None, fetch=False):
if bindings is None:
self._initialize()

json_payload = sql_endpoint(self._parameters, sql, context=None)
json_payload = self._rest_client.sql_endpoint(sql, context=None)

self._job_id = json_payload["id"]

Expand Down Expand Up @@ -130,7 +120,7 @@ def _populate_rowcount(self):
job_id = self._job_id

last_job_state = ""
job_status_response = job_status(self._parameters, job_id)
job_status_response = self._rest_client.job_status(job_id)
job_status_state = job_status_response["jobState"]

while True:
Expand All @@ -145,7 +135,7 @@ def _populate_rowcount(self):
if job_status_state == "COMPLETED" or job_status_state == "CANCELLED":
break
last_job_state = job_status_state
job_status_response = job_status(self._parameters, job_id)
job_status_response = self._rest_client.job_status(job_id)
job_status_state = job_status_response["jobState"]

# this is done as job status does not return a rowCount if there are no rows affected (even in completed job_state)
Expand All @@ -161,8 +151,7 @@ def _populate_rowcount(self):

def _populate_job_results(self, row_limit=500):
if self._job_results == None:
combined_job_results = job_results(
self._parameters,
combined_job_results = self._rest_client.job_results(
self._job_id,
offset=0,
limit=row_limit,
Expand All @@ -177,8 +166,7 @@ def _populate_job_results(self, row_limit=500):

while current_row_count < total_row_count:
combined_job_results["rows"].extend(
job_results(
self._parameters,
self._rest_client.job_results(
self._job_id,
offset=current_row_count,
limit=row_limit,
Expand Down
12 changes: 6 additions & 6 deletions dbt/adapters/dremio/api/handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from dbt.adapters.dremio.api.cursor import DremioCursor
from dbt.adapters.dremio.api.parameters import Parameters
from dbt.adapters.dremio.api.rest.endpoints import login
from dbt.adapters.dremio.api.rest.client import DremioRestClient

from dbt.adapters.events.logging import AdapterLogger

Expand All @@ -23,19 +23,19 @@

class DremioHandle:
def __init__(self, parameters: Parameters):
self._parameters = parameters
self._rest_client = DremioRestClient(parameters)
self._cursor = None
self.closed = False

def get_parameters(self):
return self._parameters
def get_client(self):
return self._rest_client

def cursor(self):
if self.closed:
raise Exception("HandleClosed")
if self._cursor is None or self._cursor.closed:
self._parameters = login(self._parameters)
self._cursor = DremioCursor(self._parameters)
self._rest_client.start()
self._cursor = DremioCursor(self._rest_client)
return self._cursor

def close(self):
Expand Down
135 changes: 135 additions & 0 deletions dbt/adapters/dremio/api/rest/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Copyright (C) 2022 Dremio Corporation

# Copyright (c) 2019 Ryan Murray.

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.



import requests

from dbt.adapters.dremio.api.authentication import DremioPatAuthentication
from dbt.adapters.dremio.api.parameters import Parameters
from dbt.adapters.dremio.api.rest.utils import _post, _get, _delete
from dbt.adapters.dremio.api.rest.url_builder import UrlBuilder

from dbt.adapters.events.logging import AdapterLogger

# Adapter logger registered under the name "dremio".
logger = AdapterLogger("dremio")

# Module-level requests session, created once when this module is imported.
# NOTE(review): nothing in the visible part of this module references
# `session` -- confirm whether it is still needed here.
session = requests.Session()


class DremioRestClient:
    """Groups the Dremio REST API calls behind a single object.

    The client holds one ``Parameters`` object. Each request method builds
    its URL with ``UrlBuilder`` and sends it through the ``_get`` / ``_post``
    / ``_delete`` helpers, passing the headers and the ``verify_ssl`` flag
    read from ``parameters.authentication``. Every request method returns
    whatever the underlying helper returns.
    """

    def __init__(self, api_parameters: Parameters):
        self._parameters = api_parameters

    def start(self):
        """Run the login step and keep the parameters it hands back."""
        self._parameters = self.__login()

    def __login(self, timeout=10):
        """Obtain a token with username/password unless PAT auth is in use.

        With ``DremioPatAuthentication`` no request is made. Otherwise the
        credentials are posted to the login URL and the ``"token"`` entry of
        the response is stored on the authentication object.

        Returns the (possibly updated) parameters object.
        """
        params = self._parameters
        if isinstance(params.authentication, DremioPatAuthentication):
            return params

        login_url = UrlBuilder.login_url(params)
        credentials = {
            "userName": params.authentication.username,
            "password": params.authentication.password,
        }
        # No headers are sent here: the token is what this call produces.
        login_response = _post(
            login_url,
            json=credentials,
            timeout=timeout,
            ssl_verify=params.authentication.verify_ssl,
        )

        params.authentication.token = login_response["token"]
        return params

    def _send(self, sender, url, **extra):
        """Call ``sender`` for ``url`` with the auth headers and SSL setting.

        ``sender`` is one of the request helpers (``_get``, ``_post``,
        ``_delete``); ``extra`` is forwarded to it unchanged.
        """
        auth = self._parameters.authentication
        return sender(
            url,
            auth.get_headers(),
            ssl_verify=auth.verify_ssl,
            **extra,
        )

    def sql_endpoint(self, query, context=None):
        """POST ``query`` (and its optional ``context``) to the SQL URL."""
        target = UrlBuilder.sql_url(self._parameters)
        payload = {"sql": query, "context": context}
        return self._send(_post, target, json=payload)

    def job_status(self, job_id):
        """GET the job-status URL built for ``job_id``."""
        target = UrlBuilder.job_status_url(self._parameters, job_id)
        return self._send(_get, target)

    def job_cancel_api(self, job_id):
        """POST to the job-cancel URL built for ``job_id`` with no body."""
        target = UrlBuilder.job_cancel_url(self._parameters, job_id)
        return self._send(_post, target, json=None)

    def job_results(self, job_id, offset=0, limit=100):
        """GET the job-results URL built from ``job_id``, ``offset``, ``limit``."""
        target = UrlBuilder.job_results_url(self._parameters, job_id, offset, limit)
        return self._send(_get, target)

    def create_catalog_api(self, json):
        """POST ``json`` as the request body to the catalog URL."""
        target = UrlBuilder.catalog_url(self._parameters)
        return self._send(_post, target, json=json)

    def get_catalog_item(self, catalog_id=None, catalog_path=None):
        """GET a catalog item, addressed by path or by id.

        Raises:
            TypeError: if both ``catalog_id`` and ``catalog_path`` are None.
        """
        if catalog_path is None and catalog_id is None:
            raise TypeError(
                "both id and path can't be None for a catalog_item call")

        # A truthy path takes precedence when both id and path are supplied.
        target = (
            UrlBuilder.catalog_item_by_path_url(self._parameters, catalog_path)
            if catalog_path
            else UrlBuilder.catalog_item_by_id_url(self._parameters, catalog_id)
        )
        return self._send(_get, target)

    def delete_catalog(self, cid):
        """Send a DELETE to the delete-catalog URL built for ``cid``."""
        target = UrlBuilder.delete_catalog_url(self._parameters, cid)
        return self._send(_delete, target)
Loading

0 comments on commit f5392a1

Please sign in to comment.