diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py
index f75a0224fbe1c..c13c622937ffc 100644
--- a/airflow/providers/amazon/aws/operators/emr.py
+++ b/airflow/providers/amazon/aws/operators/emr.py
@@ -27,7 +27,6 @@
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.models import BaseOperator
-from airflow.models.mappedoperator import MappedOperator
 from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook, EmrServerlessHook
 from airflow.providers.amazon.aws.links.emr import (
     EmrClusterLink,
@@ -1259,91 +1258,12 @@ class EmrServerlessStartJobOperator(BaseOperator):
         "configuration_overrides": "json",
     }
 
-    @property
-    def operator_extra_links(self):
-        """
-        Dynamically add extra links depending on the job type and if they're enabled.
-
-        If S3 or CloudWatch monitoring configurations exist, add links directly to the relevant consoles.
-        Only add dashboard links if they're explicitly enabled. These are one-time links that any user
-        can access, but expire on first click or one hour, whichever comes first.
-        """
-        op_extra_links = []
-
-        if isinstance(self, MappedOperator):
-            operator_class = self.operator_class
-            enable_application_ui_links = self.partial_kwargs.get(
-                "enable_application_ui_links"
-            ) or self.expand_input.value.get("enable_application_ui_links")
-            job_driver = self.partial_kwargs.get("job_driver", {}) or self.expand_input.value.get(
-                "job_driver", {}
-            )
-            configuration_overrides = self.partial_kwargs.get(
-                "configuration_overrides"
-            ) or self.expand_input.value.get("configuration_overrides")
-
-            # Configuration overrides can either be a list or a dictionary, depending on whether it's passed in as partial or expand.
-            if isinstance(configuration_overrides, list):
-                if any(
-                    [
-                        operator_class.is_monitoring_in_job_override(
-                            self=operator_class,
-                            config_key="s3MonitoringConfiguration",
-                            job_override=job_override,
-                        )
-                        for job_override in configuration_overrides
-                    ]
-                ):
-                    op_extra_links.extend([EmrServerlessS3LogsLink()])
-                if any(
-                    [
-                        operator_class.is_monitoring_in_job_override(
-                            self=operator_class,
-                            config_key="cloudWatchLoggingConfiguration",
-                            job_override=job_override,
-                        )
-                        for job_override in configuration_overrides
-                    ]
-                ):
-                    op_extra_links.extend([EmrServerlessCloudWatchLogsLink()])
-            else:
-                if operator_class.is_monitoring_in_job_override(
-                    self=operator_class,
-                    config_key="s3MonitoringConfiguration",
-                    job_override=configuration_overrides,
-                ):
-                    op_extra_links.extend([EmrServerlessS3LogsLink()])
-                if operator_class.is_monitoring_in_job_override(
-                    self=operator_class,
-                    config_key="cloudWatchLoggingConfiguration",
-                    job_override=configuration_overrides,
-                ):
-                    op_extra_links.extend([EmrServerlessCloudWatchLogsLink()])
-
-        else:
-            operator_class = self
-            enable_application_ui_links = self.enable_application_ui_links
-            configuration_overrides = self.configuration_overrides
-            job_driver = self.job_driver
-
-            if operator_class.is_monitoring_in_job_override(
-                "s3MonitoringConfiguration", configuration_overrides
-            ):
-                op_extra_links.extend([EmrServerlessS3LogsLink()])
-            if operator_class.is_monitoring_in_job_override(
-                "cloudWatchLoggingConfiguration", configuration_overrides
-            ):
-                op_extra_links.extend([EmrServerlessCloudWatchLogsLink()])
-
-        if enable_application_ui_links:
-            op_extra_links.extend([EmrServerlessDashboardLink()])
-            if isinstance(job_driver, list):
-                if any("sparkSubmit" in ind_job_driver for ind_job_driver in job_driver):
-                    op_extra_links.extend([EmrServerlessLogsLink()])
-            elif "sparkSubmit" in job_driver:
-                op_extra_links.extend([EmrServerlessLogsLink()])
-
-        return tuple(op_extra_links)
+    operator_extra_links = (
+        EmrServerlessS3LogsLink(),
+        EmrServerlessCloudWatchLogsLink(),
+        EmrServerlessDashboardLink(),
+        EmrServerlessLogsLink(),
+    )
 
     def __init__(
         self,
diff --git a/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst b/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst
index 65a0fc8bfebe6..9915763e19e09 100644
--- a/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst
+++ b/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst
@@ -72,7 +72,8 @@ Open Application UIs
 
 The operator can also be configured to generate one-time links to the application UIs and Spark stdout logs
 by passing the ``enable_application_ui_links=True`` as a parameter. Once the job starts running, these links
-are available in the Details section of the relevant Task.
+are available in the Details section of the relevant Task. If ``enable_application_ui_links=False`` then the
+links will be present but grayed out.
 
 You need to ensure you have the following IAM permissions to generate the dashboard link.
 
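For readers of the doc change above, the sketch below shows a task definition that exercises all four links the operator now declares statically. It is illustrative only and not part of the patch: the application ID, execution role ARN, and S3 paths are placeholders, and the ``monitoringConfiguration`` layout follows the EMR Serverless API.

```python
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrServerlessStartJobOperator

with DAG(
    dag_id="emr_serverless_links_example",
    start_date=datetime(2024, 1, 1),
    schedule=None,
) as dag:
    start_job = EmrServerlessStartJobOperator(
        task_id="start_spark_job",
        application_id="00example123456789",  # placeholder EMR Serverless application ID
        execution_role_arn="arn:aws:iam::123456789012:role/example-emr-serverless-role",  # placeholder
        job_driver={
            # "sparkSubmit" is what makes the Spark stdout (driver log) link meaningful
            "sparkSubmit": {"entryPoint": "s3://example-bucket/scripts/job.py"},  # placeholder script
        },
        configuration_overrides={
            "monitoringConfiguration": {
                # location the S3 logs console link points at
                "s3MonitoringConfiguration": {"logUri": "s3://example-bucket/logs/"},
                # enables CloudWatch logging, used by the CloudWatch logs link
                "cloudWatchLoggingConfiguration": {"enabled": True},
            },
        },
        # Generates the one-time dashboard / Spark stdout links described in the docs;
        # per the doc change in this patch, False leaves those links visible but grayed out.
        enable_application_ui_links=True,
    )
```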
diff --git a/tests/providers/amazon/aws/operators/test_emr_serverless.py b/tests/providers/amazon/aws/operators/test_emr_serverless.py
index a93366d296703..4804f2286993c 100644
--- a/tests/providers/amazon/aws/operators/test_emr_serverless.py
+++ b/tests/providers/amazon/aws/operators/test_emr_serverless.py
@@ -25,23 +25,13 @@
 
 from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, TaskDeferred
 from airflow.providers.amazon.aws.hooks.emr import EmrServerlessHook
-from airflow.providers.amazon.aws.links.emr import (
-    EmrServerlessCloudWatchLogsLink,
-    EmrServerlessDashboardLink,
-    EmrServerlessLogsLink,
-    EmrServerlessS3LogsLink,
-)
 from airflow.providers.amazon.aws.operators.emr import (
     EmrServerlessCreateApplicationOperator,
     EmrServerlessDeleteApplicationOperator,
     EmrServerlessStartJobOperator,
     EmrServerlessStopApplicationOperator,
 )
-from airflow.serialization.serialized_objects import (
-    BaseSerialization,
-)
 from airflow.utils.types import NOTSET
-from tests.test_utils.compat import deserialize_operator
 
 if TYPE_CHECKING:
     from unittest.mock import MagicMock
@@ -1152,50 +1142,6 @@ def test_links_spark_without_applicationui_enabled(
             job_run_id=job_run_id,
         )
 
-    def test_operator_extra_links_mapped_without_applicationui_enabled(
-        self,
-    ):
-        operator = EmrServerlessStartJobOperator.partial(
-            task_id=task_id,
-            application_id=application_id,
-            execution_role_arn=execution_role_arn,
-            job_driver=spark_job_driver,
-            enable_application_ui_links=False,
-        ).expand(
-            configuration_overrides=[s3_configuration_overrides, cloudwatch_configuration_overrides],
-        )
-
-        ser_operator = BaseSerialization.serialize(operator)
-        deser_operator = deserialize_operator(ser_operator)
-
-        assert deser_operator.operator_extra_links == [
-            EmrServerlessS3LogsLink(),
-            EmrServerlessCloudWatchLogsLink(),
-        ]
-
-    def test_operator_extra_links_mapped_with_applicationui_enabled_at_partial(
-        self,
-    ):
-        operator = EmrServerlessStartJobOperator.partial(
-            task_id=task_id,
-            application_id=application_id,
-            execution_role_arn=execution_role_arn,
-            job_driver=spark_job_driver,
-            enable_application_ui_links=True,
-        ).expand(
-            configuration_overrides=[s3_configuration_overrides, cloudwatch_configuration_overrides],
-        )
-
-        ser_operator = BaseSerialization.serialize(operator)
-        deser_operator = deserialize_operator(ser_operator)
-
-        assert deser_operator.operator_extra_links == [
-            EmrServerlessS3LogsLink(),
-            EmrServerlessCloudWatchLogsLink(),
-            EmrServerlessDashboardLink(),
-            EmrServerlessLogsLink(),
-        ]
-
 
 class TestEmrServerlessDeleteOperator:
     @mock.patch.object(EmrServerlessHook, "get_waiter")
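The deleted tests follow from the change in ``emr.py``: once ``operator_extra_links`` is a plain class attribute instead of a property that special-cases ``MappedOperator``, there is no per-instance link logic left to verify through a serialization round trip. A rough illustration of that static pattern is below; ``MyConsoleLink`` and ``MyOperator`` are hypothetical names, not part of Airflow or this patch.

```python
from __future__ import annotations

from airflow.models import BaseOperator, BaseOperatorLink


class MyConsoleLink(BaseOperatorLink):
    """A link entry attached to every task instance of the operator below."""

    name = "My Console"

    def get_link(self, operator, *, ti_key):
        # Real links usually build the URL from values the task pushed to XCom;
        # a constant keeps this sketch self-contained.
        return "https://console.example.com/jobs"


class MyOperator(BaseOperator):
    # Declared once at class level: the same tuple applies whether the operator is
    # used directly or mapped via .partial(...).expand(...), so no per-instance
    # branching is needed and serialization round-trips it without custom handling.
    operator_extra_links = (MyConsoleLink(),)

    def execute(self, context):
        self.log.info("running")
```

Custom links like this still need to be registered through a plugin or a provider's ``extra-links`` entry to render in the web UI; the Amazon provider already registers the four EMR Serverless link classes used in this patch.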