Add transfer operator S3 to (generic) SQL #28964

Closed

wants to merge 71 commits into from

Changes from 11 commits

Commits (71)
1fd240f
added JSON linter to connection edit / add UI for field extra. On con…
maggesssss Dec 25, 2022
b84cc56
Merge branch 'apache:main' into feat_conn_ui
maggesssss Jan 15, 2023
cd204a3
Released new S3 to SQL Transfer Operator Including Documentation and …
Jan 15, 2023
a1394dc
added missing sql_conn_id parameter to example DAG to make it more cl…
Jan 15, 2023
02c3252
fixed non-unique task_ids and operator variable names in S3 to Sql ex…
Jan 15, 2023
172e6e9
Merge branch 'apache:main' into feat_s3_to_sql_transfer
maggesssss Jan 18, 2023
f147113
save WIP
Jan 18, 2023
74a63ec
Merge branch 'backup' into feat_s3_to_sql_transfer
Jan 18, 2023
078ec76
Added parser
maggesssss Jan 18, 2023
d13cd79
WIP: migrated example DAG to system tests
maggesssss Jan 18, 2023
e313bb7
removed example DAG (migrated to system tests)
maggesssss Jan 18, 2023
f6893a1
Update airflow/providers/amazon/aws/transfers/s3_to_sql.py
maggesssss Jan 18, 2023
197dd54
Update airflow/providers/amazon/aws/transfers/s3_to_sql.py
maggesssss Jan 19, 2023
49c921a
changed internal method _get_hook
maggesssss Jan 18, 2023
28f0cd5
removed unused imports
maggesssss Jan 18, 2023
feab443
changed conn type from s3 to aws
maggesssss Jan 19, 2023
24cc4d5
added sql_default conn_id
maggesssss Jan 19, 2023
07af1b7
added seek(0) to tempfile after it was
maggesssss Jan 19, 2023
663903c
converted sample_data to raw
maggesssss Jan 19, 2023
dd5789f
added test for bad hook
maggesssss Jan 19, 2023
8d928ba
Skip DockerOperator task when it returns a provided exit code (#28996)
hussein-awala Jan 18, 2023
0838e38
Clarify that we are using lowest supported version for static checks …
potiuk Jan 18, 2023
5eb58b2
Update owner list for release dockerhub image workflow (#29023)
pierrejeambrun Jan 18, 2023
5872798
Update how PythonSensor returns values from python_callable (#28932)
SoxMax Jan 19, 2023
232acf0
Update gantt chart UI to display queued state of tasks (#28686)
yxiao1996 Jan 19, 2023
97ffd63
Patch MockExecutor in tests instead of "prod" code (#29028)
o-nikolas Jan 19, 2023
0f1dc02
Add log for AWS Glue Job Console URL (#28925)
IAL32 Jan 19, 2023
36d913e
Add norm to triage group (#29029)
jedcunningham Jan 19, 2023
2f02e06
Fix `project-name` generation for Providers `test-type` in Breeze (#2…
josh-fell Jan 19, 2023
7d81008
Fix params rendering in AzureSynapseHook Python API docs (#29041)
josh-fell Jan 19, 2023
e1fa038
Pin ruff to specific version and prevent from "fixing" cli conftest (…
potiuk Jan 19, 2023
0a792ac
Sanitize url_for arguments before they are passed (#29039)
potiuk Jan 19, 2023
bd41cbd
Add support for write_on_empty in BaseSQLToGCSOperator (#28959)
vchiapaikeo Jan 19, 2023
8c1293f
AIP-44 Initialize methods map for Internal API RPC endpoint in the me…
mhenc Jan 19, 2023
94f17dd
uniformize getting hook through cached property in aws sensors (#29001)
vandonr-amz Jan 20, 2023
0e2acb3
Update provide_bucket_name() decorator to handle new conn_type (#28706)
RachitSharma2001 Jan 20, 2023
6976dda
nice table at the end of system test execs (#29051)
vandonr-amz Jan 20, 2023
7a1fdec
Move project and license docs down in menu to start with developer-fo…
BasPH Jan 20, 2023
84cd030
Annotate and simplify code samples in DAGs doc (#29027)
BasPH Jan 20, 2023
8ce35a2
Fix kerberos authentication for the REST API. (#29054)
potiuk Jan 20, 2023
d683870
Migrate Helm tests to `pytest` (#29063)
Taragolis Jan 20, 2023
b5f61a5
logging poke info when external dag is not none and task_id and task_…
surabathini Jan 20, 2023
094a540
AIP-44: Add CLI command for running standalone Internal API (#28425)
mhenc Jan 20, 2023
b3c5629
Fix pre-commit warning for exclude in inclusive-language check (#29057)
potiuk Jan 20, 2023
143eedf
AIP-44 Migrate DagModel.get_paused_dag_ids to Internal API (#28693)
vincbeck Jan 20, 2023
e2a4727
Renaming nose compatible methods in flavour of regular pytest naming …
Taragolis Jan 20, 2023
3c2dba5
Capitalize dag to DAG (#29064)
BasPH Jan 20, 2023
ce320d2
Fix SFTP operator's template fields processing (#29068)
potiuk Jan 20, 2023
c1b2d83
fixing import error for dataset (#29007)
bharat99k Jan 20, 2023
a2090a8
Properly test that audit log secrets are masked for connection object…
ephraimbuddy Jan 20, 2023
3ef98e7
Minor improvements to serde helpers (#28978)
uranusjr Jan 20, 2023
bbf11d7
Add deferrable mode for Big Query Transfer operator (#27833)
MrGeorgeOwl Jan 20, 2023
974039c
Add deferrable mode to DataFusionStartPipelineOperator (#28690)
VladaZakharova Jan 20, 2023
c7b2419
Fix minor typo in breeze example command. (#29072)
snjypl Jan 20, 2023
e5009f9
Emit DataDog statsd metrics with metadata tags (#28961)
hussein-awala Jan 20, 2023
8250a1c
listener plugin example added (#27905)
Bowrna Jan 20, 2023
6a93ff6
Refactor `TestLocalTaskJob.test_process_sigterm_works_with_retries` (…
Taragolis Jan 20, 2023
b8fcd20
Mark `test_process_sigterm_works_with_retries` quarantined again (#29…
Taragolis Jan 21, 2023
cb8308c
FTP operator has logic in __init__ (#29073)
snjypl Jan 21, 2023
2269fd7
Upgrade bitnami/postgresql helm depenency from 10.5.3 to 12.1.9 (#29071)
snjypl Jan 21, 2023
3cc6673
Chart: add doc note about podtemplate images (#29032)
Aakcht Jan 21, 2023
10427eb
bump (#28602)
DrFaust92 Jan 21, 2023
1b40759
Check for run_id url param when linking to graph/gantt views (#29066)
bbovenzi Jan 21, 2023
ce46baa
emit dagrun failed duration when timeout (#29076)
hussein-awala Jan 21, 2023
d870a9b
Migrate DagFileProcessorManager.clear_nonexistent_import_errors to In…
snjypl Jan 21, 2023
dcf7e87
save WIP
Jan 18, 2023
db11996
removed type-hint from property db_hook.
maggesssss Jan 21, 2023
db1cb02
fixed import sorting
maggesssss Jan 21, 2023
1d902ff
added flush() to
maggesssss Jan 21, 2023
42a9b76
removed unneccessary
maggesssss Jan 21, 2023
aa5e2da
Merge branch 'apache:main' into feat_s3_to_sql_transfer
maggesssss Jan 21, 2023
127 changes: 127 additions & 0 deletions airflow/providers/amazon/aws/transfers/s3_to_sql.py
@@ -0,0 +1,127 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, Callable, Iterable, Sequence

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.common.sql.hooks.sql import DbApiHook

if TYPE_CHECKING:
    from airflow.utils.context import Context


class S3ToSqlOperator(BaseOperator):
    """
    Loads data from S3 into a SQL database.

    You need to provide a parser function that takes a filename as an input
    and returns an iterable of rows.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:S3ToSqlOperator`

    :param schema: reference to a specific schema in SQL database
    :param table: reference to a specific table in SQL database
    :param s3_bucket: reference to a specific S3 bucket
    :param s3_key: reference to a specific S3 key
    :param sql_conn_id: reference to a specific SQL database. Must be of type DBApiHook
    :param aws_conn_id: reference to a specific S3 / AWS connection
    :param column_list: list of column names to use in the insert SQL.
    :param commit_every: The maximum number of rows to insert in one
        transaction. Set to `0` to insert all rows in one transaction.
    :param parser: parser function that takes a filepath as input and returns an iterable.
        e.g. to use a CSV parser that yields rows line-by-line, pass the following
        function:

        def parse_csv(filepath):
Reviewer comment (Contributor): Love it!

            import csv

            with open(filepath, newline="") as file:
                yield from csv.reader(file)

    """

    template_fields: Sequence[str] = (
        "s3_bucket",
        "s3_key",
        "schema",
        "table",
        "column_list",
        "sql_conn_id",
    )
    template_ext: Sequence[str] = ()
    ui_color = "#f4a460"

    def __init__(
        self,
        *,
        s3_key: str,
        s3_bucket: str,
        table: str,
        parser: Callable[[str], Iterable[Iterable]],
        column_list: list[str] | None = None,
        commit_every: int = 1000,
        schema: str | None = None,
        sql_conn_id: str = "sql_default",
        aws_conn_id: str = "aws_default",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.table = table
        self.schema = schema
        self.aws_conn_id = aws_conn_id
        self.sql_conn_id = sql_conn_id
        self.column_list = column_list
        self.commit_every = commit_every
        self.parser = parser

    def execute(self, context: Context) -> None:
        self.log.info("Loading %s to SQL table %s...", self.s3_key, self.table)

        db_hook = self._get_hook()
        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
        s3_obj = s3_hook.get_key(key=self.s3_key, bucket_name=self.s3_bucket)

        with NamedTemporaryFile() as local_tempfile:
            s3_obj.download_fileobj(local_tempfile)

            db_hook.insert_rows(
                table=self.table,
                schema=self.schema,
                target_fields=self.column_list,
                rows=self.parser(local_tempfile.name),
                commit_every=self.commit_every,
            )

    def _get_hook(self) -> DbApiHook:
Reviewer comment (Contributor): nit. You can decorate this function with @cached_property.

Reviewer comment (Contributor): Perhaps this doesn't need to be a property at all since it's only used in the execute() method?

Reviewer comment (Contributor): There is some work going on to standardize hook access in the Amazon provider package. See #29001. I agree with you that it is not necessary to store the hook in a property, but (and this is only my personal opinion) using @cached_property makes the code cleaner.

Reviewer comment (Contributor, Author): @vincbeck I have pushed some changes, please let me know if it's fine now.

        self.log.debug("Get connection for %s", self.sql_conn_id)
        conn = BaseHook.get_connection(self.sql_conn_id)
        hook = conn.get_hook()
        if not callable(getattr(hook, "insert_rows", None)):
            raise AirflowException(
                "This hook is not supported. The hook class must have an `insert_rows` method."
            )
        return hook
New documentation file (65 additions): Amazon S3 to SQL transfer guide
@@ -0,0 +1,65 @@
.. Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements. See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership. The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied. See the License for the
    specific language governing permissions and limitations
    under the License.

================
Amazon S3 to SQL
================

Use the ``S3ToSqlOperator`` transfer to copy data from an Amazon Simple Storage Service (S3)
file into an existing SQL table. By providing a parser function that is applied to the
downloaded file, this operator can accept a variety of file formats.


Prerequisite Tasks
------------------

.. include:: ../_partials/prerequisite_tasks.rst

Operators
---------

.. _howto/operator:S3ToSqlOperator:

Amazon S3 To SQL Transfer Operator
==================================

To get more information about this operator, visit:
:class:`~airflow.providers.amazon.aws.transfers.s3_to_sql.S3ToSqlOperator`

Example usage with a parser for a CSV file. This parser loads the
file into memory and returns a list of rows:

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3_to_sql.py
    :language: python
    :dedent: 4
    :start-after: [START howto_transfer_s3_to_sql]
    :end-before: [END howto_transfer_s3_to_sql]
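The referenced example file is not rendered on this page. A minimal sketch of what such a task could look like, assuming it runs inside an ordinary DAG; the DAG ID, bucket, key, table, and column names below are illustrative placeholders, not taken from the PR:

```python
import csv
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.transfers.s3_to_sql import S3ToSqlOperator


def parse_csv_to_list(filepath):
    """Read the whole CSV file into memory and return a list of rows."""
    with open(filepath, newline="") as file:
        return list(csv.reader(file))


with DAG(
    dag_id="example_s3_to_sql",  # hypothetical DAG ID
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
):
    # All resource names below are placeholders.
    transfer_s3_to_sql = S3ToSqlOperator(
        task_id="transfer_s3_to_sql",
        s3_bucket="my-example-bucket",
        s3_key="data/sample.csv",
        table="sample_table",
        column_list=["column_a", "column_b"],
        parser=parse_csv_to_list,
        sql_conn_id="sql_default",
        aws_conn_id="aws_default",
    )
```

The parser receives the path of the temporary file that the operator downloads from S3 and must return an iterable of rows, as described in the operator docstring above.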


Example usage with a parser function that returns a generator.

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3_to_sql.py
    :language: python
    :dedent: 4
    :start-after: [START howto_transfer_s3_to_sql_generator]
    :end-before: [END howto_transfer_s3_to_sql_generator]
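As with the previous example, the included file is not shown here. A hedged sketch of a generator-based parser (function and parameter names are illustrative); yielding rows keeps memory usage flat for large files, since ``insert_rows`` iterates over the rows and commits every ``commit_every`` rows:

```python
import csv
from typing import Iterable, Iterator


def parse_csv_lazily(filepath: str) -> Iterator[Iterable[str]]:
    """Yield CSV rows one at a time instead of loading the whole file into memory."""
    with open(filepath, newline="") as file:
        yield from csv.reader(file)


# Passed to the operator exactly like the list-based parser:
#   S3ToSqlOperator(..., parser=parse_csv_lazily, ...)
```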


Reference
---------

* `csv.reader documentation <https://docs.python.org/3/library/csv.html>`__
102 changes: 102 additions & 0 deletions tests/providers/amazon/aws/transfers/test_s3_to_sql.py
@@ -0,0 +1,102 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from unittest.mock import MagicMock, patch

import pytest
from sqlalchemy import or_

from airflow import configuration, models
from airflow.providers.amazon.aws.transfers.s3_to_sql import S3ToSqlOperator
from airflow.utils import db
from airflow.utils.session import create_session


class TestS3ToSqlTransfer:
    def setup_method(self):
        configuration.conf.load_test_config()

        db.merge_conn(
            models.Connection(
                conn_id="s3_test",
                conn_type="s3",
schema="test",
extra='{"aws_access_key_id": "aws_access_key_id", "aws_secret_access_key":'
' "aws_secret_access_key"}',
)
)
db.merge_conn(
models.Connection(
conn_id="sql_test",
conn_type="postgres",
host="some.host.com",
schema="test_db",
login="user",
password="password",
)
)

self.s3_to_sql_transfer_kwargs = {
"task_id": "s3_to_sql_task",
"aws_conn_id": "s3_test",
"sql_conn_id": "sql_test",
"s3_key": "test/test.csv",
"s3_bucket": "testbucket",
"table": "sql_table",
"column_list": ["Column1", "Column2"],
"schema": "sql_schema",
"commit_every": 5000,
}

@pytest.fixture()
def mock_parser(self):
return MagicMock()

@patch("airflow.providers.amazon.aws.transfers.s3_to_sql.NamedTemporaryFile")
@patch("airflow.providers.amazon.aws.transfers.s3_to_sql.DbApiHook.insert_rows")
@patch("airflow.providers.amazon.aws.transfers.s3_to_sql.S3Hook.get_key")
def test_execute(self, mock_get_key, mock_insert_rows, mock_tempfile, mock_parser):

        S3ToSqlOperator(parser=mock_parser, **self.s3_to_sql_transfer_kwargs).execute({})

        mock_get_key.assert_called_once_with(
            key=self.s3_to_sql_transfer_kwargs["s3_key"],
            bucket_name=self.s3_to_sql_transfer_kwargs["s3_bucket"],
        )

        mock_get_key.return_value.download_fileobj.assert_called_once_with(
            mock_tempfile.return_value.__enter__.return_value
        )

        mock_parser.assert_called_once_with(mock_tempfile.return_value.__enter__.return_value.name)

        mock_insert_rows.assert_called_once_with(
            table=self.s3_to_sql_transfer_kwargs["table"],
            schema=self.s3_to_sql_transfer_kwargs["schema"],
            target_fields=self.s3_to_sql_transfer_kwargs["column_list"],
            rows=mock_parser.return_value,
            commit_every=self.s3_to_sql_transfer_kwargs["commit_every"],
        )

    def teardown_method(self):
        with create_session() as session:
            (
                session.query(models.Connection)
                .filter(or_(models.Connection.conn_id == "s3_test", models.Connection.conn_id == "sql_test"))
                .delete()
            )