Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add assume role as a credential source for AWS Athena Query runner #4028

Merged
merged 6 commits into from
Aug 12, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 50 additions & 13 deletions redash/query_runner/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
logger = logging.getLogger(__name__)
ANNOTATE_QUERY = parse_boolean(os.environ.get('ATHENA_ANNOTATE_QUERY', 'true'))
SHOW_EXTRA_SETTINGS = parse_boolean(os.environ.get('ATHENA_SHOW_EXTRA_SETTINGS', 'true'))
ASSUME_ROLE = parse_boolean(os.environ.get('ATHENA_ASSUME_ROLE', 'false'))
OPTIONAL_CREDENTIALS = parse_boolean(os.environ.get('ATHENA_OPTIONAL_CREDENTIALS', 'true'))

try:
Expand Down Expand Up @@ -85,7 +86,7 @@ def configuration_schema(cls):
},
},
'required': ['region', 's3_staging_dir'],
'order': ['region', 'aws_access_key', 'aws_secret_key', 's3_staging_dir', 'schema', 'work_group'],
'order': ['region', 's3_staging_dir', 'schema', 'work_group'],
'secret': ['aws_secret_key']
}

Expand All @@ -101,8 +102,29 @@ def configuration_schema(cls):
},
})

if not OPTIONAL_CREDENTIALS:
schema['required'] += ['aws_access_key', 'aws_secret_key']
if ASSUME_ROLE:
del schema['properties']['aws_access_key']
del schema['properties']['aws_secret_key']
schema['secret'] = []

schema['order'].insert(1, 'iam_role')
schema['order'].insert(2, 'external_id')
schema['properties'].update({
'iam_role': {
'type': 'string',
'title': 'IAM role to assume',
},
'external_id': {
'type': 'string',
'title': 'External ID to be used while STS assume role',
},
})
else:
schema['order'].insert(1, 'aws_access_key')
schema['order'].insert(2, 'aws_secret_key')

if not OPTIONAL_CREDENTIALS and not ASSUME_ROLE:
schema['required'] += ['aws_access_key', 'aws_secret_key']

return schema

Expand All @@ -118,13 +140,30 @@ def annotate_query(cls):
def type(cls):
return "athena"

def __get_schema_from_glue(self):
client = boto3.client(
'glue',
aws_access_key_id=self.configuration.get('aws_access_key', None),
aws_secret_access_key=self.configuration.get('aws_secret_key', None),
region_name=self.configuration['region']
def _get_iam_credentials(self, user=None):
if ASSUME_ROLE:
role_session_name = 'redash' if user is None else user.email
sts = boto3.client('sts')
creds = sts.assume_role(
RoleArn=self.configuration.get('iam_role'),
RoleSessionName=role_session_name,
ExternalId=self.configuration.get('external_id')
)
return {
'aws_access_key_id': creds['Credentials']['AccessKeyId'],
'aws_secret_access_key': creds['Credentials']['SecretAccessKey'],
'aws_session_token': creds['Credentials']['SessionToken'],
'region_name': self.configuration['region']
}
else:
return {
'aws_access_key_id': self.configuration.get('aws_access_key', None),
'aws_secret_access_key': self.configuration.get('aws_secret_key', None),
'region_name': self.configuration['region']
}

def __get_schema_from_glue(self):
client = boto3.client('glue', **self._get_iam_credentials())
schema = {}

database_paginator = client.get_paginator('get_databases')
Expand Down Expand Up @@ -169,14 +208,12 @@ def get_schema(self, get_stats=False):
def run_query(self, query, user):
cursor = pyathena.connect(
s3_staging_dir=self.configuration['s3_staging_dir'],
region_name=self.configuration['region'],
aws_access_key_id=self.configuration.get('aws_access_key', None),
aws_secret_access_key=self.configuration.get('aws_secret_key', None),
schema_name=self.configuration.get('schema', 'default'),
encryption_option=self.configuration.get('encryption_option', None),
kms_key=self.configuration.get('kms_key', None),
work_group=self.configuration.get('work_group', 'primary'),
formatter=SimpleFormatter()).cursor()
formatter=SimpleFormatter(),
**self._get_iam_credentials(user=user)).cursor()

try:
cursor.execute(query)
Expand Down