[Data Sources] Implement Apache Drill #3188

Merged: 5 commits, Jan 10, 2019
Binary file added client/app/assets/images/db-logos/drill.png
40 changes: 36 additions & 4 deletions redash/query_runner/__init__.py
@@ -1,4 +1,6 @@
import logging

from dateutil import parser
import requests

from redash import settings
@@ -20,7 +22,8 @@
    'SUPPORTED_COLUMN_TYPES',
    'register',
    'get_query_runner',
-    'import_query_runners'
+    'import_query_runners',
+    'guess_type'
]

# Valid types of columns returned in results:
@@ -169,7 +172,8 @@ def configuration_schema(cls):
                    'title': cls.password_title,
                },
            },
-            'secret': ['password']
+            'secret': ['password'],
+            'order': ['url', 'username', 'password']
        }

        if cls.requires_url or cls.requires_authentication:
@@ -192,7 +196,7 @@ def get_auth(self):
        else:
            return None

-    def get_response(self, url, auth=None, **kwargs):
+    def get_response(self, url, auth=None, http_method='get', **kwargs):
        # Get authentication values if not given
        if auth is None:
            auth = self.get_auth()
@@ -202,7 +206,7 @@ def get_response(self, url, auth=None, **kwargs):
        error = None
        response = None
        try:
-            response = requests.get(url, auth=auth, **kwargs)
+            response = requests.request(http_method, url, auth=auth, **kwargs)
            # Raise a requests HTTP exception with the appropriate reason
            # for 4xx and 5xx response status codes which is later caught
            # and passed back.
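
A minimal sketch of how a runner might use the generalized helper (the URL and payload are placeholders, modeled on the Drill runner added below):

    # Hypothetical call from a BaseHTTPQueryRunner subclass; names and URL are illustrative.
    response, error = self.get_response(
        'http://localhost:8047/query.json',             # assumed Drill REST endpoint
        http_method='post',                             # defaults to 'get' as before
        json={'queryType': 'SQL', 'query': 'SELECT 1'}
    )
    if error is not None:
        return None, error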
@@ -265,3 +269,31 @@ def get_configuration_schema_for_query_runner_type(query_runner_type):
def import_query_runners(query_runner_imports):
    for runner_import in query_runner_imports:
        __import__(runner_import)


def guess_type(string_value):
    if string_value == '' or string_value is None:
        return TYPE_STRING

    try:
        int(string_value)
        return TYPE_INTEGER
    except (ValueError, OverflowError):
        pass

    try:
        float(string_value)
        return TYPE_FLOAT
    except (ValueError, OverflowError):
        pass

    if unicode(string_value).lower() in ('true', 'false'):
        return TYPE_BOOLEAN

    try:
        parser.parse(string_value)
        return TYPE_DATETIME
    except (ValueError, OverflowError):
        pass

    return TYPE_STRING
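
For reference, a few illustrative inputs and the types the checks above assign (values are made up):

    guess_type('42')                    # TYPE_INTEGER
    guess_type('3.14')                  # TYPE_FLOAT
    guess_type('false')                 # TYPE_BOOLEAN
    guess_type('2019-01-10 12:00:00')   # TYPE_DATETIME (dateutil parses it)
    guess_type('drill')                 # TYPE_STRING (nothing else matches)
    guess_type(None)                    # TYPE_STRING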
143 changes: 143 additions & 0 deletions redash/query_runner/drill.py
@@ -0,0 +1,143 @@
import os
import logging
import re

from dateutil import parser

from redash.query_runner import (
    BaseHTTPQueryRunner, register,
    TYPE_DATETIME, TYPE_INTEGER, TYPE_FLOAT, TYPE_BOOLEAN,
    guess_type
)
from redash.utils import json_dumps, json_loads

logger = logging.getLogger(__name__)


# Convert Drill string value to actual type
def convert_type(string_value, actual_type):
    if string_value is None or string_value == '':
        return ''

    if actual_type == TYPE_INTEGER:
        return int(string_value)

    if actual_type == TYPE_FLOAT:
        return float(string_value)

    if actual_type == TYPE_BOOLEAN:
        return unicode(string_value).lower() == 'true'

    if actual_type == TYPE_DATETIME:
        return parser.parse(string_value)

    return unicode(string_value)


# Parse Drill API response and translate it to accepted format
def parse_response(data):
    cols = data['columns']
    rows = data['rows']

    if len(cols) == 0:
        return {'columns': [], 'rows': []}

    first_row = rows[0]
    columns = []
    types = {}

    for c in cols:
        columns.append({'name': c, 'type': guess_type(first_row[c]), 'friendly_name': c})

    for col in columns:
        types[col['name']] = col['type']

    for row in rows:
        for key, value in row.iteritems():
            row[key] = convert_type(value, types[key])

    return {'columns': columns, 'rows': rows}
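
To make the translation concrete, here is a hypothetical /query.json response body and the structure parse_response would build from it. Column names and values are invented; the runner treats every value as a string and guesses the column types from the first row:

    # Illustrative input (shape of a Drill REST response; values are made up):
    data = {
        'columns': ['employee_id', 'full_name', 'hire_date'],
        'rows': [
            {'employee_id': '101', 'full_name': 'Sheri Nowmer', 'hire_date': '1994-12-01 00:00:00.0'},
        ],
    }

    parse_response(data)
    # {'columns': [{'name': 'employee_id', 'type': TYPE_INTEGER, 'friendly_name': 'employee_id'},
    #              {'name': 'full_name', 'type': TYPE_STRING, 'friendly_name': 'full_name'},
    #              {'name': 'hire_date', 'type': TYPE_DATETIME, 'friendly_name': 'hire_date'}],
    #  'rows': [{'employee_id': 101, 'full_name': u'Sheri Nowmer',
    #            'hire_date': datetime.datetime(1994, 12, 1, 0, 0)}]}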


class Drill(BaseHTTPQueryRunner):
    noop_query = 'select version from sys.version'
    response_error = "Drill API returned unexpected status code"
    requires_authentication = False
    requires_url = True
    url_title = 'Drill URL'
    username_title = 'Username'
    password_title = 'Password'

    @classmethod
    def name(cls):
        return 'Apache Drill'

    @classmethod
    def configuration_schema(cls):
        schema = super(Drill, cls).configuration_schema()
        # Since Drill itself can act as an aggregator of various data sources,
        # it can expose quite a lot of schemas in `INFORMATION_SCHEMA`.
        # This option lets users focus the schema browser on the schemas they care about.
        schema['properties']['allowed_schemas'] = {
            'type': 'string',
            'title': 'List of schemas to use in schema browser (comma separated)'
        }
        schema['order'] += ['allowed_schemas']
        return schema
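
For context, a Drill data source configured through this schema would end up with options roughly like the following (all values are placeholders):

    # Illustrative configuration values; URL, credentials and schema list are placeholders.
    configuration = {
        'url': 'http://localhost:8047',              # Drill REST endpoint
        'username': 'redash',                        # optional: requires_authentication is False
        'password': 'secret',
        'allowed_schemas': 'dfs.reports,hive.sales'  # restricts the schema browser
    }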

    def run_query(self, query, user):
        drill_url = os.path.join(self.configuration['url'], 'query.json')

        try:
            payload = {'queryType': 'SQL', 'query': query}

            response, error = self.get_response(drill_url, http_method='post', json=payload)
            if error is not None:
                return None, error

            results = parse_response(response.json())

            return json_dumps(results), None
        except KeyboardInterrupt:
            return None, 'Query cancelled by user.'

    def get_schema(self, get_stats=False):
        query = """
        SELECT DISTINCT
            TABLE_SCHEMA,
            TABLE_NAME,
            COLUMN_NAME
        FROM
            INFORMATION_SCHEMA.`COLUMNS`
        WHERE
            TABLE_SCHEMA not in ('INFORMATION_SCHEMA', 'information_schema', 'sys')
            and TABLE_SCHEMA not like '%.information_schema'
            and TABLE_SCHEMA not like '%.INFORMATION_SCHEMA'

        """
        allowed_schemas = self.configuration.get('allowed_schemas')
        if allowed_schemas:
            query += "and TABLE_SCHEMA in ({})".format(
                ', '.join(map(lambda x: "'{}'".format(re.sub('[^a-zA-Z0-9_.`]', '', x)), allowed_schemas.split(','))))

        results, error = self.run_query(query, None)

        if error is not None:
            raise Exception("Failed getting schema.")

        results = json_loads(results)

        schema = {}

        for row in results['rows']:
            table_name = u'{}.{}'.format(row['TABLE_SCHEMA'], row['TABLE_NAME'])

            if table_name not in schema:
                schema[table_name] = {'name': table_name, 'columns': []}

            schema[table_name]['columns'].append(row['COLUMN_NAME'])

        return schema.values()


register(Drill)
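
A small sketch (hypothetical schema names) of how the allowed_schemas value is sanitized and folded into the INFORMATION_SCHEMA query above; since the value is user input, anything outside [a-zA-Z0-9_.`] is stripped before it reaches the SQL string:

    import re

    allowed_schemas = ' dfs.reports, hive.sales '   # placeholder user input
    schemas = [re.sub('[^a-zA-Z0-9_.`]', '', s) for s in allowed_schemas.split(',')]
    # ['dfs.reports', 'hive.sales']
    clause = "and TABLE_SCHEMA in ({})".format(', '.join("'{}'".format(s) for s in schemas))
    # "and TABLE_SCHEMA in ('dfs.reports', 'hive.sales')"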
25 changes: 1 addition & 24 deletions redash/query_runner/google_spreadsheets.py
@@ -48,29 +48,6 @@ def _get_columns_and_column_names(row):
    return columns, column_names


-def _guess_type(value):
-    if value == '':
-        return TYPE_STRING
-    try:
-        val = int(value)
-        return TYPE_INTEGER
-    except ValueError:
-        pass
-    try:
-        val = float(value)
-        return TYPE_FLOAT
-    except ValueError:
-        pass
-    if unicode(value).lower() in ('true', 'false'):
-        return TYPE_BOOLEAN
-    try:
-        val = parser.parse(value)
-        return TYPE_DATETIME
-    except (ValueError, OverflowError):
-        pass
-    return TYPE_STRING


def _value_eval_list(row_values, col_types):
    value_list = []
    raw_values = zip(col_types, row_values)
@@ -120,7 +97,7 @@ def parse_worksheet(worksheet):

    if len(worksheet) > 1:
        for j, value in enumerate(worksheet[HEADER_INDEX + 1]):
-            columns[j]['type'] = _guess_type(value)
+            columns[j]['type'] = guess_type(value)

    column_types = [c['type'] for c in columns]
    rows = [dict(zip(column_names, _value_eval_list(row, column_types))) for row in worksheet[HEADER_INDEX + 1:]]
28 changes: 2 additions & 26 deletions redash/query_runner/query_results.py
@@ -8,9 +8,7 @@

from redash import models
from redash.permissions import has_access, not_view_only
-from redash.query_runner import (TYPE_BOOLEAN, TYPE_DATETIME, TYPE_FLOAT,
-                                 TYPE_INTEGER, TYPE_STRING, BaseQueryRunner,
-                                 register)
+from redash.query_runner import guess_type, TYPE_STRING, BaseQueryRunner, register
from redash.utils import json_dumps, json_loads

logger = logging.getLogger(__name__)
@@ -24,28 +22,6 @@ class CreateTableError(Exception):
    pass


-def _guess_type(value):
-    if value == '' or value is None:
-        return TYPE_STRING
-
-    if isinstance(value, numbers.Integral):
-        return TYPE_INTEGER
-
-    if isinstance(value, float):
-        return TYPE_FLOAT
-
-    if text_type(value).lower() in ('true', 'false'):
-        return TYPE_BOOLEAN
-
-    try:
-        parser.parse(value)
-        return TYPE_DATETIME
-    except (ValueError, OverflowError):
-        pass
-
-    return TYPE_STRING


def extract_query_ids(query):
    queries = re.findall(r'(?:join|from)\s+query_(\d+)', query, re.IGNORECASE)
    return [int(q) for q in queries]
@@ -164,7 +140,7 @@ def run_query(self, query, user):

            for i, row in enumerate(cursor):
                for j, col in enumerate(row):
-                    guess = _guess_type(col)
+                    guess = guess_type(col)

                    if columns[j]['type'] is None:
                        columns[j]['type'] = guess
3 changes: 2 additions & 1 deletion redash/settings/__init__.py
@@ -188,7 +188,8 @@ def all_settings():
    'redash.query_runner.qubole',
    'redash.query_runner.db2',
    'redash.query_runner.druid',
-    'redash.query_runner.kylin'
+    'redash.query_runner.kylin',
+    'redash.query_runner.drill',
]

enabled_query_runners = array_from_string(os.environ.get("REDASH_ENABLED_QUERY_RUNNERS", ",".join(default_query_runners)))
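
Deployments that override the default set would presumably need to list the new module themselves; a sketch under that assumption (module path from this PR, the other runner illustrative):

    # Illustrative: overriding the default set via the environment.
    os.environ['REDASH_ENABLED_QUERY_RUNNERS'] = 'redash.query_runner.pg,redash.query_runner.drill'
    # The comma-separated value replaces default_query_runners, so only the listed
    # modules are passed to import_query_runners at startup.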