Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

XCOM push ORA error code in OracleStoredProcedure #27319

Merged
merged 16 commits into from
Dec 13, 2022
Merged

XCOM push ORA error code in OracleStoredProcedure #27319

merged 16 commits into from
Dec 13, 2022

Conversation

kolfild26
Copy link
Contributor

Summary. ORA exit codes are controlled by the DB developer and could be a part of a dag logic. So in order to get access to these codes in dag, I propose an easy enhancement - push ORA exit code to XCOM when DatabaseError occurs.

One file changed - providers/oracle/operators/oracle.py

Motivation and detailed description - #23787 @potiuk

@boring-cyborg
Copy link

boring-cyborg bot commented Oct 27, 2022

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (flake8, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

Copy link
Contributor

@eladkal eladkal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add unit test to cover this change

return hook.callproc(self.procedure, autocommit=True, parameters=self.parameters)
except oracledb.DatabaseError as e:
code, mesg = e.args[0].message[:-1].split(': ', 1)
ti.xcom_push(key='ORA', value=str(code.split('-')[1]))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check for self.do_xcom_push ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eladkal Do you mean checking whether self.do_xcom_push is True and only if so, then push to XCOM, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eladkal I just tested, it works, no problem to introduce it, but as it's mentioned in #27370 for many operators, pushing is not optional.
Again, if needed, will add.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flag exist so this could be user choice. The fact that some operators dont use it is not something we should encourage.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eladkal fixed, thanks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, there is a difference between data returned from an operator and meta-data describing the operator's execution. The self.do_xcom_push controls the storing (pushing) of data returned by the logic of a given operator and not meta-data.

@eladkal What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider that not all users need this functionality. Why should we "spam" thier xcom table with data that has no value for them?
The use case of doing something in downstream task with error code of upstream task is not that common. at least from reviewing our issue tracker and questions on slack/other platforms.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eladkal

Please add unit test to cover this change

Let me ask, the tests have to be placed in airflow/tests/providers/oracle/operators/test_oracle.py right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

@uranusjr
Copy link
Member

uranusjr commented Nov 3, 2022

Can you run pre-commit locally and fix linting errors, and add a test case for this? Some note in the operator docstring to explain what exactly is pushed to XCom would be nice as well; the various split calls make it pretty difficult to follow what exactly is being extracted from the exception. (Why not just push str(e) and move extraction logic into downstream tasks, for example?)

@kolfild26
Copy link
Contributor Author

@uranusjr yes, sure, working on this

@kolfild26
Copy link
Contributor Author

kolfild26 commented Nov 3, 2022

@uranusjr

the various split calls make it pretty difficult to follow what exactly is being extracted from the exception. (Why not just push str(e) and move extraction logic into downstream tasks, for example?)

okaaay... I was actually thinking of these alternatives and previuosly decided to split fully in the operator)

But, okay I could accept that here I relied on too many tiny steps in which smth could be changed in future.

Assume Oracle throws the following:
ORA-28365: wallet is not open

Is that acceptable to just split the original output into two parts by : and push ORA-28365 to XCOM ❓
Doing so, code.split('-')[1] will disappear.

The reasons I'd like to cut the message is that

  1. It could be long length
  2. It's useless in XCOM and, if interested, it's still presented in logs.
    I mean ORA-code needs for logic, text output only needs for debuging.

@kolfild26
Copy link
Contributor Author

kolfild26 commented Nov 3, 2022

@uranusjr
Can you run pre-commit locally and

doing this first time, I'm a bit stuck. Could you plz clarify, do these pre-commit scripts only work with Airflow Breeze docker env through .yaml scenario?
Or could be run manually one by one? Doesn't look they could be.

@bdsoha
Copy link
Contributor

bdsoha commented Nov 3, 2022

@uranusjr

the various split calls make it pretty difficult to follow what exactly is being extracted from the exception. (Why not just push str(e) and move extraction logic into downstream tasks, for example?)

okaaay... I was actually thinking of these alternatives and previuosly decided to split fully in the operator)

But, okay I could accept that here I relied on too many tiny steps in which smth could be changed in future.

Assume Oracle throws the following: ORA-28365: wallet is not open

Is that acceptable to just split the original output into two parts by : and push ORA-28365 to XCOM ❓ Doing so, code.split('-')[1] will disappear.

The reasons I'd like to cut the message is that

  1. It could be long length
  2. It's useless in XCOM and, if interested, it's still presented in logs.
    I mean ORA-code needs for logic, text output only needs for debuging.

You can accomplish the same thing using regex:

import re

code = re.search("^ORA-(\\d+):.+", "ORA-28365: wallet is not open").group(1)

@kolfild26
Copy link
Contributor Author

@bdsoha

You can accomplish the same thing using regex:

import re
code = re.search("^ORA-(\\d+):.+", "ORA-28365: wallet is not open").group(1)

