
Add transfer operator S3 to (generic) SQL #28964

Closed
wants to merge 71 commits

Conversation

maggesssss
Contributor

closes: #23666

This PR adds a new transfer operator that reads a CSV file from S3 storage and loads it into an existing table of a generic SQL database.

I used csv.reader to read the file and the insert_rows method of the existing DbApiHook.
Because csv.reader does not read the complete file into memory, even large files can be loaded fairly efficiently.
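As a rough illustration of the approach described above (not the operator's actual code), this sketch streams a CSV file into a table via csv.reader and DbApiHook.insert_rows; the function name, the header-skipping step, and the commit_every value are assumptions made for the example:

from __future__ import annotations

import csv

from airflow.providers.common.sql.hooks.sql import DbApiHook


def load_csv_into_table(hook: DbApiHook, filepath: str, table: str, target_fields: list[str]) -> None:
    # csv.reader is a lazy iterator, so the whole file is never held in memory at once;
    # insert_rows consumes the rows and commits in batches.
    with open(filepath, newline="") as file:
        reader = csv.reader(file)
        next(reader)  # assumption: the first row is a header and should be skipped
        hook.insert_rows(table=table, rows=reader, target_fields=target_fields, commit_every=1000)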

I am happy for any feedback.

airflow/providers/amazon/aws/transfers/s3_to_sql.py (outdated, resolved)
Comment on lines 46 to 48
This operator downloads a file from S3, reads it via `csv.reader`,
and inserts the data into a SQL database using the `insert_rows` method.
All SQL hooks are supported, as long as they are of type DbApiHook.
Contributor

Hmmm... is the operator limited to CSV sources?

Contributor

Agree with this. Either you explicitly state in the operator name that this operator reads only CSV, or you make the operator generic by adding a new parameter `parser`, a function responsible for reading the input from the source. The actual processing of the CSV in your case would then be handled by this new parser function. Between the two solutions, I prefer the second one. My 2 cents.
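For illustration only, a rough sketch of what such a generic operator with a parser parameter could look like; the class shape, the _download_from_s3 helper, and the db_hook property are hypothetical placeholders, not the PR's actual code:

from __future__ import annotations

from typing import Callable, Iterable

from airflow.models import BaseOperator


class S3ToSqlOperator(BaseOperator):
    def __init__(self, *, table: str, parser: Callable[[str], Iterable[tuple]], **kwargs) -> None:
        super().__init__(**kwargs)
        self.table = table
        self.parser = parser  # user-supplied: turns a local file path into an iterable of rows

    def execute(self, context) -> None:
        # The operator stays format-agnostic: whatever the parser yields is handed to insert_rows.
        local_path = self._download_from_s3()  # hypothetical helper
        self.db_hook.insert_rows(table=self.table, rows=self.parser(local_path))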

Contributor Author

@vincbeck I agree. I like the idea of using a parser parameter. Do you think it makes sense to use a default parser?

Contributor

@vincbeck vincbeck Jan 16, 2023

I don't think it makes sense as a default, but I do think that providing at least one parser in the code makes sense. A CSV parser would be excellent!

Contributor Author

Hi @vincbeck, I have changed the operator to use a parser instead and have documented it accordingly.
Could you please have a look and let me know what you think about it?

Contributor

It looks really good! Thanks for making the changes. I added some comments, but they are mostly nitpicks and minor remarks. Good job!

airflow/providers/amazon/aws/transfers/s3_to_sql.py (outdated, resolved)
airflow/providers/amazon/aws/transfers/s3_to_sql.py (outdated, resolved)
@@ -0,0 +1,84 @@
# Licensed to the Apache Software Foundation (ASF) under one
Contributor

Following up on the discussion here #22438, this file should be moved to tests/system/providers/amazon/aws/example_s3.py

Contributor Author

@vincbeck I have modified the example DAG according to AIP-47, but I was not able to test it yet. I will do so as soon as I have fixed my breeze environment.

Contributor

The system test looks really good! Thanks for the effort of converting the example DAG!

Contributor Author

@vincbeck I added a SQL check operator to count the rows inserted. Tests were running fine locally.
Can you give me a hint about which conn_ids I have to use?
For S3 I guess aws_default, but for generic SQL? sql_default?

airflow/providers/amazon/aws/transfers/s3_to_sql.py (outdated, resolved)
# Remove file downloaded from s3 to be idempotent.
os.remove(self._file)

def _get_hook(self) -> DbApiHook:
Contributor

nit. You can decorate this function with @cached_property
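For illustration, a minimal sketch of the suggested @cached_property pattern; the attribute names (sql_conn_id, sql_hook_params), the insert_rows check, and the assumption that Connection.get_hook accepts hook_params on a recent Airflow version are mine, not necessarily the PR's code:

from functools import cached_property

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator


class S3ToSqlOperator(BaseOperator):
    # __init__ storing sql_conn_id and sql_hook_params omitted for brevity

    @cached_property
    def db_hook(self):
        # Resolve the destination hook once; later accesses reuse the cached instance.
        conn = BaseHook.get_connection(self.sql_conn_id)
        hook = conn.get_hook(hook_params=self.sql_hook_params)
        if not callable(getattr(hook, "insert_rows", None)):
            raise AirflowException("The SQL hook must be DbApiHook-like and expose an insert_rows method.")
        return hook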

Contributor

Perhaps this doesn't need to be a property at all since it's only used in the execute() method?

Contributor

There is some work going on to standardize hook access in the Amazon provider package. See #29001. I agree with you that it is not necessary to store the hook in a property, but (and this is only my personal opinion) using @cached_property makes the code cleaner.

Contributor Author

@vincbeck I have pushed some changes, please let me know if it's fine now

airflow/providers/amazon/aws/transfers/s3_to_sql.py (outdated, resolved)
airflow/providers/amazon/aws/transfers/s3_to_sql.py (outdated, resolved)
@maggesssss maggesssss marked this pull request as draft January 18, 2023 20:56
parameter which allows the user to add a custom parser.
Example parser added to docs

Removed the following args:
- csv_reader_kwargs
- skip_first_row
- column_list "infer" option
These arguments do not work with a custom parser at the moment

Changed to NamedTemporaryFile

Added s3_hook.get_key before downloading to check if the file exists

Updated test and docs
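Roughly, the download flow those commit notes describe could look like the sketch below; this is a guess at the shape (the helper name and insert_rows callback are hypothetical), and it relies on S3Hook.get_key raising when the key does not exist:

from tempfile import NamedTemporaryFile

from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def download_and_insert(s3_bucket: str, s3_key: str, parser, insert_rows) -> None:
    # Hypothetical helper: verify the key exists, download to a temporary file,
    # parse it, and hand the rows to the destination hook's insert_rows.
    s3_hook = S3Hook()
    key_obj = s3_hook.get_key(key=s3_key, bucket_name=s3_bucket)  # raises if the object is missing
    with NamedTemporaryFile(mode="wb", suffix=".csv") as temp_file:
        key_obj.download_fileobj(temp_file)
        temp_file.flush()
        insert_rows(rows=parser(temp_file.name))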
airflow/providers/amazon/aws/transfers/s3_to_sql.py (outdated, resolved)
e.g. to use a CSV parser that yields rows line-by-line, pass the following
function:

def parse_csv(filepath):
Contributor

Love it!
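The documented example is cut off in this rendering; a plausible completion, based on the csv.reader approach described earlier in the thread (the exact body in the PR may differ):

import csv


def parse_csv(filepath):
    # Yield rows one at a time so large files are never fully loaded into memory.
    with open(filepath, newline="") as file:
        yield from csv.reader(file)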

tests/system/providers/amazon/aws/example_s3_to_sql.py (outdated, resolved)
Co-authored-by: Vincent <97131062+vincbeck@users.noreply.github.com>
maggesssss and others added 7 commits January 19, 2023 18:41
Co-authored-by: Niko Oliveira <onikolas@amazon.com>
to cached property db_hook
use imported watcher task
for SqlExecuteQueryOperators
string and added

SQLTableCheckOperator to check if lines have been successfully imported
Bowrna and others added 15 commits January 21, 2023 13:48
Move the logic from __init__ to executor for FTP operator
…he#29071)

This version of the chart uses different variable names for setting usernames and passwords in the postgres database.
`postgresql.auth.enablePostgresUser` is used to determine if the "postgres" admin account will be created.
`postgresql.auth.postgresPassword` sets the password for the "postgres" user.
`postgresql.auth.username` and `postgresql.auth.password` are used to set credentials for a non-admin account if desired.
`postgresql.postgresqlUsername` and `postgresql.postgresqlPassword`, which were used in the previous version of the chart, are no longer used.
Users will need to change these variable names in their values files if they are using the helm chart.

Co-authored-by: Caleb Woofenden <caleb.woofenden@bitsighttech.com>
As long as the hook has an insert_rows method, this operator can work.
local_tempfile after download is finished
self._get_hook()
db_hook is now a cached property
@maggesssss
Contributor Author

Sorry, I think I did something wrong when rebasing...
Most of those commits are not from me.

Shall I create a new PR?

@potiuk
Member

potiuk commented Jan 21, 2023

Feel free.

@maggesssss
Contributor Author

Closed (replaced by #29085)

@maggesssss maggesssss closed this Jan 21, 2023