refreshing snowflake schema w/o waking cluster (#4285)

* refreshing snowflake schema w/o waking cluster

Adds a new internal method that executes a query without selecting a
warehouse, and uses SHOW COLUMNS to fetch the database schema instead of
running a SELECT against information_schema. SHOW COLUMNS does not
require a warehouse to run (see the sketch after this list).

* modularising snowflake code to avoid repetition

Fixes internal function syntax and avoids repeating code.

* removing the user object from the snowflake schema query
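For context, here is a minimal sketch (not part of the commit) of the idea behind the change, written against the snowflake-connector-python API that this query runner wraps. The account, credentials, and database names are placeholders:

import snowflake.connector

# Connect without selecting a warehouse. SHOW commands return metadata,
# so they run without resuming a warehouse.
# All connection parameters below are placeholders.
connection = snowflake.connector.connect(
    user="REDASH_USER",
    password="********",
    account="my_account",
)

cursor = connection.cursor()
try:
    cursor.execute("USE my_database")
    cursor.execute("SHOW COLUMNS IN DATABASE my_database")
    # Each row describes one column; the result set exposes fields such
    # as schema_name, table_name, column_name and kind, which is what
    # the new get_schema() implementation reads.
    for row in cursor:
        print(row)
finally:
    cursor.close()
    connection.close()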
monicagangwar authored and arikfr committed Dec 2, 2019
1 parent 36ab8ea commit 4d6c30e
Showing 1 changed file with 39 additions and 19 deletions.
58 changes: 39 additions & 19 deletions redash/query_runner/snowflake.py
@@ -67,7 +67,7 @@ def determine_type(cls, data_type, scale):
             return TYPE_FLOAT
         return t
 
-    def run_query(self, query, user):
+    def _get_connection(self):
         region = self.configuration.get('region')
 
         # for us-west we don't need to pass a region (and if we do, it fails to connect)
@@ -81,6 +81,19 @@ def run_query(self, query, user):
             region=region
         )
 
+        return connection
+
+    def _parse_results(self, cursor):
+        columns = self.fetch_columns(
+            [(i[0], self.determine_type(i[1], i[5])) for i in cursor.description])
+        rows = [dict(zip((column['name'] for column in columns), row))
+                for row in cursor]
+
+        data = {'columns': columns, 'rows': rows}
+        return data
+
+    def run_query(self, query, user):
+        connection = self._get_connection()
         cursor = connection.cursor()
 
         try:
@@ -90,12 +103,7 @@
 
             cursor.execute(query)
 
-            columns = self.fetch_columns(
-                [(i[0], self.determine_type(i[1], i[5])) for i in cursor.description])
-            rows = [dict(zip((column['name'] for column in columns), row))
-                    for row in cursor]
-
-            data = {'columns': columns, 'rows': rows}
+            data = self._parse_results(cursor)
             error = None
             json_data = json_dumps(data)
         finally:
@@ -104,30 +112,42 @@
 
         return json_data, error
 
+    def _run_query_without_warehouse(self, query):
+        connection = self._get_connection()
+        cursor = connection.cursor()
+
+        try:
+            cursor.execute("USE {}".format(self.configuration['database']))
+
+            cursor.execute(query)
+
+            data = self._parse_results(cursor)
+            error = None
+        finally:
+            cursor.close()
+            connection.close()
+
+        return data, error
+
     def get_schema(self, get_stats=False):
         query = """
-        SELECT col.table_schema,
-               col.table_name,
-               col.column_name
-        FROM {database}.information_schema.columns col
-        WHERE col.table_schema <> 'INFORMATION_SCHEMA'
+        SHOW COLUMNS IN DATABASE {database}
         """.format(database=self.configuration['database'])
 
-        results, error = self.run_query(query, None)
+        results, error = self._run_query_without_warehouse(query)
 
         if error is not None:
            raise Exception("Failed getting schema.")
 
         schema = {}
-        results = json_loads(results)
-
         for row in results['rows']:
-            table_name = '{}.{}'.format(row['TABLE_SCHEMA'], row['TABLE_NAME'])
+            if row['kind'] == 'COLUMN':
+                table_name = '{}.{}'.format(row['schema_name'], row['table_name'])
 
-            if table_name not in schema:
-                schema[table_name] = {'name': table_name, 'columns': []}
+                if table_name not in schema:
+                    schema[table_name] = {'name': table_name, 'columns': []}
 
-            schema[table_name]['columns'].append(row['COLUMN_NAME'])
+                schema[table_name]['columns'].append(row['column_name'])
 
         return list(schema.values())
 
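For reference, a small sketch of how the reworked get_schema() groups SHOW COLUMNS output. The two rows below are invented examples, but their keys (kind, schema_name, table_name, column_name) match the fields the new code reads:

# Hypothetical rows, already zipped into dicts the way _parse_results()
# does for the SHOW COLUMNS result set.
rows = [
    {'kind': 'COLUMN', 'schema_name': 'PUBLIC', 'table_name': 'ORDERS', 'column_name': 'ID'},
    {'kind': 'COLUMN', 'schema_name': 'PUBLIC', 'table_name': 'ORDERS', 'column_name': 'AMOUNT'},
]

schema = {}
for row in rows:
    if row['kind'] == 'COLUMN':
        table_name = '{}.{}'.format(row['schema_name'], row['table_name'])

        if table_name not in schema:
            schema[table_name] = {'name': table_name, 'columns': []}

        schema[table_name]['columns'].append(row['column_name'])

print(list(schema.values()))
# [{'name': 'PUBLIC.ORDERS', 'columns': ['ID', 'AMOUNT']}]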
