Skip to content

Commit

Permalink
Standardize AWS Glue naming (#20372)
Browse files Browse the repository at this point in the history
  • Loading branch information
ferruzzi authored Dec 19, 2021
1 parent 96c6bb0 commit 1baa648
Show file tree
Hide file tree
Showing 16 changed files with 264 additions and 128 deletions.
19 changes: 18 additions & 1 deletion airflow/providers/amazon/aws/hooks/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
# under the License.

import time
import warnings
from typing import Dict, List, Optional

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook


class AwsGlueJobHook(AwsBaseHook):
class GlueJobHook(AwsBaseHook):
"""
Interact with AWS Glue - create job, trigger, crawler
Expand Down Expand Up @@ -222,3 +223,19 @@ def get_or_create_glue_job(self) -> str:
except Exception as general_error:
self.log.error("Failed to create aws glue job, error: %s", general_error)
raise


class AwsGlueJobHook(GlueJobHook):
"""
This hook is deprecated.
Please use :class:`airflow.providers.amazon.aws.hooks.glue.GlueJobHook`.
"""

def __init__(self, *args, **kwargs):
warnings.warn(
"This hook is deprecated. "
"Please use :class:`airflow.providers.amazon.aws.hooks.glue.GlueJobHook`.",
DeprecationWarning,
stacklevel=2,
)
super().__init__(*args, **kwargs)
23 changes: 20 additions & 3 deletions airflow/providers/amazon/aws/hooks/glue_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
# under the License.

"""This module contains AWS Glue Catalog Hook"""
import warnings
from typing import Optional, Set

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook


class AwsGlueCatalogHook(AwsBaseHook):
class GlueCatalogHook(AwsBaseHook):
"""
Interact with AWS Glue Catalog
Expand Down Expand Up @@ -93,7 +94,7 @@ def check_for_partition(self, database_name: str, table_name: str, expression: s
:type expression: str
:rtype: bool
>>> hook = AwsGlueCatalogHook()
>>> hook = GlueCatalogHook()
>>> t = 'static_babynames_partitioned'
>>> hook.check_for_partition('airflow', t, "ds='2015-01-01'")
True
Expand All @@ -112,7 +113,7 @@ def get_table(self, database_name: str, table_name: str) -> dict:
:type table_name: str
:rtype: dict
>>> hook = AwsGlueCatalogHook()
>>> hook = GlueCatalogHook()
>>> r = hook.get_table('db', 'table_foo')
>>> r['Name'] = 'table_foo'
"""
Expand All @@ -133,3 +134,19 @@ def get_table_location(self, database_name: str, table_name: str) -> str:
table = self.get_table(database_name, table_name)

return table['StorageDescriptor']['Location']


class AwsGlueCatalogHook(GlueCatalogHook):
"""
This hook is deprecated.
Please use :class:`airflow.providers.amazon.aws.hooks.glue_catalog.GlueCatalogHook`.
"""

def __init__(self, *args, **kwargs):
warnings.warn(
"This hook is deprecated. "
"Please use :class:`airflow.providers.amazon.aws.hooks.glue_catalog.GlueCatalogHook`.",
DeprecationWarning,
stacklevel=2,
)
super().__init__(*args, **kwargs)
19 changes: 18 additions & 1 deletion airflow/providers/amazon/aws/hooks/glue_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.
import sys
import warnings
from time import sleep

if sys.version_info >= (3, 8):
Expand All @@ -27,7 +28,7 @@
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook


class AwsGlueCrawlerHook(AwsBaseHook):
class GlueCrawlerHook(AwsBaseHook):
"""
Interacts with AWS Glue Crawler.
Expand Down Expand Up @@ -171,3 +172,19 @@ def wait_for_crawler_completion(self, crawler_name: str, poll_interval: int = 5)
self.log.info("Crawler should finish soon")

sleep(poll_interval)


class AwsGlueCrawlerHook(GlueCrawlerHook):
"""
This hook is deprecated.
Please use :class:`airflow.providers.amazon.aws.hooks.glue_crawler.GlueCrawlerHook`.
"""

def __init__(self, *args, **kwargs):
warnings.warn(
"This hook is deprecated. "
"Please use :class:`airflow.providers.amazon.aws.hooks.glue_crawler.GlueCrawlerHook`.",
DeprecationWarning,
stacklevel=2,
)
super().__init__(*args, **kwargs)
23 changes: 20 additions & 3 deletions airflow/providers/amazon/aws/operators/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
# under the License.

import os.path
import warnings
from typing import Optional

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
from airflow.providers.amazon.aws.hooks.glue import GlueJobHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class AwsGlueJobOperator(BaseOperator):
class GlueJobOperator(BaseOperator):
"""
Creates an AWS Glue Job. AWS Glue is a serverless Spark
ETL service for running Spark Jobs on the AWS cloud.
Expand Down Expand Up @@ -118,7 +119,7 @@ def execute(self, context):
s3_script_location = f"s3://{self.s3_bucket}/{self.s3_artifacts_prefix}{script_name}"
else:
s3_script_location = self.script_location
glue_job = AwsGlueJobHook(
glue_job = GlueJobHook(
job_name=self.job_name,
desc=self.job_desc,
concurrent_run_limit=self.concurrent_run_limit,
Expand Down Expand Up @@ -148,3 +149,19 @@ def execute(self, context):
else:
self.log.info("AWS Glue Job: %s. Run Id: %s", self.job_name, glue_job_run['JobRunId'])
return glue_job_run['JobRunId']


class AwsGlueJobOperator(GlueJobOperator):
"""
This operator is deprecated.
Please use :class:`airflow.providers.amazon.aws.operators.glue.GlueJobOperator`.
"""

def __init__(self, *args, **kwargs):
warnings.warn(
"This operator is deprecated. "
"Please use :class:`airflow.providers.amazon.aws.operators.glue.GlueJobOperator`.",
DeprecationWarning,
stacklevel=2,
)
super().__init__(*args, **kwargs)
27 changes: 22 additions & 5 deletions airflow/providers/amazon/aws/operators/glue_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@
# specific language governing permissions and limitations
# under the License.
import sys
import warnings

if sys.version_info >= (3, 8):
from functools import cached_property
else:
from cached_property import cached_property

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.glue_crawler import AwsGlueCrawlerHook
from airflow.providers.amazon.aws.hooks.glue_crawler import GlueCrawlerHook


class AwsGlueCrawlerOperator(BaseOperator):
class GlueCrawlerOperator(BaseOperator):
"""
Creates, updates and triggers an AWS Glue Crawler. AWS Glue Crawler is a serverless
service that manages a catalog of metadata tables that contain the inferred
Expand Down Expand Up @@ -55,9 +56,9 @@ def __init__(
self.config = config

@cached_property
def hook(self) -> AwsGlueCrawlerHook:
"""Create and return an AwsGlueCrawlerHook."""
return AwsGlueCrawlerHook(self.aws_conn_id)
def hook(self) -> GlueCrawlerHook:
"""Create and return an GlueCrawlerHook."""
return GlueCrawlerHook(self.aws_conn_id)

def execute(self, context):
"""
Expand All @@ -77,3 +78,19 @@ def execute(self, context):
self.hook.wait_for_crawler_completion(crawler_name=crawler_name, poll_interval=self.poll_interval)

return crawler_name


class AwsGlueCrawlerOperator(GlueCrawlerOperator):
"""
This operator is deprecated.
Please use :class:`airflow.providers.amazon.aws.operators.glue_crawler.GlueCrawlerOperator`.
"""

def __init__(self, *args, **kwargs):
warnings.warn(
"This operator is deprecated. "
"Please use :class:`airflow.providers.amazon.aws.operators.glue_crawler.GlueCrawlerOperator`.",
DeprecationWarning,
stacklevel=2,
)
super().__init__(*args, **kwargs)
23 changes: 20 additions & 3 deletions airflow/providers/amazon/aws/sensors/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import warnings

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
from airflow.providers.amazon.aws.hooks.glue import GlueJobHook
from airflow.sensors.base import BaseSensorOperator


class AwsGlueJobSensor(BaseSensorOperator):
class GlueJobSensor(BaseSensorOperator):
"""
Waits for an AWS Glue Job to reach any of the status below
'FAILED', 'STOPPED', 'SUCCEEDED'
Expand All @@ -43,7 +44,7 @@ def __init__(self, *, job_name: str, run_id: str, aws_conn_id: str = 'aws_defaul
self.errored_states = ['FAILED', 'STOPPED', 'TIMEOUT']

def poke(self, context):
hook = AwsGlueJobHook(aws_conn_id=self.aws_conn_id)
hook = GlueJobHook(aws_conn_id=self.aws_conn_id)
self.log.info("Poking for job run status :for Glue Job %s and ID %s", self.job_name, self.run_id)
job_state = hook.get_job_state(job_name=self.job_name, run_id=self.run_id)
if job_state in self.success_states:
Expand All @@ -54,3 +55,19 @@ def poke(self, context):
raise AirflowException(job_error_message)
else:
return False


class AwsGlueJobSensor(GlueJobSensor):
"""
This sensor is deprecated.
Please use :class:`airflow.providers.amazon.aws.sensors.glue.GlueJobSensor`.
"""

def __init__(self, *args, **kwargs):
warnings.warn(
"This sensor is deprecated. "
"Please use :class:`airflow.providers.amazon.aws.sensors.glue.GlueJobSensor`.",
DeprecationWarning,
stacklevel=2,
)
super().__init__(*args, **kwargs)
29 changes: 23 additions & 6 deletions airflow/providers/amazon/aws/sensors/glue_catalog_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import warnings
from typing import Optional

from airflow.providers.amazon.aws.hooks.glue_catalog import AwsGlueCatalogHook
from airflow.providers.amazon.aws.hooks.glue_catalog import GlueCatalogHook
from airflow.sensors.base import BaseSensorOperator


class AwsGlueCatalogPartitionSensor(BaseSensorOperator):
class GlueCatalogPartitionSensor(BaseSensorOperator):
"""
Waits for a partition to show up in AWS Glue Catalog.
Expand Down Expand Up @@ -72,7 +73,7 @@ def __init__(
self.table_name = table_name
self.expression = expression
self.database_name = database_name
self.hook: Optional[AwsGlueCatalogHook] = None
self.hook: Optional[GlueCatalogHook] = None

def poke(self, context):
"""Checks for existence of the partition in the AWS Glue Catalog table"""
Expand All @@ -84,10 +85,26 @@ def poke(self, context):

return self.get_hook().check_for_partition(self.database_name, self.table_name, self.expression)

def get_hook(self) -> AwsGlueCatalogHook:
"""Gets the AwsGlueCatalogHook"""
def get_hook(self) -> GlueCatalogHook:
"""Gets the GlueCatalogHook"""
if self.hook:
return self.hook

self.hook = AwsGlueCatalogHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
self.hook = GlueCatalogHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
return self.hook


class AwsGlueCatalogPartitionSensor(GlueCatalogPartitionSensor):
"""
This sensor is deprecated. Please use
:class:`airflow.providers.amazon.aws.sensors.glue_catalog_partition.GlueCatalogPartitionSensor`.
"""

def __init__(self, *args, **kwargs):
warnings.warn(
"This sensor is deprecated. "
"Please use :class:`airflow.providers.amazon.aws.sensors.glue_catalog_partition.GlueCatalogPartitionSensor`.", # noqa: 501
DeprecationWarning,
stacklevel=2,
)
super().__init__(*args, **kwargs)
29 changes: 23 additions & 6 deletions airflow/providers/amazon/aws/sensors/glue_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import warnings
from typing import Optional

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.glue_crawler import AwsGlueCrawlerHook
from airflow.providers.amazon.aws.hooks.glue_crawler import GlueCrawlerHook
from airflow.sensors.base import BaseSensorOperator


class AwsGlueCrawlerSensor(BaseSensorOperator):
class GlueCrawlerSensor(BaseSensorOperator):
"""
Waits for an AWS Glue crawler to reach any of the statuses below
'FAILED', 'CANCELLED', 'SUCCEEDED'
Expand All @@ -39,7 +40,7 @@ def __init__(self, *, crawler_name: str, aws_conn_id: str = 'aws_default', **kwa
self.aws_conn_id = aws_conn_id
self.success_statuses = 'SUCCEEDED'
self.errored_statuses = ('FAILED', 'CANCELLED')
self.hook: Optional[AwsGlueCrawlerHook] = None
self.hook: Optional[GlueCrawlerHook] = None

def poke(self, context):
hook = self.get_hook()
Expand All @@ -56,10 +57,26 @@ def poke(self, context):
else:
return False

def get_hook(self) -> AwsGlueCrawlerHook:
"""Returns a new or pre-existing AwsGlueCrawlerHook"""
def get_hook(self) -> GlueCrawlerHook:
"""Returns a new or pre-existing GlueCrawlerHook"""
if self.hook:
return self.hook

self.hook = AwsGlueCrawlerHook(aws_conn_id=self.aws_conn_id)
self.hook = GlueCrawlerHook(aws_conn_id=self.aws_conn_id)
return self.hook


class AwsGlueCrawlerSensor(GlueCrawlerSensor):
"""
This sensor is deprecated. Please use
:class:`airflow.providers.amazon.aws.sensors.glue_crawler.GlueCrawlerSensor`.
"""

def __init__(self, *args, **kwargs):
warnings.warn(
"This sensor is deprecated. "
"Please use :class:`airflow.providers.amazon.aws.sensors.glue_crawler.GlueCrawlerSensor`.",
DeprecationWarning,
stacklevel=2,
)
super().__init__(*args, **kwargs)
Loading

0 comments on commit 1baa648

Please sign in to comment.