Skip to content

Commit

Permalink
add support for Yandex Dataproc cluster labels (#29811)
Browse files Browse the repository at this point in the history
Co-authored-by: Maksim Zinal <mzinal@ru.ibm.com>
  • Loading branch information
zinal and Maksim Zinal authored Aug 23, 2023
1 parent 92474db commit 2ae1c10
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 0 deletions.
5 changes: 5 additions & 0 deletions airflow/providers/yandex/operators/yandexcloud_dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ class DataprocCreateClusterOperator(BaseOperator):
Docs: https://cloud.yandex.com/docs/data-proc/concepts/logs
:param initialization_actions: Set of init-actions to run when cluster starts.
Docs: https://cloud.yandex.com/docs/data-proc/concepts/init-action
:param labels: Cluster labels as key:value pairs. No more than 64 per resource.
Docs: https://cloud.yandex.ru/docs/resource-manager/concepts/labels
"""

def __init__(
Expand Down Expand Up @@ -135,6 +137,7 @@ def __init__(
security_group_ids: Iterable[str] | None = None,
log_group_id: str | None = None,
initialization_actions: Iterable[InitializationAction] | None = None,
labels: dict[str, str] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand Down Expand Up @@ -173,6 +176,7 @@ def __init__(
self.security_group_ids = security_group_ids
self.log_group_id = log_group_id
self.initialization_actions = initialization_actions
self.labels = labels

self.hook: DataprocHook | None = None

Expand Down Expand Up @@ -214,6 +218,7 @@ def execute(self, context: Context) -> dict:
host_group_ids=self.host_group_ids,
security_group_ids=self.security_group_ids,
log_group_id=self.log_group_id,
labels=self.labels,
initialization_actions=self.initialization_actions
and [
self.hook.sdk.wrappers.InitializationAction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def test_create_cluster(self, create_cluster_mock, *_):
enable_ui_proxy=False,
host_group_ids=None,
security_group_ids=None,
labels=None,
initialization_actions=None,
)
context["task_instance"].xcom_push.assert_has_calls(
Expand Down

0 comments on commit 2ae1c10

Please sign in to comment.