Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Yandex Query support from Yandex.Cloud #37458

Merged
merged 34 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
ef12228
initial commit
uzhastik Feb 9, 2024
ae4fab2
support web link
uzhastik Feb 9, 2024
aad01bd
move jwt logic out of base hook
uzhastik Feb 9, 2024
39ecf8e
use http client in hook
uzhastik Feb 10, 2024
71917b5
add yq_results
uzhastik Feb 12, 2024
f9d7e72
add token_prefix, format exception message
uzhastik Feb 13, 2024
318f652
use YQResults inside client
uzhastik Feb 13, 2024
8faf664
add tests, fix provider.yaml
uzhastik Feb 13, 2024
d5f0e3f
fix oauth token usage, add tests for complex results
uzhastik Feb 14, 2024
90b891c
add tests for YQ operator
uzhastik Feb 14, 2024
88156b2
fix test name
uzhastik Feb 14, 2024
04e8c92
linting
uzhastik Feb 14, 2024
be6bbc7
restyling
uzhastik Feb 14, 2024
0708e52
improve tests, fix close(), add link to YQ service
uzhastik Feb 15, 2024
0f7f968
trim spaces
uzhastik Feb 15, 2024
cf4c6ef
add docstrings, remove query description, move privates to bottom of …
uzhastik Feb 15, 2024
773d996
fix last newline
uzhastik Feb 15, 2024
d23a317
restyling
uzhastik Feb 15, 2024
ec97073
restyling
uzhastik Feb 15, 2024
20fed73
refactor, restyling
uzhastik Feb 16, 2024
ebe4a47
revert version
uzhastik Feb 16, 2024
e6badc2
change text to trigger CI checks
uzhastik Feb 17, 2024
8a8cc97
fixes for linters
uzhastik Feb 17, 2024
ece1e0c
rework
uzhastik Mar 2, 2024
843cd93
restyling
uzhastik Mar 2, 2024
f2fc45a
fix CI tests, add yq link tests
uzhastik Mar 10, 2024
54110ac
add doc strings
uzhastik Mar 10, 2024
d238e20
fix link style tests
uzhastik Mar 10, 2024
ac36e53
rename files, add deps, fix doc string
uzhastik Mar 14, 2024
2c3f315
replace SQLExecuteQueryOperator with BaseOperator
uzhastik Mar 14, 2024
c1a760d
fix static checks
uzhastik Mar 14, 2024
daa9680
fight with static checks
uzhastik Mar 14, 2024
072e625
remove http client, use py package
uzhastik Mar 16, 2024
da10459
fix static checks
uzhastik Mar 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions airflow/providers/yandex/hooks/yandex.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,13 @@ def __init__(
self.connection_id = yandex_conn_id or connection_id or default_conn_name
self.connection = self.get_connection(self.connection_id)
self.extras = self.connection.extra_dejson
credentials = get_credentials(
self.credentials = get_credentials(
oauth_token=self._get_field("oauth"),
service_account_json=self._get_field("service_account_json"),
service_account_json_path=self._get_field("service_account_json_path"),
)
sdk_config = self._get_endpoint()
self.sdk = yandexcloud.SDK(user_agent=provider_user_agent(), **sdk_config, **credentials)
self.sdk = yandexcloud.SDK(user_agent=provider_user_agent(), **sdk_config, **self.credentials)
self.default_folder_id = default_folder_id or self._get_field("folder_id")
self.default_public_ssh_key = default_public_ssh_key or self._get_field("public_ssh_key")
self.default_service_account_id = default_service_account_id or get_service_account_id(
Expand Down
132 changes: 132 additions & 0 deletions airflow/providers/yandex/hooks/yq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import time
from datetime import timedelta
from typing import Any

import jwt
import requests
from urllib3.util.retry import Retry
from yandex_query_client import YQHttpClient, YQHttpClientConfig

from airflow.exceptions import AirflowException
from airflow.providers.yandex.hooks.yandex import YandexCloudBaseHook
from airflow.providers.yandex.utils.user_agent import provider_user_agent


class YQHook(YandexCloudBaseHook):
    """A hook for Yandex Query.

    On construction the hook obtains a token from ``self.credentials`` and builds
    a ``YQHttpClient`` bound to ``self.default_folder_id``. The public methods are
    thin wrappers around that client.
    """

    # Seconds to wait for the IAM token endpoint, so that a stalled connection
    # cannot block the calling task forever.
    _IAM_REQUEST_TIMEOUT = 30

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

        config = YQHttpClientConfig(
            token=self._get_iam_token(), project=self.default_folder_id, user_agent=provider_user_agent()
        )

        self.client: YQHttpClient = YQHttpClient(config=config)

    def close(self):
        """Release all resources."""
        self.client.close()

    def create_query(self, query_text: str | None, name: str | None = None) -> str:
        """Create and run query.

        :param query_text: SQL text.
        :param name: name for the query
        :return: value returned by the client's ``create_query``; callers in this
            provider use it as the query ID.
        """
        return self.client.create_query(
            name=name,
            query_text=query_text,
        )

    def wait_results(self, query_id: str, execution_timeout: timedelta = timedelta(minutes=30)) -> Any:
        """Wait for query complete and get results.

        :param query_id: ID of query.
        :param execution_timeout: how long to wait for the query to complete.
        :return: all result sets of the query, as returned by the client.
        """
        # ``stop_on_timeout=True`` is passed so the client handles a query that
        # outlives ``execution_timeout``.
        result_set_count = self.client.wait_query_to_succeed(
            query_id, execution_timeout=execution_timeout, stop_on_timeout=True
        )

        return self.client.get_query_all_result_sets(query_id=query_id, result_set_count=result_set_count)

    def stop_query(self, query_id: str) -> None:
        """Stop the query.

        :param query_id: ID of the query.
        """
        self.client.stop_query(query_id)

    def get_query(self, query_id: str) -> Any:
        """Get query info.

        :param query_id: ID of the query.
        """
        return self.client.get_query(query_id)

    def get_query_status(self, query_id: str) -> str:
        """Get status of the query.

        :param query_id: ID of query.
        """
        return self.client.get_query_status(query_id)

    def compose_query_web_link(self, query_id: str) -> str:
        """Compose web link to query in Yandex Query UI.

        :param query_id: ID of query.
        """
        return self.client.compose_query_web_link(query_id)

    def _get_iam_token(self) -> str:
        """Return the token to authenticate the Yandex Query client with.

        A value stored under ``"token"`` in the credentials is returned as is;
        a ``"service_account_key"`` is exchanged for an IAM token.

        :raises AirflowException: if the credentials contain neither key.
        """
        if "token" in self.credentials:
            return self.credentials["token"]
        if "service_account_key" in self.credentials:
            return YQHook._resolve_service_account_key(self.credentials["service_account_key"])
        raise AirflowException(f"Unknown credentials type, available keys {self.credentials.keys()}")

    @staticmethod
    def _resolve_service_account_key(sa_info: dict) -> str:
        """Exchange a service account key for an IAM token.

        Signs a short-lived PS256 JWT with the key and posts it to the IAM
        tokens endpoint.

        :param sa_info: service account key; the ``"service_account_id"``,
            ``"private_key"`` and ``"id"`` entries are read.
        :return: the ``"iamToken"`` field of the endpoint's JSON response.
        :raises requests.HTTPError: if the endpoint answers with an error status.
        """
        with YQHook._create_session() as session:
            api = "https://iam.api.cloud.yandex.net/iam/v1/tokens"
            now = int(time.time())
            # The JWT is only needed for this single exchange, hence the short lifetime.
            payload = {"aud": api, "iss": sa_info["service_account_id"], "iat": now, "exp": now + 360}

            encoded_token = jwt.encode(
                payload, sa_info["private_key"], algorithm="PS256", headers={"kid": sa_info["id"]}
            )

            data = {"jwt": encoded_token}
            iam_response = session.post(api, json=data, timeout=YQHook._IAM_REQUEST_TIMEOUT)
            iam_response.raise_for_status()

            return iam_response.json()["iamToken"]

    @staticmethod
    def _create_session() -> requests.Session:
        """Create an HTTP session that retries failed requests with backoff.

        TLS certificate verification is left at the ``requests`` default
        (enabled): the session carries a signed JWT and receives an IAM token,
        so the server identity must be checked.
        """
        session = requests.Session()
        retry = Retry(backoff_factor=0.3, total=10)
        session.mount("http://", requests.adapters.HTTPAdapter(max_retries=retry))
        session.mount("https://", requests.adapters.HTTPAdapter(max_retries=retry))

        return session
16 changes: 16 additions & 0 deletions airflow/providers/yandex/links/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
41 changes: 41 additions & 0 deletions airflow/providers/yandex/links/yq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

from airflow.models import BaseOperatorLink, XCom

if TYPE_CHECKING:
from airflow.models import BaseOperator
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.context import Context

# XCom key under which the Yandex Query web link is pushed and later looked up.
XCOM_WEBLINK_KEY = "web_link"


class YQLink(BaseOperatorLink):
    """Operator extra link that points at a query in the Yandex Query UI."""

    name = "Yandex Query"

    @staticmethod
    def persist(context: Context, task_instance: BaseOperator, web_link: str) -> None:
        """Push ``web_link`` to XCom under ``XCOM_WEBLINK_KEY``.

        :param context: task execution context forwarded to ``xcom_push``.
        :param task_instance: object whose ``xcom_push`` is called.
        :param web_link: link to store.
        """
        task_instance.xcom_push(context, key=XCOM_WEBLINK_KEY, value=web_link)

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
        """Return the link stored in XCom for ``ti_key``.

        Falls back to the Yandex Query start page when no (or an empty) value
        is stored.
        """
        stored_link = XCom.get_value(key=XCOM_WEBLINK_KEY, ti_key=ti_key)
        if stored_link:
            return stored_link
        return "https://yq.cloud.yandex.ru"
92 changes: 92 additions & 0 deletions airflow/providers/yandex/operators/yq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from functools import cached_property
from typing import TYPE_CHECKING, Any, Sequence

from airflow.models import BaseOperator
from airflow.providers.yandex.hooks.yq import YQHook
from airflow.providers.yandex.links.yq import YQLink

if TYPE_CHECKING:
from airflow.utils.context import Context


class YQExecuteQueryOperator(BaseOperator):
    """
    Run SQL code in the Yandex Query service and return what the hook's ``wait_results`` yields.

    :param sql: the SQL code to be executed as a single string
    :param name: name of the query in YandexQuery
    :param folder_id: cloud folder id where to create query
    :param yandex_conn_id: Airflow connection ID to get parameters from
    :param public_ssh_key: forwarded to the hook as ``default_public_ssh_key``
    :param service_account_id: forwarded to the hook as ``default_service_account_id``
    """

    ui_color = "#ededed"
    template_fields: Sequence[str] = ("sql",)
    template_ext: Sequence[str] = (".sql",)
    template_fields_renderers = {"sql": "sql"}
    operator_extra_links = (YQLink(),)

    def __init__(
        self,
        *,
        name: str | None = None,
        folder_id: str | None = None,
        yandex_conn_id: str | None = None,
        public_ssh_key: str | None = None,
        service_account_id: str | None = None,
        sql: str,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        # Query definition.
        self.sql = sql
        self.name = name

        # Settings handed over to the hook.
        self.yandex_conn_id = yandex_conn_id
        self.folder_id = folder_id
        self.public_ssh_key = public_ssh_key
        self.service_account_id = service_account_id

        # ID of the query currently in flight; None when nothing has to be stopped.
        self.query_id: str | None = None

    @cached_property
    def hook(self) -> YQHook:
        """Build the hook on first access and reuse it afterwards."""
        hook_kwargs = {
            "yandex_conn_id": self.yandex_conn_id,
            "default_folder_id": self.folder_id,
            "default_public_ssh_key": self.public_ssh_key,
            "default_service_account_id": self.service_account_id,
        }
        return YQHook(**hook_kwargs)

    def execute(self, context: Context) -> Any:
        """Create the query, store its web link, wait for completion and return the results."""
        hook = self.hook
        query_id = hook.create_query(query_text=self.sql, name=self.name)
        self.query_id = query_id

        # Hand the UI link to YQLink (stored via XCom) before waiting for the query.
        YQLink.persist(context, self, hook.compose_query_web_link(query_id))

        result_sets = hook.wait_results(query_id)
        # Clear the id so that on_kill does not call stop_query for this query.
        self.query_id = None
        return result_sets

    def on_kill(self) -> None:
        """Stop the in-flight query, if any, and close the hook."""
        if self.hook is None or self.query_id is None:
            return
        self.hook.stop_query(self.query_id)
        self.hook.close()
21 changes: 21 additions & 0 deletions airflow/providers/yandex/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ versions:
dependencies:
- apache-airflow>=2.6.0
- yandexcloud>=0.228.0
- yandex-query-client>=0.1.2
- python-dateutil>=2.8.0
# Requests 3, if it is ever released, will be heavily breaking.
- requests>=2.27.0,<3

integrations:
- integration-name: Yandex.Cloud
Expand All @@ -63,18 +67,32 @@ integrations:
logo: /integration-logos/yandex/Yandex-Cloud.png
tags: [service]

- integration-name: Yandex.Cloud YQ
external-doc-url: https://cloud.yandex.com/en/services/query
how-to-guide:
- /docs/apache-airflow-providers-yandex/operators.rst
logo: /integration-logos/yandex/Yandex-Cloud.png
tags: [service]

operators:
- integration-name: Yandex.Cloud Dataproc
python-modules:
- airflow.providers.yandex.operators.yandexcloud_dataproc

- integration-name: Yandex.Cloud YQ
python-modules:
- airflow.providers.yandex.operators.yq

hooks:
- integration-name: Yandex.Cloud
python-modules:
- airflow.providers.yandex.hooks.yandex
- integration-name: Yandex.Cloud Dataproc
python-modules:
- airflow.providers.yandex.hooks.yandexcloud_dataproc
- integration-name: Yandex.Cloud YQ
python-modules:
- airflow.providers.yandex.hooks.yq

connection-types:
- hook-class-name: airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook
Expand All @@ -83,6 +101,9 @@ connection-types:
secrets-backends:
- airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend

extra-links:
- airflow.providers.yandex.links.yq.YQLink

config:
yandex:
description: This section contains settings for Yandex Cloud integration.
Expand Down
3 changes: 3 additions & 0 deletions generated/provider_dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,9 @@
"yandex": {
"deps": [
"apache-airflow>=2.6.0",
"python-dateutil>=2.8.0",
"requests>=2.27.0,<3",
"yandex-query-client>=0.1.2",
"yandexcloud>=0.228.0"
],
"devel-deps": [],
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,9 @@ weaviate = [ # source: airflow/providers/weaviate/provider.yaml
"weaviate-client>=3.24.2",
]
yandex = [ # source: airflow/providers/yandex/provider.yaml
"python-dateutil>=2.8.0",
"requests>=2.27.0,<3",
"yandex-query-client>=0.1.2",
"yandexcloud>=0.228.0",
]
zendesk = [ # source: airflow/providers/zendesk/provider.yaml
Expand Down
Loading