Yes, but above were the suggestion by @uranusjr not to split so much at all, no matter which way is used:

Why not just push str(e) and move extraction logic into downstream tasks, for example

I prefer the way @bdsoha (to split). So if no objections to this I would commit re.search

@uranusjr
Copy link
Member

uranusjr commented Nov 3, 2022

If we are sure oracledb always emits errors in this format, splitting out the part before : makes sense. Using re is also a good idea; it is more readable, and it’s easier to catch cases where somehow the format is different (we can simply not push the XCom in this case).

@kolfild26
Copy link
Contributor Author

  1. Changed to re.search() in ora exit parsing
  2. description added

Can you run pre-commit locally and fix linting errors,

  1. There were some static check failures since the last run #step:6:304. Now fixed, locally tested via breeze static-checks -t run-mypy:
breeze static-checks -t run-mypy --all-files
Checking pre-commit installed for /home/userml/.local/pipx/venvs/apache-airflow-breeze/bin/python

Package pre_commit is installed. Good version 2.20.0 (>= 2.0.0)

Good version of Docker: 20.10.21.
Good version of docker-compose: 2.12.2
Good Docker context used: default.
Run mypy for dev.........................................................Passed
Run mypy for core........................................................Passed
Run mypy for providers...................................................Passed
Run mypy for /docs/ folder...............................................Passed
  1. u-tests in progress. ⏳

Comment on lines 106 to 109
if code_match is not None:
if self.do_xcom_push:
if ti is not None:
ti.xcom_push(key="ORA", value=code_match.group(1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, this is cognitively complex.

Suggested change
if code_match is not None:
if self.do_xcom_push:
if ti is not None:
ti.xcom_push(key="ORA", value=code_match.group(1))
if code_match and self.do_xcom_push and ti:
ti.xcom_push(key="ORA", value=code_match.group(1))

@kolfild26
Copy link
Contributor Author

@bdsoha applied.
@eladkal Some tests failed during the last run in pre-commit run isort, pre-commit run run-mypy, pre-commit run black.
I ran locally again. Fixed isort and mypy errors. black did not point to any errors in my env.

Also I ran breeze testing tests --test-type "Providers[oracle]" and one existing (not mine) test failed:

tests/providers/oracle/operators/test_oracle.py:70:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Task(OracleStoredProcedureOperator): test_task_id>
context = 'test_context'

    def execute(self, context: Context):
        self.log.info("Executing: %s", self.procedure)
        hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
>       ti=context.get("task_instance")
      AttributeError: 'str' object has no attribute 'get'

Here is context mocked as str. Surely it has no get method.

@bdsoha
Copy link
Contributor

bdsoha commented Nov 9, 2022

Tests are failing since test_context is a string and not an object.

class TestOracleOperator(unittest.TestCase):
@mock.patch("airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator.get_db_hook")
def test_execute(self, mock_get_db_hook):
sql = "SELECT * FROM test_table"
oracle_conn_id = "oracle_default"
parameters = {"parameter": "value"}
autocommit = False
context = "test_context"
task_id = "test_task_id"

@kolfild26
Copy link
Contributor Author

Tests are failing since test_context is a string and not an object.

that's what I wrote above.

@kolfild26
Copy link
Contributor Author

@bdsoha
It looks like we could pass **context to test_execute.
def test_execute(self, mock_run, **context):
And then delete context = "test_context"

Tests breeze testing tests --test-type "Providers[oracle]" have completed successfully with this change.
I don't know what was the purpose of context = "test_context". It's not used further in test.

@potiuk
Copy link
Member

potiuk commented Nov 14, 2022

Needstests to be fixed.

kolfild26 and others added 6 commits December 3, 2022 20:56
Motivation:
    #23787

ORA exit codes is controlled by the DB developer and could be a part of a dag logic.
So in order to get access to these codes in dag, I propose an easy enhancement - push ORA exit code to XCOM when DatabaseError occurs.

One file changed - providers/oracle/operators/oracle.py
Co-authored-by: Dov Benyomin Sohacheski <b@kloud.email>
@kolfild26
Copy link
Contributor Author

Hi, I'm a bit stuck with the unit test and will update once it's resolved.

@potiuk
Copy link
Member

potiuk commented Dec 6, 2022

Hi, I'm a bit stuck with the unit test and will update once it's resolved.

OK. Cool

@kolfild26
Copy link
Contributor Author

kolfild26 commented Dec 11, 2022

Hi

I'm a bit stuck with the unit test and will update once it's resolved.

Finally it's done. Please review.

Files modified:
airflow/providers/oracle/operators/oracle.py
tests/providers/oracle/operators/test_oracle.py

In test_oracle.py a new method test_push_oracle_exit_to_xcom added.

In oracle.py added some check that context is available. If so, try to pull from xcom. It's needed as other tests don't use context when running task.execute() and we need to keep them working.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants