diff --git a/airflow/contrib/sensors/__init__.py b/airflow/contrib/sensors/__init__.py index 65739f1ed0eb5..664a860cc6263 100644 --- a/airflow/contrib/sensors/__init__.py +++ b/airflow/contrib/sensors/__init__.py @@ -114,12 +114,6 @@ 'airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor' ), }, - 'hdfs_sensor': { - 'HdfsFolderSensor': 'airflow.providers.apache.hdfs.sensors.hdfs.HdfsFolderSensor', - 'HdfsRegexSensor': 'airflow.providers.apache.hdfs.sensors.hdfs.HdfsRegexSensor', - 'HdfsSensorFolder': 'airflow.providers.apache.hdfs.sensors.hdfs.HdfsFolderSensor', - 'HdfsSensorRegex': 'airflow.providers.apache.hdfs.sensors.hdfs.HdfsRegexSensor', - }, 'imap_attachment_sensor': { 'ImapAttachmentSensor': 'airflow.providers.imap.sensors.imap_attachment.ImapAttachmentSensor', }, diff --git a/airflow/hooks/__init__.py b/airflow/hooks/__init__.py index db2b7f0a298ee..779a9a3351284 100644 --- a/airflow/hooks/__init__.py +++ b/airflow/hooks/__init__.py @@ -39,10 +39,6 @@ "DruidDbApiHook": "airflow.providers.apache.druid.hooks.druid.DruidDbApiHook", "DruidHook": "airflow.providers.apache.druid.hooks.druid.DruidHook", }, - "hdfs_hook": { - "HDFSHook": "airflow.providers.apache.hdfs.hooks.hdfs.HDFSHook", - "HDFSHookException": "airflow.providers.apache.hdfs.hooks.hdfs.HDFSHookException", - }, "hive_hooks": { "HIVE_QUEUE_PRIORITIES": "airflow.providers.apache.hive.hooks.hive.HIVE_QUEUE_PRIORITIES", "HiveCliHook": "airflow.providers.apache.hive.hooks.hive.HiveCliHook", diff --git a/airflow/providers/apache/hdfs/CHANGELOG.rst b/airflow/providers/apache/hdfs/CHANGELOG.rst index 62cfb1904db01..5f41592482e65 100644 --- a/airflow/providers/apache/hdfs/CHANGELOG.rst +++ b/airflow/providers/apache/hdfs/CHANGELOG.rst @@ -24,6 +24,24 @@ Changelog --------- +4.0.0 +----- + +Breaking changes +~~~~~~~~~~~~~~~~ + +The original HDFS hook and sensors have been removed. They relied on the old snakebite-py3 HDFS library, which +has not been updated in years and whose protobuf dependency has reached end of life. + +The 3.* version of the provider is still available and can be used if you need to use the old hooks and +sensors. + +The ``HDFSHook``, ``HdfsSensor``, ``HdfsRegexSensor`` and ``HdfsFolderSensor`` have been removed from +this provider and are no longer available. If you want to continue using them, +you can use the 3.* version of the provider, but the recommendation is to switch to the new +``WebHDFSHook`` and ``WebHdfsSensor``, which use the ``WebHDFS`` API. + + 3.2.1 ..... diff --git a/airflow/providers/apache/hdfs/hooks/hdfs.py b/airflow/providers/apache/hdfs/hooks/hdfs.py index fda716327a022..7f6f892b4e60b 100644 --- a/airflow/providers/apache/hdfs/hooks/hdfs.py +++ b/airflow/providers/apache/hdfs/hooks/hdfs.py @@ -1,4 +1,3 @@ -# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -15,98 +14,38 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License.
-"""Hook for HDFS operations""" from __future__ import annotations -from typing import Any - -from airflow.configuration import conf -from airflow.exceptions import AirflowException +from airflow import AirflowException from airflow.hooks.base import BaseHook -try: - from snakebite.client import AutoConfigClient, Client, HAClient, Namenode - - snakebite_loaded = True -except ImportError: - snakebite_loaded = False +_EXCEPTION_MESSAGE = """The old HDFS Hooks have been removed in 4.0.0 version of the apache.hdfs provider. +Please convert your DAGs to use the WebHdfsHook or downgrade the provider to below 4.* +if you want to continue using it. +If you want to use earlier provider you can downgrade to latest released 3.* version +using `pip install apache-airflow-providers-hdfs==3.2.1` (no constraints) +""" class HDFSHookException(AirflowException): - """Exception specific for HDFS""" - - -class HDFSHook(BaseHook): """ - Interact with HDFS. This class is a wrapper around the snakebite library. - - :param hdfs_conn_id: Connection id to fetch connection info - :param proxy_user: effective user for HDFS operations - :param autoconfig: use snakebite's automatically configured client + This Exception has been removed and is not functional. Please convert your DAGs to use the + WebHdfsHook or downgrade the provider to below 4.* if you want to continue using it. + If you want to use earlier provider you can downgrade to latest released 3.* version + using `pip install apache-airflow-providers-hdfs==3.2.1` (no constraints). """ - conn_name_attr = "hdfs_conn_id" - default_conn_name = "hdfs_default" - conn_type = "hdfs" - hook_name = "HDFS" + def __init__(self, *args, **kwargs): + raise Exception(_EXCEPTION_MESSAGE) - def __init__( - self, - hdfs_conn_id: str | set[str] = "hdfs_default", - proxy_user: str | None = None, - autoconfig: bool = False, - ): - super().__init__() - if not snakebite_loaded: - raise ImportError( - "This HDFSHook implementation requires snakebite, but " - "snakebite is not compatible with Python 3 " - "(as of August 2015). Please help by submitting a PR!" - ) - self.hdfs_conn_id = {hdfs_conn_id} if isinstance(hdfs_conn_id, str) else hdfs_conn_id - self.proxy_user = proxy_user - self.autoconfig = autoconfig - def get_conn(self) -> Any: - """Returns a snakebite HDFSClient object.""" - # When using HAClient, proxy_user must be the same, so is ok to always - # take the first. 
- effective_user = self.proxy_user - autoconfig = self.autoconfig - use_sasl = conf.get("core", "security") == "kerberos" - - try: - connections = [self.get_connection(i) for i in self.hdfs_conn_id] - - if not effective_user: - effective_user = connections[0].login - if not autoconfig: - autoconfig = connections[0].extra_dejson.get("autoconfig", False) - hdfs_namenode_principal = connections[0].extra_dejson.get("hdfs_namenode_principal") - except AirflowException: - if not autoconfig: - raise - - if autoconfig: - # will read config info from $HADOOP_HOME conf files - client = AutoConfigClient(effective_user=effective_user, use_sasl=use_sasl) - elif len(connections) == 1: - client = Client( - connections[0].host, - connections[0].port, - effective_user=effective_user, - use_sasl=use_sasl, - hdfs_namenode_principal=hdfs_namenode_principal, - ) - elif len(connections) > 1: - name_node = [Namenode(conn.host, conn.port) for conn in connections] - client = HAClient( - name_node, - effective_user=effective_user, - use_sasl=use_sasl, - hdfs_namenode_principal=hdfs_namenode_principal, - ) - else: - raise HDFSHookException("conn_id doesn't exist in the repository and autoconfig is not specified") +class HDFSHook(BaseHook): + """ + This Hook has been removed and is not functional. Please convert your DAGs to use the + WebHdfsHook or downgrade the provider to below 4.* if you want to continue using it. + If you want to use the earlier provider you can downgrade to the latest released 3.* version + using `pip install apache-airflow-providers-apache-hdfs==3.2.1` (no constraints). + """ - return client + def __init__(self, *args, **kwargs): + raise Exception(_EXCEPTION_MESSAGE) diff --git a/airflow/providers/apache/hdfs/provider.yaml b/airflow/providers/apache/hdfs/provider.yaml index 4eb7e0e2bb522..19168ea6a48dd 100644 --- a/airflow/providers/apache/hdfs/provider.yaml +++ b/airflow/providers/apache/hdfs/provider.yaml @@ -24,6 +24,7 @@ description: | suspended: false versions: + - 4.0.0 - 3.2.1 - 3.2.0 - 3.1.0 @@ -41,14 +42,11 @@ versions: dependencies: - apache-airflow>=2.4.0 - - snakebite-py3 - hdfs[avro,dataframe,kerberos]>=2.0.4 integrations: - integration-name: Hadoop Distributed File System (HDFS) external-doc-url: https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html - how-to-guide: - - /docs/apache-airflow-providers-apache-hdfs/operators/hdfs.rst logo: /integration-logos/apache/hadoop.png tags: [apache] - integration-name: WebHDFS @@ -59,22 +57,11 @@ integrations: tags: [apache] sensors: - - integration-name: Hadoop Distributed File System (HDFS) - python-modules: - - airflow.providers.apache.hdfs.sensors.hdfs - integration-name: WebHDFS python-modules: - airflow.providers.apache.hdfs.sensors.web_hdfs hooks: - - integration-name: Hadoop Distributed File System (HDFS) - python-modules: - - airflow.providers.apache.hdfs.hooks.hdfs - integration-name: WebHDFS python-modules: - airflow.providers.apache.hdfs.hooks.webhdfs - - -connection-types: - - hook-class-name: airflow.providers.apache.hdfs.hooks.hdfs.HDFSHook - connection-type: hdfs diff --git a/airflow/providers/apache/hdfs/sensors/hdfs.py b/airflow/providers/apache/hdfs/sensors/hdfs.py index 5c55209f4d1de..da7e5e017184e 100644 --- a/airflow/providers/apache/hdfs/sensors/hdfs.py +++ b/airflow/providers/apache/hdfs/sensors/hdfs.py @@ -1,4 +1,3 @@ -# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements.
See the NOTICE file # distributed with this work for additional information @@ -17,181 +16,33 @@ # under the License. from __future__ import annotations -import logging -import re -import sys -from typing import TYPE_CHECKING, Any, Pattern, Sequence - -from airflow import settings -from airflow.providers.apache.hdfs.hooks.hdfs import HDFSHook from airflow.sensors.base import BaseSensorOperator -if TYPE_CHECKING: - from airflow.utils.context import Context - -log = logging.getLogger(__name__) +_EXCEPTION_MESSAGE = """The old HDFS Sensors have been removed in version 4.0.0 of the apache.hdfs provider. +Please convert your DAGs to use the WebHdfsSensor or downgrade the provider to below 4.* +if you want to continue using it. +If you want to use the earlier provider you can downgrade to the latest released 3.* version +using `pip install apache-airflow-providers-apache-hdfs==3.2.1` (no constraints) +""" class HdfsSensor(BaseSensorOperator): """ - Waits for a file or folder to land in HDFS - - :param filepath: The route to a stored file. - :param hdfs_conn_id: The Airflow connection used for HDFS credentials. - :param ignored_ext: This is the list of ignored extensions. - :param ignore_copying: Shall we ignore? - :param file_size: This is the size of the file. - - .. seealso:: - For more information on how to use this operator, take a look at the guide: - :ref:`howto/operator:HdfsSensor` + This Sensor has been removed and is not functional. Please convert your DAGs to use the + WebHdfsSensor or downgrade the provider to below 4.* if you want to continue using it. + If you want to use the earlier provider you can downgrade to the latest released 3.* version + using `pip install apache-airflow-providers-apache-hdfs==3.2.1` (no constraints). """ - template_fields: Sequence[str] = ("filepath",) - ui_color = settings.WEB_COLORS["LIGHTBLUE"] - - def __init__( - self, - *, - filepath: str, - hdfs_conn_id: str = "hdfs_default", - ignored_ext: list[str] | None = None, - ignore_copying: bool = True, - file_size: int | None = None, - hook: type[HDFSHook] = HDFSHook, - **kwargs: Any, - ) -> None: - super().__init__(**kwargs) - if ignored_ext is None: - ignored_ext = ["_COPYING_"] - self.filepath = filepath - self.hdfs_conn_id = hdfs_conn_id - self.file_size = file_size - self.ignored_ext = ignored_ext - self.ignore_copying = ignore_copying - self.hook = hook - - @staticmethod - def filter_for_filesize(result: list[dict[Any, Any]], size: int | None = None) -> list[dict[Any, Any]]: - """ - Will test the filepath result and test if its size is at least self.filesize - - :param result: a list of dicts returned by Snakebite ls - :param size: the file size in MB a file should be at least to trigger True - :return: (bool) depending on the matching criteria - """ - if size: - log.debug("Filtering for file size >= %s in files: %s", size, map(lambda x: x["path"], result)) - size *= settings.MEGABYTE - result = [x for x in result if x["length"] >= size] - log.debug("HdfsSensor.poke: after size filter result is %s", result) - return result - - @staticmethod - def filter_for_ignored_ext( - result: list[dict[Any, Any]], ignored_ext: list[str], ignore_copying: bool - ) -> list[dict[Any, Any]]: - """ - Will filter if instructed to do so the result to remove matching criteria - - :param result: list of dicts returned by Snakebite ls - :param ignored_ext: list of ignored extensions - :param ignore_copying: shall we ignore ?
- :return: list of dicts which were not removed - """ - if ignore_copying: - regex_builder = r"^.*\.(%s$)$" % "$|".join(ignored_ext) - ignored_extensions_regex = re.compile(regex_builder) - log.debug( - "Filtering result for ignored extensions: %s in files %s", - ignored_extensions_regex.pattern, - map(lambda x: x["path"], result), - ) - result = [x for x in result if not ignored_extensions_regex.match(x["path"])] - log.debug("HdfsSensor.poke: after ext filter result is %s", result) - return result - - def poke(self, context: Context) -> bool: - """Get a snakebite client connection and check for file.""" - sb_client = self.hook(self.hdfs_conn_id).get_conn() - self.log.info("Poking for file %s", self.filepath) - try: - # IMOO it's not right here, as there is no raise of any kind. - # if the filepath is let's say '/data/mydirectory', - # it's correct but if it is '/data/mydirectory/*', - # it's not correct as the directory exists and sb_client does not raise any error - # here is a quick fix - result = sb_client.ls([self.filepath], include_toplevel=False) - self.log.debug("HdfsSensor.poke: result is %s", result) - result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying) - result = self.filter_for_filesize(result, self.file_size) - return bool(result) - except Exception: - e = sys.exc_info() - self.log.debug("Caught an exception !: %s", str(e)) - return False - + def __init__(self, *args, **kwargs): + raise Exception(_EXCEPTION_MESSAGE) -class HdfsRegexSensor(HdfsSensor): - """ - Waits for matching files by matching on regex - - .. seealso:: - For more information on how to use this operator, take a look at the guide: - :ref:`howto/operator:HdfsRegexSensor` - """ - - def __init__(self, regex: Pattern[str], *args: Any, **kwargs: Any) -> None: - super().__init__(*args, **kwargs) - self.regex = regex - - def poke(self, context: Context) -> bool: - """ - Poke matching files in a directory with self.regex - - :return: Bool depending on the search criteria - """ - sb_client = self.hook(self.hdfs_conn_id).get_conn() - self.log.info( - "Poking for %s to be a directory with files matching %s", self.filepath, self.regex.pattern - ) - result = [ - f - for f in sb_client.ls([self.filepath], include_toplevel=False) - if f["file_type"] == "f" and self.regex.match(f["path"].replace(f"{self.filepath}/", "")) - ] - result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying) - result = self.filter_for_filesize(result, self.file_size) - return bool(result) - - -class HdfsFolderSensor(HdfsSensor): - """ - Waits for a non-empty directory - - .. 
seealso:: - For more information on how to use this operator, take a look at the guide: - :ref:`howto/operator:HdfsFolderSensor` - """ - def __init__(self, be_empty: bool = False, *args: Any, **kwargs: Any): - super().__init__(*args, **kwargs) - self.be_empty = be_empty +class HdfsRegexSensor(HdfsSensor): # noqa: D101 Ignore missing docstring + def __init__(self, *args, **kwargs): + raise Exception(_EXCEPTION_MESSAGE) - def poke(self, context: Context) -> bool: - """ - Poke for a non empty directory - :return: Bool depending on the search criteria - """ - sb_client = self.hook(self.hdfs_conn_id).get_conn() - result = sb_client.ls([self.filepath], include_toplevel=True) - result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying) - result = self.filter_for_filesize(result, self.file_size) - if self.be_empty: - self.log.info("Poking for filepath %s to a empty directory", self.filepath) - return len(result) == 1 and result[0]["path"] == self.filepath - else: - self.log.info("Poking for filepath %s to a non empty directory", self.filepath) - result.pop(0) - return bool(result) and result[0]["file_type"] == "f" +class HdfsFolderSensor(HdfsSensor): # noqa: D101 Ignore missing docstring + def __init__(self, *args, **kwargs): + raise Exception(_EXCEPTION_MESSAGE) diff --git a/airflow/sensors/__init__.py b/airflow/sensors/__init__.py index 4de283d1107d3..17cabf43be368 100644 --- a/airflow/sensors/__init__.py +++ b/airflow/sensors/__init__.py @@ -33,9 +33,6 @@ 'ExternalTaskSensor': 'airflow.sensors.external_task.ExternalTaskSensor', 'ExternalTaskSensorLink': 'airflow.sensors.external_task.ExternalTaskSensorLink', }, - 'hdfs_sensor': { - 'HdfsSensor': 'airflow.providers.apache.hdfs.sensors.hdfs.HdfsSensor', - }, 'hive_partition_sensor': { 'HivePartitionSensor': 'airflow.providers.apache.hive.sensors.hive_partition.HivePartitionSensor', }, diff --git a/docs/apache-airflow-providers-apache-hdfs/connections.rst b/docs/apache-airflow-providers-apache-hdfs/connections.rst index 6cbee0c09dbd4..c67331aaedfd3 100644 --- a/docs/apache-airflow-providers-apache-hdfs/connections.rst +++ b/docs/apache-airflow-providers-apache-hdfs/connections.rst @@ -23,8 +23,6 @@ The Apache HDFS connection type enables connection to Apache HDFS. Default Connection IDs ---------------------- -HDFS Hook uses parameter ``hdfs_conn_id`` for Connection IDs and the value of the parameter -as ``hdfs_default`` by default. Web HDFS Hook uses parameter ``webhdfs_conn_id`` for Connection IDs and the value of the parameter as ``webhdfs_default`` by default. @@ -40,12 +38,7 @@ Login Effective user for HDFS operations (non-Kerberized). Extra (optional, connection parameters) - Specify the extra parameters (as json dictionary) that can be used in HDFS connection. The following - parameters out of the standard python parameters are supported: - - * ``autoconfig`` - Default value is bool: False. Use snakebite's automatically configured client. This HDFSHook implementation requires snakebite. - * ``hdfs_namenode_principal`` - Specifies the Kerberos principal to use for HDFS. - + Specify the extra parameters (as json dictionary) that can be used in Web HDFS connection. The following extra parameters can be used to configure SSL for Web HDFS Hook: * ``use_ssl`` - If SSL should be used. By default is set to `false`. 
diff --git a/docs/apache-airflow-providers-apache-hdfs/operators/hdfs.rst b/docs/apache-airflow-providers-apache-hdfs/operators/hdfs.rst deleted file mode 100644 index b688be08f6cf6..0000000000000 --- a/docs/apache-airflow-providers-apache-hdfs/operators/hdfs.rst +++ /dev/null @@ -1,74 +0,0 @@ - .. Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - .. http://www.apache.org/licenses/LICENSE-2.0 - - .. Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - - - -HDFS Operators -============== - -`Apache Hadoop HDFS `__ is a distributed file system -designed to run on commodity hardware. It has many similarities with existing distributed file systems. -However, the differences from other distributed file systems are significant. -HDFS is highly fault-tolerant and is designed to be deployed on low-cost hardware. -HDFS provides high throughput access to application data and is suitable for applications that have -large data sets. HDFS relaxes a few POSIX requirements to enable streaming access to file -system data. HDFS is now an Apache Hadoop sub project. - -Prerequisite ------------- - -To use operators, you must configure a :doc:`HDFS Connection `. - -.. _howto/operator:HdfsFolderSensor: - -HdfsFolderSensor ----------------- -Waits for a non-empty directory -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -The :class:`~airflow.providers.apache.hdfs.sensors.hdfs.HdfsFolderSensor` operator is used to -check for a non-empty directory in HDFS. - -Use the ``filepath`` parameter to poke until the provided file is found. - -.. _howto/operator:HdfsRegexSensor: - -HdfsRegexSensor ---------------- -Waits for matching files by matching on regex -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -The :class:`~airflow.providers.apache.hdfs.sensors.hdfs.HdfsRegexSensor` operator is used to check for -matching files by matching on regex in HDFS. - -Use the ``filepath`` parameter to mention the keyspace and table for the record. Use dot notation to target a -specific keyspace. - - -.. _howto/operator:HdfsSensor: - -Waits for a file or folder to land in HDFS -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -The :class:`~airflow.providers.apache.hdfs.sensors.hdfs.HdfsSensor` operator is used to check for a file or folder to land in HDFS. - -Use the ``filepath`` parameter to poke until the provided file is found. - -Reference -^^^^^^^^^ - -For further information, look at `HDFS Architecture Guide `_. diff --git a/docs/apache-airflow-providers-apache-hdfs/operators/index.rst b/docs/apache-airflow-providers-apache-hdfs/operators/index.rst index 7146237a441eb..7006064342268 100644 --- a/docs/apache-airflow-providers-apache-hdfs/operators/index.rst +++ b/docs/apache-airflow-providers-apache-hdfs/operators/index.rst @@ -23,5 +23,4 @@ Apache Hadoop HDFS Operators .. 
toctree:: :maxdepth: 1 - hdfs webhdfs diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index a9aae0f43b74a..9c36462bf1e0e 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -93,8 +93,7 @@ "apache.hdfs": { "deps": [ "apache-airflow>=2.4.0", - "hdfs[avro,dataframe,kerberos]>=2.0.4", - "snakebite-py3" + "hdfs[avro,dataframe,kerberos]>=2.0.4" ], "cross-providers-deps": [] }, diff --git a/scripts/in_container/run_provider_yaml_files_check.py b/scripts/in_container/run_provider_yaml_files_check.py index 5140c53cb3ded..40bf38e50f1e5 100755 --- a/scripts/in_container/run_provider_yaml_files_check.py +++ b/scripts/in_container/run_provider_yaml_files_check.py @@ -38,6 +38,15 @@ from airflow.cli.commands.info_command import Architecture +# Those are deprecated modules that contain removed Hooks/Sensors/Operators that we left in the code +# so that users can get a very specific error message when they try to use them. + +EXCLUDED_MODULES = [ + "airflow.providers.apache.hdfs.sensors.hdfs", + "airflow.providers.apache.hdfs.hooks.hdfs", +] + + try: from yaml import CSafeLoader as SafeLoader except ImportError: @@ -228,8 +237,9 @@ def check_correctness_of_list_of_sensors_operators_hook_modules(yaml_files: dict expected_modules, provider_package, resource_data = parse_module_data( provider_data, resource_type, yaml_file_path ) - + expected_modules = {module for module in expected_modules if module not in EXCLUDED_MODULES} current_modules = {str(i) for r in resource_data for i in r.get("python-modules", [])} + check_if_objects_exist_and_belong_to_package( current_modules, provider_package, yaml_file_path, resource_type, ObjectType.MODULE ) @@ -268,8 +278,9 @@ def check_completeness_of_list_of_transfers(yaml_files: dict[str, dict]): expected_modules, provider_package, resource_data = parse_module_data( provider_data, resource_type, yaml_file_path ) - + expected_modules = {module for module in expected_modules if module not in EXCLUDED_MODULES} current_modules = {r.get("python-module") for r in resource_data} + check_if_objects_exist_and_belong_to_package( current_modules, provider_package, yaml_file_path, resource_type, ObjectType.MODULE ) @@ -440,6 +451,7 @@ def check_unique_provider_name(yaml_files: dict[str, dict]): def check_providers_are_mentioned_in_issue_template(yaml_files: dict[str, dict]): + print("Checking providers are mentioned in issue template") prefix_len = len("apache-airflow-providers-") short_provider_names = [d["package-name"][prefix_len:] for d in yaml_files.values()] # exclude deprecated provider that shouldn't be in issue template diff --git a/setup.py b/setup.py index e6db11ef48219..b42c3f055bd53 100644 --- a/setup.py +++ b/setup.py @@ -639,8 +639,7 @@ def get_all_db_dependencies() -> list[str]: ) # Those are packages excluded for "all" dependencies -PACKAGES_EXCLUDED_FOR_ALL = [] -PACKAGES_EXCLUDED_FOR_ALL.extend(["snakebite"]) +PACKAGES_EXCLUDED_FOR_ALL: list[str] = [] def is_package_excluded(package: str, exclusion_list: list[str]) -> bool: diff --git a/tests/providers/apache/hdfs/hooks/test_hdfs.py b/tests/providers/apache/hdfs/hooks/test_hdfs.py deleted file mode 100644 index bf724737a9604..0000000000000 --- a/tests/providers/apache/hdfs/hooks/test_hdfs.py +++ /dev/null @@ -1,86 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. 
See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import json -from unittest import mock - -import pytest - -from airflow.models import Connection -from airflow.providers.apache.hdfs.hooks.hdfs import HDFSHook - -snakebite = pytest.importorskip("snakebite") - - -class TestHDFSHook: - @mock.patch.dict( - "os.environ", - { - "AIRFLOW_CONN_HDFS_DEFAULT": "hdfs://localhost:8020", - }, - ) - def test_get_client(self): - client = HDFSHook(proxy_user="foo").get_conn() - assert isinstance(client, snakebite.client.Client) - assert "localhost" == client.host - assert 8020 == client.port - assert "foo" == client.service.channel.effective_user - - @mock.patch.dict( - "os.environ", - { - "AIRFLOW_CONN_HDFS_DEFAULT": "hdfs://localhost:8020", - }, - ) - @mock.patch("airflow.providers.apache.hdfs.hooks.hdfs.AutoConfigClient") - @mock.patch("airflow.providers.apache.hdfs.hooks.hdfs.HDFSHook.get_connections") - def test_get_autoconfig_client(self, mock_get_connections, mock_client): - conn = Connection( - conn_id="hdfs", - conn_type="hdfs", - host="localhost", - port=8020, - login="foo", - extra=json.dumps({"autoconfig": True}), - ) - mock_get_connections.return_value = [conn] - HDFSHook(hdfs_conn_id="hdfs").get_conn() - mock_client.assert_called_once_with(effective_user="foo", use_sasl=False) - - @mock.patch.dict( - "os.environ", - { - "AIRFLOW_CONN_HDFS_DEFAULT": "hdfs://localhost:8020", - }, - ) - @mock.patch("airflow.providers.apache.hdfs.hooks.hdfs.AutoConfigClient") - def test_get_autoconfig_client_no_conn(self, mock_client): - HDFSHook(hdfs_conn_id="hdfs_missing", autoconfig=True).get_conn() - mock_client.assert_called_once_with(effective_user=None, use_sasl=False) - - @mock.patch.dict( - "os.environ", - { - "AIRFLOW_CONN_HDFS1": "hdfs://host1:8020", - "AIRFLOW_CONN_HDFS2": "hdfs://host2:8020", - }, - ) - def test_get_ha_client(self): - client = HDFSHook(hdfs_conn_id={"hdfs1", "hdfs2"}).get_conn() - assert isinstance(client, snakebite.client.HAClient) diff --git a/tests/providers/apache/hdfs/sensors/test_hdfs.py b/tests/providers/apache/hdfs/sensors/test_hdfs.py deleted file mode 100644 index c5deccb65f4af..0000000000000 --- a/tests/providers/apache/hdfs/sensors/test_hdfs.py +++ /dev/null @@ -1,320 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import logging -import re -from datetime import timedelta - -import pytest - -from airflow.exceptions import AirflowSensorTimeout -from airflow.providers.apache.hdfs.sensors.hdfs import HdfsFolderSensor, HdfsRegexSensor, HdfsSensor -from airflow.utils.timezone import datetime -from tests.test_utils.hdfs_utils import FakeHDFSHook - -DEFAULT_DATE = datetime(2015, 1, 1) -TEST_DAG_ID = "unit_test_dag" - - -class TestHdfsSensor: - def setup_method(self): - self.hook = FakeHDFSHook - - def test_legacy_file_exist(self): - """ - Test the legacy behaviour - :return: - """ - # When - task = HdfsSensor( - task_id="Should_be_file_legacy", - filepath="/datadirectory/datafile", - timeout=1, - retry_delay=timedelta(seconds=1), - poke_interval=1, - hook=self.hook, - ) - task.execute(None) - - # Then - # Nothing happens, nothing is raised exec is ok - - def test_legacy_file_exist_but_filesize(self): - """ - Test the legacy behaviour with the filesize - :return: - """ - # When - task = HdfsSensor( - task_id="Should_be_file_legacy", - filepath="/datadirectory/datafile", - timeout=1, - file_size=20, - retry_delay=timedelta(seconds=1), - poke_interval=1, - hook=self.hook, - ) - - # When - # Then - with pytest.raises(AirflowSensorTimeout): - task.execute(None) - - def test_legacy_file_does_not_exists(self): - """ - Test the legacy behaviour - :return: - """ - task = HdfsSensor( - task_id="Should_not_be_file_legacy", - filepath="/datadirectory/not_existing_file_or_directory", - timeout=1, - retry_delay=timedelta(seconds=1), - poke_interval=1, - hook=self.hook, - ) - - # When - # Then - with pytest.raises(AirflowSensorTimeout): - task.execute(None) - - -class TestHdfsSensorFolder: - def setup_method(self, method): - self.hook = FakeHDFSHook - - logger = logging.getLogger(__name__) - logger.setLevel(logging.DEBUG) - logger.debug("#" * 10) - logger.debug("Running test case: %s.%s", self.__class__.__name__, method.__name__) - logger.debug("#" * 10) - - def test_should_be_empty_directory(self): - """ - test the empty directory behaviour - :return: - """ - # Given - task = HdfsFolderSensor( - task_id="Should_be_empty_directory", - filepath="/datadirectory/empty_directory", - be_empty=True, - timeout=1, - retry_delay=timedelta(seconds=1), - poke_interval=1, - hook=self.hook, - ) - - # When - task.execute(None) - - # Then - # Nothing happens, nothing is raised exec is ok - - def test_should_be_empty_directory_fail(self): - """ - test the empty directory behaviour - :return: - """ - # Given - task = HdfsFolderSensor( - task_id="Should_be_empty_directory_fail", - filepath="/datadirectory/not_empty_directory", - be_empty=True, - timeout=1, - retry_delay=timedelta(seconds=1), - poke_interval=1, - hook=self.hook, - ) - - # When - # Then - with pytest.raises(AirflowSensorTimeout): - task.execute(None) - - def test_should_be_a_non_empty_directory(self): - """ - test the empty directory behaviour - :return: - """ - # Given - task = HdfsFolderSensor( - task_id="Should_be_non_empty_directory", - filepath="/datadirectory/not_empty_directory", - timeout=1, - 
retry_delay=timedelta(seconds=1), - poke_interval=1, - hook=self.hook, - ) - - # When - task.execute(None) - - # Then - # Nothing happens, nothing is raised exec is ok - - def test_should_be_non_empty_directory_fail(self): - """ - test the empty directory behaviour - :return: - """ - # Given - task = HdfsFolderSensor( - task_id="Should_be_empty_directory_fail", - filepath="/datadirectory/empty_directory", - timeout=1, - retry_delay=timedelta(seconds=1), - poke_interval=1, - hook=self.hook, - ) - - # When - # Then - with pytest.raises(AirflowSensorTimeout): - task.execute(None) - - -class TestHdfsSensorRegex: - def setup_method(self, method): - self.hook = FakeHDFSHook - - logger = logging.getLogger(__name__) - logger.setLevel(logging.DEBUG) - logger.debug("#" * 10) - logger.debug("Running test case: %s.%s", self.__class__.__name__, method.__name__) - logger.debug("#" * 10) - - def test_should_match_regex(self): - """ - test the empty directory behaviour - :return: - """ - # Given - compiled_regex = re.compile("test[1-2]file") - task = HdfsRegexSensor( - task_id="Should_match_the_regex", - filepath="/datadirectory/regex_dir", - regex=compiled_regex, - timeout=1, - retry_delay=timedelta(seconds=1), - poke_interval=1, - hook=self.hook, - ) - - # When - task.execute(None) - - # Then - # Nothing happens, nothing is raised exec is ok - - def test_should_not_match_regex(self): - """ - test the empty directory behaviour - :return: - """ - # Given - compiled_regex = re.compile("^IDoNotExist") - task = HdfsRegexSensor( - task_id="Should_not_match_the_regex", - filepath="/datadirectory/regex_dir", - regex=compiled_regex, - timeout=1, - retry_delay=timedelta(seconds=1), - poke_interval=1, - hook=self.hook, - ) - - # When - # Then - with pytest.raises(AirflowSensorTimeout): - task.execute(None) - - def test_should_match_regex_and_filesize(self): - """ - test the file size behaviour with regex - :return: - """ - # Given - compiled_regex = re.compile("test[1-2]file") - task = HdfsRegexSensor( - task_id="Should_match_the_regex_and_filesize", - filepath="/datadirectory/regex_dir", - regex=compiled_regex, - ignore_copying=True, - ignored_ext=["_COPYING_", "sftp"], - file_size=10, - timeout=1, - retry_delay=timedelta(seconds=1), - poke_interval=1, - hook=self.hook, - ) - - # When - task.execute(None) - - # Then - # Nothing happens, nothing is raised exec is ok - - def test_should_match_regex_but_filesize(self): - """ - test the file size behaviour with regex - :return: - """ - # Given - compiled_regex = re.compile("test[1-2]file") - task = HdfsRegexSensor( - task_id="Should_match_the_regex_but_filesize", - filepath="/datadirectory/regex_dir", - regex=compiled_regex, - file_size=20, - timeout=1, - retry_delay=timedelta(seconds=1), - poke_interval=1, - hook=self.hook, - ) - - # When - # Then - with pytest.raises(AirflowSensorTimeout): - task.execute(None) - - def test_should_match_regex_but_copyingext(self): - """ - test the file size behaviour with regex - :return: - """ - # Given - compiled_regex = re.compile(r"copying_file_\d+.txt") - task = HdfsRegexSensor( - task_id="Should_match_the_regex_but_filesize", - filepath="/datadirectory/regex_dir", - regex=compiled_regex, - ignored_ext=["_COPYING_", "sftp"], - file_size=20, - timeout=1, - retry_delay=timedelta(seconds=1), - poke_interval=1, - hook=self.hook, - ) - - # When - # Then - with pytest.raises(AirflowSensorTimeout): - task.execute(None) diff --git a/tests/providers/apache/hive/sensors/test_hdfs.py b/tests/providers/apache/hive/sensors/test_hdfs.py 
deleted file mode 100644 index a5cb9c6e91713..0000000000000 --- a/tests/providers/apache/hive/sensors/test_hdfs.py +++ /dev/null @@ -1,38 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import os - -import pytest - -from airflow.providers.apache.hdfs.sensors.hdfs import HdfsSensor -from tests.providers.apache.hive import DEFAULT_DATE, TestHiveEnvironment - - -@pytest.mark.skipif( - "AIRFLOW_RUNALL_TESTS" not in os.environ, reason="Skipped because AIRFLOW_RUNALL_TESTS is not set" -) -class TestHdfsSensor(TestHiveEnvironment): - def test_hdfs_sensor(self): - op = HdfsSensor( - task_id="hdfs_sensor_check", - filepath="hdfs://user/hive/warehouse/airflow.db/static_babynames", - dag=self.dag, - ) - op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) diff --git a/tests/test_utils/hdfs_utils.py b/tests/test_utils/hdfs_utils.py index 8924e9277b78f..f429c9ab0cf45 100644 --- a/tests/test_utils/hdfs_utils.py +++ b/tests/test_utils/hdfs_utils.py @@ -27,171 +27,3 @@ def get_conn(self): def check_for_path(self, hdfs_path): return hdfs_path - - -class FakeSnakeBiteClientException(Exception): - pass - - -class FakeSnakeBiteClient: - def __init__(self): - self.started = True - - def ls(self, path, include_toplevel=False): - """ - the fake snakebite client - - :param path: the array of path to test - :param include_toplevel: to return the toplevel directory info - :return: a list for path for the matching queries - """ - if path[0] == "/datadirectory/empty_directory" and not include_toplevel: - return [] - elif path[0] == "/datadirectory/datafile": - return [ - { - "group": "supergroup", - "permission": 420, - "file_type": "f", - "access_time": 1481122343796, - "block_replication": 3, - "modification_time": 1481122343862, - "length": 0, - "blocksize": 134217728, - "owner": "hdfs", - "path": "/datadirectory/datafile", - } - ] - elif path[0] == "/datadirectory/empty_directory" and include_toplevel: - return [ - { - "group": "supergroup", - "permission": 493, - "file_type": "d", - "access_time": 0, - "block_replication": 0, - "modification_time": 1481132141540, - "length": 0, - "blocksize": 0, - "owner": "hdfs", - "path": "/datadirectory/empty_directory", - } - ] - elif path[0] == "/datadirectory/not_empty_directory" and include_toplevel: - return [ - { - "group": "supergroup", - "permission": 493, - "file_type": "d", - "access_time": 0, - "block_replication": 0, - "modification_time": 1481132141540, - "length": 0, - "blocksize": 0, - "owner": "hdfs", - "path": "/datadirectory/empty_directory", - }, - { - "group": "supergroup", - "permission": 420, - "file_type": "f", - "access_time": 1481122343796, - "block_replication": 3, - "modification_time": 1481122343862, - "length": 0, - "blocksize": 134217728, - 
"owner": "hdfs", - "path": "/datadirectory/not_empty_directory/test_file", - }, - ] - elif path[0] == "/datadirectory/not_empty_directory": - return [ - { - "group": "supergroup", - "permission": 420, - "file_type": "f", - "access_time": 1481122343796, - "block_replication": 3, - "modification_time": 1481122343862, - "length": 0, - "blocksize": 134217728, - "owner": "hdfs", - "path": "/datadirectory/not_empty_directory/test_file", - } - ] - elif path[0] == "/datadirectory/not_existing_file_or_directory": - raise FakeSnakeBiteClientException - elif path[0] == "/datadirectory/regex_dir": - return [ - { - "group": "supergroup", - "permission": 420, - "file_type": "f", - "access_time": 1481122343796, - "block_replication": 3, - "modification_time": 1481122343862, - "length": 12582912, - "blocksize": 134217728, - "owner": "hdfs", - "path": "/datadirectory/regex_dir/test1file", - }, - { - "group": "supergroup", - "permission": 420, - "file_type": "f", - "access_time": 1481122343796, - "block_replication": 3, - "modification_time": 1481122343862, - "length": 12582912, - "blocksize": 134217728, - "owner": "hdfs", - "path": "/datadirectory/regex_dir/test2file", - }, - { - "group": "supergroup", - "permission": 420, - "file_type": "f", - "access_time": 1481122343796, - "block_replication": 3, - "modification_time": 1481122343862, - "length": 12582912, - "blocksize": 134217728, - "owner": "hdfs", - "path": "/datadirectory/regex_dir/test3file", - }, - { - "group": "supergroup", - "permission": 420, - "file_type": "f", - "access_time": 1481122343796, - "block_replication": 3, - "modification_time": 1481122343862, - "length": 12582912, - "blocksize": 134217728, - "owner": "hdfs", - "path": "/datadirectory/regex_dir/copying_file_1.txt._COPYING_", - }, - { - "group": "supergroup", - "permission": 420, - "file_type": "f", - "access_time": 1481122343796, - "block_replication": 3, - "modification_time": 1481122343862, - "length": 12582912, - "blocksize": 134217728, - "owner": "hdfs", - "path": "/datadirectory/regex_dir/copying_file_3.txt.sftp", - }, - ] - else: - raise FakeSnakeBiteClientException - - -class FakeHDFSHook: - def __init__(self, conn_id=None): - self.conn_id = conn_id - - def get_conn(self): - client = FakeSnakeBiteClient() - return client