Add Looker PDT operators #20882

Merged
merged 12 commits on Mar 7, 2022
63 changes: 63 additions & 0 deletions airflow/providers/google/cloud/example_dags/example_looker.py
@@ -0,0 +1,63 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG that shows how to use various Looker
operators to submit a PDT materialization job and manage it.
"""

from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.operators.looker import LookerStartPdtBuildOperator
from airflow.providers.google.cloud.sensors.looker import LookerCheckPdtBuildSensor

with models.DAG(
dag_id='example_gcp_looker',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
) as dag:
# [START cloud_looker_async_start_pdt_sensor]
start_pdt_task_async = LookerStartPdtBuildOperator(
task_id='start_pdt_task_async',
looker_conn_id='your_airflow_connection_for_looker',
model='your_lookml_model',
view='your_lookml_view',
asynchronous=True,
)

check_pdt_task_async_sensor = LookerCheckPdtBuildSensor(
task_id='check_pdt_task_async_sensor',
looker_conn_id='your_airflow_connection_for_looker',
materialization_id=start_pdt_task_async.output,
poke_interval=10,
)
# [END cloud_looker_async_start_pdt_sensor]

# [START how_to_cloud_looker_start_pdt_build_operator]
build_pdt_task = LookerStartPdtBuildOperator(
task_id='build_pdt_task',
looker_conn_id='your_airflow_connection_for_looker',
model='your_lookml_model',
view='your_lookml_view',
)
# [END how_to_cloud_looker_start_pdt_build_operator]

start_pdt_task_async >> check_pdt_task_async_sensor

build_pdt_task
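
A hedged sketch (not part of this PR) of how the Looker connection referenced above could be defined. The field mapping follows LookerApiSettings.read_config in the hook below; the connection id, host, port, and credential values are placeholders.

# Sketch only: defines a connection matching what the hook expects
# (host/port -> base_url, login/password -> client id/secret, optional
# verify_ssl/timeout in extras) and registers it via an environment variable.
import json
import os

from airflow.models.connection import Connection

looker_conn = Connection(
    conn_id='your_airflow_connection_for_looker',  # placeholder id used in the DAG above
    conn_type='http',  # the hook currently reuses the HTTP connection type
    host='https://your.looker.instance',
    port=19999,  # optional; appended to the host when building base_url
    login='your_looker_client_id',
    password='your_looker_client_secret',
    extra=json.dumps({"verify_ssl": True, "timeout": 120}),  # both optional
)

os.environ[f'AIRFLOW_CONN_{looker_conn.conn_id.upper()}'] = looker_conn.get_uri()
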
258 changes: 258 additions & 0 deletions airflow/providers/google/cloud/hooks/looker.py
@@ -0,0 +1,258 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""This module contains a Google Cloud Looker hook."""

import json
import time
from enum import Enum
from typing import Dict, Optional

from looker_sdk.rtl import api_settings, auth_session, requests_transport, serialize
from looker_sdk.sdk.api40 import methods as methods40
from packaging.version import parse as parse_version

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.models.connection import Connection
from airflow.version import version


class LookerHook(BaseHook):
"""Hook for Looker APIs."""

def __init__(
Member:

Can you check if Looker is available for selection during connection setup?

Contributor Author:

Not sure what you mean by "Looker is available for selection during connection setup". Do you mean the drop-down list with connection types? (The Looker connection currently has the HTTP type.)

Member:

Yes, I mean this dropdown list.
I think you should create a new connection type and then add new fields to the connection form (timeout, verify_ssl). For example, here is how the Snowflake provider does it:

conn_name_attr = 'snowflake_conn_id'
default_conn_name = 'snowflake_default'
conn_type = 'snowflake'
hook_name = 'Snowflake'
supports_autocommit = True

@staticmethod
def get_connection_form_widgets() -> Dict[str, Any]:
    """Returns connection widgets to add to connection form"""
    from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
    from flask_babel import lazy_gettext
    from wtforms import BooleanField, StringField

    return {
        "extra__snowflake__account": StringField(lazy_gettext('Account'), widget=BS3TextFieldWidget()),
        "extra__snowflake__warehouse": StringField(
            lazy_gettext('Warehouse'), widget=BS3TextFieldWidget()
        ),
        "extra__snowflake__database": StringField(lazy_gettext('Database'), widget=BS3TextFieldWidget()),
        "extra__snowflake__region": StringField(lazy_gettext('Region'), widget=BS3TextFieldWidget()),
        "extra__snowflake__role": StringField(lazy_gettext('Role'), widget=BS3TextFieldWidget()),
        "extra__snowflake__insecure_mode": BooleanField(
            label=lazy_gettext('Insecure mode'), description="Turns off OCSP certificate checks"
        ),
    }

@staticmethod
def get_ui_field_behaviour() -> Dict[str, Any]:
    """Returns custom field behaviour"""
    import json

    return {
        "hidden_fields": ['port'],
        "relabeling": {},
        "placeholders": {
            'extra': json.dumps(
                {
                    "authenticator": "snowflake oauth",
                    "private_key_file": "private key",
                    "session_parameters": "session parameters",
                },
                indent=1,
            ),
            'schema': 'snowflake schema',
            'login': 'snowflake username',
            'password': 'snowflake password',
            'extra__snowflake__account': 'snowflake account name',
            'extra__snowflake__warehouse': 'snowflake warehouse name',
            'extra__snowflake__database': 'snowflake db name',
            'extra__snowflake__region': 'snowflake hosted region',
            'extra__snowflake__role': 'snowflake role',
            'extra__snowflake__insecure_mode': 'insecure mode',
        },
    }

This will also allow you to rename the "Login" field to "Client ID" and the "Password" field to "Client Secret".
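
For illustration, a hypothetical sketch (not in this PR) of how LookerHook could expose a dedicated connection type with the suggested widgets and relabeling, following the Snowflake pattern above; the conn_type name, field keys, labels, and placeholders are assumptions.

# Hypothetical sketch only: all names and labels below are illustrative.
from typing import Any, Dict

from airflow.hooks.base import BaseHook


class LookerHook(BaseHook):
    conn_name_attr = 'looker_conn_id'
    default_conn_name = 'looker_default'
    conn_type = 'looker'
    hook_name = 'Looker'

    @staticmethod
    def get_connection_form_widgets() -> Dict[str, Any]:
        """Returns connection widgets to add to the connection form."""
        from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
        from flask_babel import lazy_gettext
        from wtforms import BooleanField, StringField

        return {
            "extra__looker__verify_ssl": BooleanField(label=lazy_gettext('Verify SSL')),
            "extra__looker__timeout": StringField(
                lazy_gettext('Timeout'), widget=BS3TextFieldWidget()
            ),
        }

    @staticmethod
    def get_ui_field_behaviour() -> Dict[str, Any]:
        """Returns custom field behaviour."""
        return {
            "hidden_fields": ['schema', 'extra'],
            "relabeling": {'login': 'Client ID', 'password': 'Client Secret'},
            "placeholders": {'host': 'https://your.looker.instance'},
        }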

Contributor Author:

I had the same thought at first, but then I decided not to create a new connection type, as Looker is in the process of transitioning to GCP and ideally it should use the GCP connection soon. So I didn't want users to get used to a temporary Looker connection type. Hope that makes sense?

Member:

In the future, users will have to consciously migrate to the new authentication method, which is likely to take some time. As part of the migration, we can start accepting GCP connections and add "(Deprecated)" to the name of the old connection type. I don't see why we should make it difficult to configure a connection now by not adding this connection type to the UI.

If you don't want to add the ability to easily create connections via the UI, we can limit ourselves to documentation only, but then we should make sure that everything is precisely described.

Contributor Author:

I talked to @potiuk and he said that the next Google provider release is happening tomorrow. I will try to implement it by then, but we would really like to launch with this release to avoid waiting another month for the next one.

self,
looker_conn_id: str,
) -> None:
super().__init__()
self.looker_conn_id = looker_conn_id
# source is used to track origin of the requests
self.source = f'airflow:{version}'

def start_pdt_build(
self,
model: str,
view: str,
query_params: Optional[Dict] = None,
):
"""
Submits a PDT materialization job to Looker.

:param model: Required. The model of the PDT to start building.
:param view: Required. The view of the PDT to start building.
:param query_params: Optional. Additional materialization parameters.
"""
self.log.info("Submitting PDT materialization job. Model: '%s', view: '%s'.", model, view)
self.log.debug("PDT materialization job source: '%s'.", self.source)

sdk = self.get_looker_sdk()
looker_ver = sdk.versions().looker_release_version
if parse_version(looker_ver) < parse_version("22.2.0"):
raise AirflowException(f'This API requires Looker version 22.2+. Found: {looker_ver}.')

# unpack query_params dict into kwargs (if not None)
if query_params:
resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source, **query_params)
else:
resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source)

self.log.info("Start PDT build response: '%s'.", resp)

return resp

def check_pdt_build(
self,
materialization_id: str,
):
"""
Gets the PDT materialization job status from Looker.

:param materialization_id: Required. The materialization id to check status for.
"""
self.log.info("Requesting PDT materialization job status. Job id: %s.", materialization_id)

sdk = self.get_looker_sdk()
resp = sdk.check_pdt_build(materialization_id=materialization_id)

self.log.info("Check PDT build response: '%s'.", resp)
return resp

def pdt_build_status(
self,
materialization_id: str,
) -> Dict:
"""
Gets the PDT materialization job status.

:param materialization_id: Required. The materialization id to check status for.
"""
resp = self.check_pdt_build(materialization_id=materialization_id)

status_json = resp['resp_text']
status_dict = json.loads(status_json)

self.log.info(
"PDT materialization job id: %s. Status: '%s'.", materialization_id, status_dict['status']
)

return status_dict

def stop_pdt_build(
self,
materialization_id: str,
):
"""
Starts a PDT materialization job cancellation request.

:param materialization_id: Required. The materialization id to stop.
"""
self.log.info("Stopping PDT materialization. Job id: %s.", materialization_id)
self.log.debug("PDT materialization job source: '%s'.", self.source)

sdk = self.get_looker_sdk()
resp = sdk.stop_pdt_build(materialization_id=materialization_id, source=self.source)

self.log.info("Stop PDT build response: '%s'.", resp)
return resp

def wait_for_job(
self,
materialization_id: str,
wait_time: int = 10,
timeout: Optional[int] = None,
) -> None:
"""
Helper method which polls a PDT materialization job to check if it finishes.

:param materialization_id: Required. The materialization id to wait for.
:param wait_time: Optional. Number of seconds between checks.
:param timeout: Optional. How many seconds to wait for the job to be ready.
Used only if ``asynchronous`` is False.
"""
self.log.info('Waiting for PDT materialization job to complete. Job id: %s.', materialization_id)

status = None
start = time.monotonic()

while status not in (
JobStatus.DONE.value,
JobStatus.ERROR.value,
JobStatus.CANCELLED.value,
JobStatus.UNKNOWN.value,
):

if timeout and start + timeout < time.monotonic():
self.stop_pdt_build(materialization_id=materialization_id)
raise AirflowException(
f"Timeout: PDT materialization job is not ready after {timeout}s. "
f"Job id: {materialization_id}."
)

time.sleep(wait_time)

status_dict = self.pdt_build_status(materialization_id=materialization_id)
status = status_dict['status']

if status == JobStatus.ERROR.value:
msg = status_dict['message']
raise AirflowException(
f'PDT materialization job failed. Job id: {materialization_id}. Message:\n"{msg}"'
)
if status == JobStatus.CANCELLED.value:
raise AirflowException(f'PDT materialization job was cancelled. Job id: {materialization_id}.')
if status == JobStatus.UNKNOWN.value:
raise AirflowException(
f'PDT materialization job has unknown status. Job id: {materialization_id}.'
)

self.log.info('PDT materialization job completed successfully. Job id: %s.', materialization_id)

def get_looker_sdk(self):
"""Returns Looker SDK client for Looker API 4.0."""

conn = self.get_connection(self.looker_conn_id)
settings = LookerApiSettings(conn)

transport = requests_transport.RequestsTransport.configure(settings)
return methods40.Looker40SDK(
auth_session.AuthSession(settings, transport, serialize.deserialize40, "4.0"),
serialize.deserialize40,
serialize.serialize,
transport,
"4.0",
)

Member:

What do you think about adding the test_connection method to make it possible to check the connection configuration from the UI?

def test_connection(self):
    """Tests the Airbyte connection by hitting the health API"""
    self.method = 'GET'
    try:
        res = self.run(
            endpoint=f"api/{self.api_version}/health",
            headers={"accept": "application/json"},
            extra_options={'check_response': False},
        )
        if res.status_code == 200:
            return True, 'Connection successfully tested'
        else:
            return False, res.text
    except Exception as e:
        return False, str(e)
    finally:
        self.method = 'POST'
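
A rough sketch (not in this PR) of what an analogous test_connection for the Looker hook could look like, reusing get_looker_sdk and the versions call already made in start_pdt_build; the (bool, message) return convention follows the Airbyte example above.

# Sketch only: a method body that would live on LookerHook if the hook
# were registered for its own connection type.
from typing import Tuple


def test_connection(self) -> Tuple[bool, str]:
    """Tests the Looker connection by fetching the Looker release version."""
    try:
        sdk = self.get_looker_sdk()
        looker_ver = sdk.versions().looker_release_version
        return True, f'Connection successfully tested. Looker version: {looker_ver}.'
    except Exception as e:
        return False, str(e)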

Contributor Author:

I tried to add this, but it looks like it only works when the hook is associated with the connection type. And since I'd prefer to keep it as HTTP for now, we would need to wait to add this feature until we switch to the GCP connection type.


class LookerApiSettings(api_settings.ApiSettings):
"""Custom implementation of Looker SDK's `ApiSettings` class."""

def __init__(
self,
conn: Connection,
) -> None:
self.conn = conn # need to init before `read_config` is called in super
super().__init__()

def read_config(self):
"""
Overrides the default logic of getting connection settings. Fetches
the connection settings from Airflow's connection object.
"""

config = {}

if self.conn.host is None:
raise AirflowException(f'No `host` was supplied in connection: {self.conn.id}.')

if self.conn.port:
config["base_url"] = f"{self.conn.host}:{self.conn.port}" # port is optional
else:
config["base_url"] = self.conn.host

if self.conn.login:
config["client_id"] = self.conn.login
else:
raise AirflowException(f'No `login` was supplied in connection: {self.conn.id}.')

if self.conn.password:
config["client_secret"] = self.conn.password
else:
raise AirflowException(f'No `password` was supplied in connection: {self.conn.id}.')

extras = self.conn.extra_dejson # type: Dict

if 'verify_ssl' in extras:
config["verify_ssl"] = extras["verify_ssl"] # optional

if 'timeout' in extras:
config["timeout"] = extras["timeout"] # optional

return config


class JobStatus(Enum):
"""The job status string."""

QUEUED = 'added'
PENDING = 'pending'
RUNNING = 'running'
CANCELLED = 'killed'
DONE = 'complete'
ERROR = 'error'
UNKNOWN = 'unknown'
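
For orientation, a minimal usage sketch (not part of this PR) showing how the hook's methods compose; the connection id and LookML names are placeholders, and the materialization_id attribute on the SDK response is an assumption inferred from the operator/sensor pairing in the example DAG above.

from airflow.providers.google.cloud.hooks.looker import LookerHook

hook = LookerHook(looker_conn_id='your_airflow_connection_for_looker')

# Submit the PDT materialization job.
resp = hook.start_pdt_build(model='your_lookml_model', view='your_lookml_view')

# Assumed: the SDK response carries the job id as materialization_id.
materialization_id = resp.materialization_id

# Poll every 10s until the job reaches a terminal state; raises
# AirflowException on error, cancellation, unknown status, or timeout.
hook.wait_for_job(materialization_id=materialization_id, wait_time=10, timeout=600)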